Robin
07/15/2020, 5:11 PMAWS secrets manager
with prefect?
1️⃣ Fetch the credentials within each task
2️⃣ Create a task that fetches the credentials once
3️⃣ another option
Which one is considered the safest?
We are aware of Prefect secrets, but would also like to keep all secrets in one place, e.g. AWS Secrets Manager…
Ankit
07/15/2020, 6:01 PM
Dask executor along with prefect ui+server. My Prefect UI+server and Dask all run in Docker containers defined in one docker-compose file and attached to the same network. I start my agent, which is a local agent in a Docker container, and use flow.register() to register my flow. When I try to set the executor in flow.environment, I get an error. Not sure how to proceed here.
This is how I am setting the dask executor:
client = Client("scheduler:8786")
dask_executor = DaskExecutor(address=client.scheduler.address)
flow.environment = LocalEnvironment(executor=dask_executor)
flow.register()
The error that I get is :
flow.environment = LocalEnvironment(executor=dask_executor)
TypeError: __init__() got an unexpected keyword argument 'executor'
I am also getting a warning:
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
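A TypeError like this usually means the installed class's __init__ does not declare that keyword, which would be consistent with the version-mismatch warning. A minimal sketch for checking this with the standard library; OldLocalEnvironment is a hypothetical stand-in, not Prefect's class, and accepts_keyword is an illustrative helper. Swap in the real import to inspect an actual installation.

```python
import inspect


class OldLocalEnvironment:
    """Hypothetical stand-in for a class whose __init__ has no `executor` argument."""

    def __init__(self, labels=None):
        self.labels = labels


def accepts_keyword(cls, name):
    """Return True if cls.__init__ declares `name` or accepts arbitrary **kwargs."""
    params = inspect.signature(cls.__init__).parameters
    if name in params:
        return True
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


print(accepts_keyword(OldLocalEnvironment, "executor"))  # False
print(accepts_keyword(OldLocalEnvironment, "labels"))    # True
```

If the check comes back False for the real class, the installed package version is the first thing to compare against the server version.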
Gary
07/16/2020, 3:24 AM
@task
def get_stock_id_list():
    # query stock id list from database
    stock_id_list = query_stock_id_list_from_db()
    return stock_id_list
@task
def crawl_stock_data_from_external_website(stock_id):
    # Crawler related work
    return crawl_stock_data(stock_id)
@task
def perform_calculation(crawled_data):
    # Some calculation with Pandas
    perform_some_calculation(crawled_data)
with Flow('Example flow') as flow:
    # The number of stock id in list is about ten thousand.
    stock_id_list = get_stock_id_list()
    crawled_data_list = crawl_stock_data_from_external_website.map(stock_id_list)
    perform_calculation.map(crawled_data_list)
Is it okay for Prefect to generate about 10,000~50,000 mapped tasks within the above flow without any problem?
Another scenario is to generate about 1 million tasks within a flow. In this scenario, we query the user id list from our database and run some user behavior analysis calculations (one user per mapped task).
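One way to keep the task count manageable in a scenario like this, independent of any Prefect limit, is to batch ids so that each mapped task processes a chunk rather than a single id. A minimal standard-library sketch; chunked, the batch size of 10,000, and the range used as a stand-in for the database query are all illustrative assumptions.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


user_ids = range(1_000_000)  # stand-in for ids queried from a database
batches = list(chunked(user_ids, 10_000))
print(len(batches))  # 100
```

Mapping over batches would then create 100 mapped tasks instead of 1 million, at the cost of coarser retries and logging per batch.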
Is it okay? Or is there a better way to do this?
simone
07/16/2020, 8:27 AM
prefect.context and avoid the file loading. Is there any reason why I should not add the config data to the context? Thanks a lot!
Arlo Bryer
07/16/2020, 9:30 AM
Arlo Bryer
07/16/2020, 9:31 AM
Iain Dillingham
07/16/2020, 11:27 AM
Alfie
07/16/2020, 12:20 PM
Alfie
07/16/2020, 12:20 PM
Avi A
07/16/2020, 12:27 PM
OpenBLAS blas_thread_init: pthread_create failed for thread 29 of 32: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
OpenBLAS blas_thread_init: pthread_create failed for thread 30 of 32: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
OpenBLAS blas_thread_init: pthread_create failed for thread 31 of 32: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
_run_code(code, mod_globals, init_globals,
File "/usr/lib/python3.8/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/home/avi/.virtualenvs/prefect/bin/prefect", line 5, in <module>
from prefect.cli import cli
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/__init__.py", line 8, in <module>
import prefect.triggers
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/triggers.py", line 48, in <module>
from prefect.engine import signals
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/usr/lib/python3.8/multiprocessing/spawn.py", line 125, in _main
prepare(preparation_data)
File "/usr/lib/python3.8/multiprocessing/spawn.py", line 236, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "/usr/lib/python3.8/multiprocessing/spawn.py", line 287, in _fixup_main_from_path
main_content = runpy.run_path(main_path,
File "/usr/lib/python3.8/runpy.py", line 262, in run_path
return _run_module_code(code, init_globals, run_name,
File "/usr/lib/python3.8/runpy.py", line 95, in _run_module_code
_run_code(code, mod_globals, init_globals,
File "/usr/lib/python3.8/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/home/avi/.virtualenvs/prefect/bin/prefect", line 5, in <module>
from prefect.cli import cli
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/__init__.py", line 8, in <module>
import prefect.triggers
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/triggers.py", line 48, in <module>
from prefect.engine import signals
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/engine/__init__.py", line 8, in <module>
from prefect.engine.flow_runner import FlowRunner
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 13, in <module>
from prefect.core import Edge, Flow, Task
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/core/__init__.py", line 4, in <module>
from prefect.core.flow import Flow
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/core/flow.py", line 40, in <module>
from prefect.environments import Environment
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/__init__.py", line 1, in <module>
from prefect.environments.execution import (
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/__init__.py", line 8, in <module>
from prefect.environments.execution.dask import DaskKubernetesEnvironment
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/dask/__init__.py", line 2, in <module>
from prefect.environments.execution.dask.cloud_provider import (
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/dask/cloud_provider.py", line 5, in <module>
from distributed.deploy.cluster import Cluster
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/__init__.py", line 3, in <module>
from .actor import Actor, ActorFuture
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/actor.py", line 6, in <module>
from .client import Future, default_client
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/client.py", line 43, in <module>
from .batched import BatchedSend
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/batched.py", line 8, in <module>
from .core import CommClosedError
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/core.py", line 18, in <module>
from .comm import (
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/comm/__init__.py", line 1, in <module>
from .addressing import (
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/comm/addressing.py", line 5, in <module>
from ..utils import get_ip_interface
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/utils.py", line 97, in <module>
mp_context = _initialize_mp_context()
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/utils.py", line 88, in _initialize_mp_context
importlib.import_module(pkg)
File "/usr/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/__init__.py", line 142, in <module>
from . import core
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/__init__.py", line 24, in <module>
from . import multiarray
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/multiarray.py", line 14, in <module>
from . import overrides
File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/overrides.py", line 7, in <module>
from numpy.core._multiarray_umath import (
KeyboardInterrupt
Avi A
07/16/2020, 12:27 PM
DaskExecutor in the past but it was even worse: every now and then the execution froze and I stopped receiving logs. I left it for the time being because running on one strong machine is enough for me and I don’t need a cluster yet.
Avi A
07/16/2020, 12:27 PM
LocalDaskExecutor. It has been working well for a while, but in the last couple of days I’ve been getting the following errors in the middle of running a long flow. IDK what changed, but it seems that the process is unable to spawn a new thread or something. I reset the server to make sure that no stale processes/threads remain in the system, but I’m still getting these.
Sven Teresniak
07/16/2020, 12:50 PM
$DASK_DISTRIBUTED__SCHEDULER__BLOCKED_HANDLERS?
Sven Teresniak
07/16/2020, 1:49 PM
Chris Goddard
07/16/2020, 3:19 PM
run method of DbtShellTask doesn't return the result of the superclass's run method.
Sven Teresniak
07/16/2020, 4:33 PM
prefect run server --name flowname but I need to set a parameter for this run. Is this possible?
Luke Orland
07/16/2020, 5:26 PM
Jennifer Meng
07/16/2020, 8:14 PM
mutation($parameters: JSON!) {create_flow_run(input: { flow_id: "my_flow_id", parameters: $parameters }) {id}}
{"parameters":{"param1":"my_param1"}}
Argument 'input' has invalid value {flow_id: \"my_flow_id\", parameters: $parameters}.
Matt Allen
07/16/2020, 11:38 PM
Alfie
07/17/2020, 5:39 AM
Mac Gréco Péralte Chéry
07/17/2020, 5:41 AM
mutation {
  setFlowScheduleState(input: { flow_id: "<flow id>", set_active: true }) {
    success
  }
}
but I get this error:
{
"graphQLErrors": [],
"networkError": {
"name": "ServerError",
"response": {},
"statusCode": 400,
"result": {
"errors": [
{
"message": "Cannot query field \"setFlowScheduleState\" on type \"Mutation\". Did you mean \"set_schedule_active\", \"set_flow_run_states\", or \"set_schedule_inactive\"?",
"locations": [
{
"line": 2,
"column": 3
}
],
"extensions": {
"code": "GRAPHQL_VALIDATION_FAILED"
}
}
]
}
},
"message": "Network error: Response not successful: Received status code 400"
}
Sumant Agnihotri
07/17/2020, 7:34 AM
import threading
def main():
    # list of arguments
    ls = [1, 2, 3, 4]
    # start a thread for the calculate method for each arg in ls
    for each in ls:
        thread(each)
def thread(arg):
    # args must be a tuple, hence the trailing comma
    t = threading.Thread(target=calculate, args=(arg,))
    t.start()
def calculate(*args):
    # does time-consuming calculations and pushes the result to the db
    pass
I'm running the calculate method in a Python thread, so that multiple threads can run the calculations. The issue is that if ls is too long, I crash the server (maybe due to too many threads).
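Separately from prefect-dask, the thread count itself can be capped with a fixed-size pool from the standard library, so the number of live threads no longer grows with the length of ls. A minimal sketch; calculate here is a placeholder that squares its input, and max_workers=4 is an arbitrary assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def calculate(arg):
    # placeholder for the time-consuming calculation; the real one would write to the db
    return arg * arg


def main(args, max_workers=4):
    # the pool reuses at most `max_workers` threads no matter how long `args` is
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(calculate, args))


results = main(range(1000))
print(results[:4])  # [0, 1, 4, 9]
```

The pool queues pending items internally, so a long input list waits for a free worker instead of starting a new thread per item.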
I want to do this using prefect-dask, but I'm not sure how to do it or where to look for examples. Any help will be much appreciated. Thanks.
Sven Teresniak
07/17/2020, 8:47 AM
run.py from the CLI code?
client = Client()
result = client.graphql(query)
I don't want to use prefect run server … from a shell task. I like it pythonic
https://docs.prefect.io/api/latest/client/client.html seems perfect. But it states Client for communication with Prefect Cloud
On the other hand: prefect CLI uses the Client class for non-cloud schedules as well… 🙂
Matthias
07/17/2020, 9:44 AM
flow.register() and run via UI. When run via flow.run() it does not store anything. What do I have to do to not store anything? This keeps filling up storage on my server. It is just an example; it seems to me to be related to the mapping.
from prefect import Flow, task
from string import ascii_lowercase
@task(checkpoint=False)
def extract_things(c):
    return c
with Flow(
    "Do it",
) as flow:
    extract_things.map(list(ascii_lowercase))
Klemen Strojan
07/17/2020, 11:35 AM
mutation {
  delete_flow(input: {flow_id: "e1bd5dfa-3761-41ca-ac82-2104cf5157c1"}) {
    success,
    error
  }
}
Here is the error:
{
"graphQLErrors": [
{
"path": [
"delete_flow"
],
"message": "Operation timed out",
"extensions": {
"code": "API_ERROR"
}
}
],
"networkError": null,
"message": "GraphQL error: Operation timed out"
}
I was unable to delete it in the UI (the older versions were deleted though). We are using Cloud. Any ideas?
bruno.corucho
07/17/2020, 12:26 PM
@task
def hello_world():
    time.sleep(15)
    logger = prefect.context.get("logger")
    logger.info("I WOKE UP, GUYS!!!")
with Flow("strdata-test") as test:
    hello_world()
    hello_world()
    hello_world()
    hello_world()
    hello_world()
for n in range(4):
test.storage = Docker(registry_url=os.environ["REGISTRY_URL"],
dockerfile="deploy/Dockerfile",
image_name="strdata-flow"
)
test.environment = DaskKubernetesEnvironment(max_workers=4)
test.register(project_name=os.environ["PROJECT_NAME"])
Output:
Matt Wong-Kemp
07/17/2020, 12:27 PM
unsupported pickle protocol: 5 as an error on deserializing a flow inside an image that has cloudpickle==1.2.2 in it. FWIW it serialized fine from inside the same image.
karteekaddanki
07/17/2020, 12:53 PM
FlowRunTask expected to update the runs of the child flow in prefect cloud? I'm not seeing it. I'm trying to implement backfill functionality using FlowRunTask inside a generic wrapper flow. Any help on the idiomatic way to achieve backfill functionality would be appreciated.
Nakul Goyal
07/17/2020, 1:28 PM
Bernard Greyling
07/17/2020, 2:07 PM
AWS_CREDENTIALS to dict as it did in 0.12.3.
e.g.
cat <<EOF > ~/.prefect/config.toml
[context.secrets]
AWS_CREDENTIALS='{"ACCESS_KEY": "${AWS_ACCESS_KEY_ID}", "SECRET_ACCESS_KEY": "${AWS_SECRET_ACCESS_KEY}"}'
DASK_GATEWAY_URL="${DASK_GATEWAY_URL}"
SLACK_WEBHOOK_URL="${SLACK_WEBHOOK_URL}"
[flows]
CHECKPOINTING=true
EOF
...
>> flow.storage = S3(<>)
>> flow.register('Report')
/usr/local/lib/python3.7/site-packages/prefect/utilities/aws.py in get_boto_client(resource, credentials, use_session, **kwargs)
36 else:
37 ctx_credentials = prefect.context.get("secrets", {}).get("AWS_CREDENTIALS", {})
---> 38 aws_access_key = ctx_credentials.get("ACCESS_KEY")
39 aws_secret_access_key = ctx_credentials.get("SECRET_ACCESS_KEY")
40 aws_session_token = ctx_credentials.get("SESSION_TOKEN")
AttributeError: 'str' object has no attribute 'get'
Does line 37 need something like:
ctx_credentials = json.loads(prefect.context.get("secrets", {}).get("AWS_CREDENTIALS", "{}"))
or perhaps a schema validated load like:
from marshmallow import Schema, fields
class AWS_CREDENTIALS_Schema(Schema):
    ACCESS_KEY = fields.Str(required=True)
    SECRET_ACCESS_KEY = fields.Str(required=True)
    SESSION_TOKEN = fields.Str(required=False)
...
schema = AWS_CREDENTIALS_Schema()
ctx_credentials = schema.load(prefect.context.get("secrets", {}).get("AWS_CREDENTIALS", {}))
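Either idea could be sketched with the standard library alone by accepting both shapes, a dict or a JSON string, before calling .get on the result. This is a hypothetical helper for illustration, not Prefect's implementation; normalize_credentials and the sample key values are assumptions.

```python
import json


def normalize_credentials(raw):
    """Return AWS credentials as a dict, whether given a dict or a JSON string."""
    if raw is None or raw == "":
        return {}
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str):
        parsed = json.loads(raw)
        if not isinstance(parsed, dict):
            raise TypeError("AWS_CREDENTIALS must decode to a JSON object")
        return parsed
    raise TypeError(f"unsupported credentials type: {type(raw).__name__}")


creds = normalize_credentials('{"ACCESS_KEY": "abc", "SECRET_ACCESS_KEY": "xyz"}')
print(creds.get("ACCESS_KEY"))  # abc
```

With a helper like this, a string secret from config.toml and a dict secret would both reach the ctx_credentials.get(...) calls as a dict.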