Abhas P
09/02/2021, 6:21 PM
@task
def transform(df):
    return transformed_df

with Flow("transform-1") as flow:
    transform(df)  # current implementation
    for df in filtered_dfs:  # suggested way to utilize parallelization on tasks
        transform(df)
1. Will breaking the dataframe down into small chunks and then calling the transformation task on each batch help me reap the benefits of Prefect parallelization when using a Dask executor?
2. Given that the input to a transform task is a large dataframe, what other steps can I consider to optimize the turnaround time of the flow? (please suggest different run configs and Prefect paradigms of writing the code)
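A minimal sketch of the chunk-and-map idea being asked about, assuming Prefect 1.x with a DaskExecutor; the chunk size and the load_dataframe/split_dataframe helpers are illustrative assumptions, not part of the original flow:
import pandas as pd
from prefect import task, Flow
from prefect.executors import DaskExecutor

@task
def load_dataframe():
    # Stand-in for however the large dataframe is produced upstream.
    return pd.DataFrame({"value": range(100_000)})

@task
def split_dataframe(df, chunk_size=10_000):
    # Break the dataframe into row-wise chunks so each chunk becomes one mapped child.
    return [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]

@task
def transform(chunk):
    # Placeholder transformation applied to a single chunk.
    return chunk.assign(processed=True)

with Flow("transform-1", executor=DaskExecutor()) as flow:
    chunks = split_dataframe(load_dataframe())
    transformed = transform.map(chunks)  # mapped children can run in parallel on Dask workers
Whether this pays off depends on how expensive the per-chunk transform is relative to the cost of shipping each chunk to the Dask workers.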
Jeremy Yeo
09/02/2021, 8:24 PM
dirs = ["dir1", "dir2", "dir3"]
commands = [f"rm -rf {d}" for d in dirs]
names = [{"name": f"removing dir {d}"} for d in dirs]
shell = ShellTask()
with Flow("remove-dirs") as f:
    shell.map(command=commands, task_args={"name": "removing dir"})
    # shell.map(command=commands, task_args={"name": dirs}) # not working
    # shell.map(command=commands, task_args=names) # not working
f.run()
Possibly not getting some syntax right. Thanks.
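One possible workaround, sketched under the assumption that this is Prefect 1.x running against Cloud/Server and that the flow name is made up: task_args applies the same value to every mapped child, whereas a templated task_run_name can vary per child.
from prefect import Flow
from prefect.tasks.shell import ShellTask

dirs = ["dir1", "dir2", "dir3"]
commands = [f"rm -rf {d}" for d in dirs]

# task_args sets one value on every mapped child, so a per-child list there fails.
# task_run_name is formatted with the task's inputs at run time, giving each mapped
# child its own run name (this only shows up when using a backend, not plain local runs).
shell = ShellTask(task_run_name="removing dir: {command}")

with Flow("remove-dirs") as f:
    shell.map(command=commands)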
Brad
09/03/2021, 1:44 AM
Brad I
09/03/2021, 5:41 AM
The DaskExecutor not getting the env vars set in the run config has caused other issues in the way we have been deploying flows to different environments (test, dev, prod, etc.). Previously, using the LocalDaskExecutor, we could set the executor on the flow itself and it was generic enough to work across all of our environments. Now we also need to change the dask config depending on our target environment, but it seems like setting the executor outside of the with Flow() definition just gets ignored. I believe this has something to do with the storage and run_config being serialized with the flow while the executor is not right now. An example flow and registration script is attached inside this thread. Does anyone know of a workaround for this issue or have a better way to do this?
Arun Dass
09/03/2021, 6:01 AMg.suijker
09/03/2021, 7:54 AMBastian Röhrig
09/03/2021, 9:49 AM
We have a parent flow that triggers a child flow using create_flow_run and get_task_run_result. We unit tested the parent flow using mocks to simulate the possible results of the child flow. However, it would be great to have a basic integration test where we ideally only mock the outside dependencies like sql and api calls. Do you have any ideas how to achieve that?
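One idea, sketched as an assumption rather than a recommendation: run the child flow locally in a test and swap out only the tasks that touch sql or external APIs via Flow.replace (the module and task names below are hypothetical):
from prefect import task
from my_flows import child_flow, query_sql  # hypothetical module and task

@task
def fake_query_sql():
    # Stub that stands in for the real database call.
    return [{"id": 1, "value": 42}]

def test_child_flow_end_to_end():
    # Replace only the external dependency, keep the rest of the flow intact.
    child_flow.replace(query_sql, fake_query_sql)
    state = child_flow.run()
    assert state.is_successful()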
Paulo Maia
09/03/2021, 10:44 AM
execution_id = task_run_inference()
task_decision_process(execution_id, config, upstream_tasks=[task_run_inference])
task_append_to_m2m(execution_id, config, upstream_tasks=[task_decision_process])
task_append_to_m2r(execution_id, config, upstream_tasks=[task_decision_process])
task_run_inference_cleanup(execution_id, config, upstream_tasks=[task_append_to_m2r, task_append_to_m2m])
This is returning a weird dependency plot where task_run_inference is pointing to task_append_to_m2r. Do I need to explicitly set all tasks as upstream? Won't this create a dependency between them?
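A sketch of how this is often untangled, offered as an assumption about the intent (it reuses the tasks and config from the snippet above, and the flow name is made up): passing execution_id already creates a data dependency on task_run_inference for every task that receives it, so only the ordering-only edges need upstream_tasks, and those can reference the bound results rather than the task functions.
from prefect import Flow

with Flow("inference-pipeline") as flow:
    execution_id = task_run_inference()
    # execution_id already draws an edge from task_run_inference to each consumer,
    # so no extra upstream_tasks entry is needed for that relationship.
    decision = task_decision_process(execution_id, config)
    m2m = task_append_to_m2m(execution_id, config, upstream_tasks=[decision])
    m2r = task_append_to_m2r(execution_id, config, upstream_tasks=[decision])
    task_run_inference_cleanup(execution_id, config, upstream_tasks=[m2m, m2r])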
Qin XIA
09/03/2021, 1:12 PMShyam
09/03/2021, 3:45 PMKevin Mullins
09/03/2021, 4:57 PMKevin Mullins
09/03/2021, 7:52 PM
default service account in the prefect namespace I'm attempting to use:
system:serviceaccount:prefect:default
(403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '0cb3f7e4-3156-4e9e-b025-5c9deb274813', 'X-Kubernetes-Pf-Prioritylevel-Uid': '96071337-eda2-47ce-9026-95ed0ab85b02', 'Date': 'Fri, 03 Sep 2021 19:49:12 GMT', 'Content-Length': '311'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:prefect:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
I specified SERVICE_ACCOUNT_NAME in the Kubernetes manifest but it doesn't seem to be honoring it. Am I perhaps missing another configuration?
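For reference, a hedged alternative assuming Prefect 1.x: the service account for the flow's job can also be set per flow on the run config rather than only via the agent-level env var; the account name below is an assumption.
from prefect.run_configs import KubernetesRun

# The named service account must be bound to a role that can create batch jobs
# in the namespace the agent submits to.
flow.run_config = KubernetesRun(service_account_name="prefect-jobs")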
Jessica Smith
09/03/2021, 9:29 PMVincent
09/03/2021, 10:07 PMJosiah Berkebile
09/04/2021, 5:24 PMTim Chklovski
09/04/2021, 7:18 PMJulie Sturgeon
09/05/2021, 12:37 AMOmar Sultan
09/05/2021, 12:19 PMOmar Sultan
09/05/2021, 1:51 PMHari Krishna Sunkari
09/06/2021, 8:14 AMBenjamin Rousselière
09/06/2021, 12:24 PMBenjamin Rousselière
09/06/2021, 12:24 PMMichael
09/06/2021, 12:57 PMMichael Hadorn
09/06/2021, 1:36 PM
requests.exceptions.ConnectionError: HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f43cca967c0>: Failed to establish a new connection: [Errno 111] Connection refused'))
The first time I got this was in 0.14.19, but not in 0.15.4.Tim Pörtner
09/06/2021, 3:34 PM
Connections to :4200/graphql were refused. After some trying we gave up and downgraded back to 0.14.15 again (it worked again but we lost the data - flows, flow runs etc.) - no big deal.
I was curious and thought that if connections were just refused it shouldn't be too difficult to get it running - so I created my own docker-compose.yaml file from docker prefect config and changed some values. I ended up changing the interfaces that the services are listening on (graphql, apollo, ui) to 0.0.0.0 - after that it worked again.
This makes it impossible for us to update our prefect server 😞
I couldn't find anything in the prefecthq/server repo that could cause those connection problems (at least from the most recent changes) - could someone please look into this?
If you need any more information please let me know 👍
Thanks in advance!
Fabio Grätz
09/06/2021, 4:16 PM
@task
def load_image_from_bucket(fn):
    # Load image img from filepath fn in bucket
    return img, fn

@task
def rotate_image(x):
    img, fn = x
    return img.rotate(90), fn

@task
def save_image_to_bucket(x):
    # Save rotated image to new bucket
The flow looks as follows:
with Flow(
    'Load images from bucket',
    run_config=KubernetesRun(),
    executor=DaskExecutor(...)
):
    # Get list of image uris `imgs`
    loaded_images = load_image_from_bucket.map(imgs)
    rotated_images = rotate_image.map(loaded_images)
    save_image_to_bucket.map(rotated_images)
According to the docs, this should create parallel pipelines (without reducing after getting e.g. loaded_images):
However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.
From what I observe when using the `DaskExecutor(…)` and applying the above flow to 500 images, all workers only download images at first and only start rotating once all have been loaded. It either takes multiple minutes until they start rotating and uploading, or the flow just crashes before it reaches that point.
I would want depth-first execution: a worker downloads an image, rotates it, pushes it to the bucket, forgets about this image and moves on to the next one.
This appears to be (roughly) the case when running with the following config using executor=LocalDaskExecutor() instead of an ephemeral dask cluster in k8s:
with Flow("name") as flow:
    ...
state = flow.run(executor=LocalDaskExecutor())
Transforming 500 images in a bucket then takes only ~50s.
Is there a way to influence the order in which the tasks are executed when using the DaskExecutor with an ephemeral dask cluster in k8s?
Thanks a lot!
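One workaround sometimes used for this, sketched as an assumption rather than a confirmed fix: collapse the per-image steps into a single task so each Dask worker finishes one image end-to-end before picking up the next. The load_image/save_image helpers, the PIL usage, and the file names are placeholders.
from prefect import task, Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from PIL import Image  # assumed image library, matching img.rotate(90) above

def load_image(fn):
    # Placeholder for the real bucket download.
    return Image.open(fn)

def save_image(img, fn):
    # Placeholder for the real bucket upload.
    img.save(fn)

@task
def process_image(fn):
    # Download, rotate and upload inside one task so a worker finishes an image
    # end-to-end instead of waiting for a whole mapped stage to complete.
    img = load_image(fn)
    save_image(img.rotate(90), fn.replace(".png", "_rotated.png"))
    return fn

with Flow(
    "Rotate images from bucket",
    run_config=KubernetesRun(),
    executor=DaskExecutor(),
) as flow:
    imgs = ["img_0001.png", "img_0002.png"]  # placeholder for the 500 image URIs
    process_image.map(imgs)
Consolidating the stages also keeps the image bytes on the worker that processes them, which sidesteps the large-object warning in the follow-up message below.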
Fabio Grätz
09/06/2021, 5:25 PM
//anaconda3/envs/sandbox/lib/python3.8/site-packages/distributed/worker.py:3862: UserWarning: Large object of size 1.65 MiB detected in task graph:
{'task': <Task: load_image_from_bucket>, 'state': ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
Is it a bad idea to pass e.g. images around between prefect tasks?Jeffery Newburn
09/06/2021, 5:29 PMwith Flow("ROI Report Generator") as flow:
    start_date = Parameter(name="start_date")
    end_date = Parameter(name="end_date")
    to_list = Parameter(name="to_list")
    email_domain = Parameter(name="email_domain", default="")
    account_uuid = Parameter(name="account_uuid")
Run:
state = flow.run(
    account_uuid=527132950,
    start_date="2020-11-01",
    email_domain="",
    end_date=None,
    to_list=["jeff.newburn@logikcull.com"],
)
Error:
TypeError: run() got an unexpected keyword argument 'account_uuid'
Can someone help me understand what is going on here? Why do Parameters have to be used in a flow for the flow not to explode?
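A small sketch of what appears to be happening, assuming Prefect 1.x: a Parameter created inside the flow context only becomes part of the flow once a task consumes it or it is added explicitly, and flow.run() only accepts values for parameters the flow actually knows about.
from prefect import Flow, Parameter

with Flow("ROI Report Generator") as flow:
    account_uuid = Parameter("account_uuid")
    flow.add_task(account_uuid)  # register the parameter even though no task uses it yet

# Keyword arguments to flow.run() must match registered parameters;
# passing them via the parameters dict is equivalent.
state = flow.run(parameters={"account_uuid": 527132950})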
09/07/2021, 2:09 AMa
, b
, and c
started within parent flow p
using startflowrun
, how can I trigger (or not trigger) flow c
, given the states of flows (startflowrun tasks) a
and b
? Within flows a
and b
, there is a task to skip downstream tasks (raising a SKIP signal) based on a condition. Thanks in advance.Omar Sultan
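A rough sketch of one way this is commonly wired up, assuming Prefect 1.x with StartFlowRun; the project name, wait=True, and the trigger choice are assumptions, and a SKIP inside a child flow may still let the child run finish successfully, so the exact trigger may need tuning.
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from prefect.triggers import all_successful

start_a = StartFlowRun(flow_name="a", project_name="my-project", wait=True)
start_b = StartFlowRun(flow_name="b", project_name="my-project", wait=True)
# c only starts if the task runs for a and b end in states the trigger accepts.
start_c = StartFlowRun(flow_name="c", project_name="my-project", wait=True, trigger=all_successful)

with Flow("p") as parent:
    a = start_a()
    b = start_b()
    c = start_c(upstream_tasks=[a, b])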
Omar Sultan
09/07/2021, 6:17 AM
livy_url = f"https://{host}:{port}/gateway/default/livy/v1"
host and port refer to variables assigned with os.environ.get('LIVY_HOST') and os.environ.get('LIVY_PORT'). The livy_url is getting the variables from my local machine when the flow is being built, and thus the values for host and port are incorrect.
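A minimal sketch of one common fix, assuming the goal is to resolve the env vars in the execution environment rather than at build/registration time: read them inside a task (the task name is made up).
import os
from prefect import task

@task
def build_livy_url():
    # Evaluated when the task runs in the agent's execution environment,
    # not when the flow is built on the local machine.
    host = os.environ.get("LIVY_HOST")
    port = os.environ.get("LIVY_PORT")
    return f"https://{host}:{port}/gateway/default/livy/v1"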
Julian
09/07/2021, 8:34 AMOmar Sultan
09/07/2021, 9:00 AM