Arun Dass
09/03/2021, 6:01 AM
g.suijker
09/03/2021, 7:54 AM
Bastian Röhrig
09/03/2021, 9:49 AM
create_flow_run and get_task_run_result. We unit tested the parent flow using mocks to simulate the possible results of the child flow. However, it would be great to have a basic integration test where we ideally only mock the outside dependencies like SQL and API calls. Do you have any ideas how to achieve that?
Paulo Maia
09/03/2021, 10:44 AM
execution_id = task_run_inference()
task_decision_process(execution_id, config, upstream_tasks=[task_run_inference])
task_append_to_m2m(execution_id, config, upstream_tasks=[task_decision_process])
task_append_to_m2r(execution_id, config, upstream_tasks=[task_decision_process])
task_run_inference_cleanup(execution_id, config, upstream_tasks=[task_append_to_m2r, task_append_to_m2m])
This is returning a weird dependency plot where task_run_inference is pointing to task_append_to_m2r. Do I need to explicitly set all tasks as upstream? Won't this create a dependency between them?
Qin XIA
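As a rough sketch of the dependency structure this snippet seems to intend (plain Python with the standard-library graphlib, not Prefect's API; the `upstream` mapping and the `depends_on` helper are my own reading of the five calls), listing both task_append_to_m2m and task_append_to_m2r as upstream of the cleanup task does not make those two depend on each other:

```python
from graphlib import TopologicalSorter

# Each key maps a task name to the set of tasks it waits on (its direct upstreams).
# Names mirror the snippet above; the mapping is an illustration, not Prefect's API.
upstream = {
    "task_run_inference": set(),
    "task_decision_process": {"task_run_inference"},
    "task_append_to_m2m": {"task_decision_process"},
    "task_append_to_m2r": {"task_decision_process"},
    "task_run_inference_cleanup": {"task_append_to_m2m", "task_append_to_m2r"},
}


def depends_on(task, other, graph):
    """Return True if `task` transitively depends on `other`."""
    stack = list(graph[task])
    seen = set()
    while stack:
        node = stack.pop()
        if node == other:
            return True
        if node not in seen:
            seen.add(node)
            stack.extend(graph[node])
    return False


# One valid execution order: every task appears after all of its upstreams.
order = list(TopologicalSorter(upstream).static_order())
```

In Prefect 0.x/1.x, as far as I know, upstream_tasks is normally given the results of the task calls made inside the flow context (for example a variable holding the return value of task_decision_process(...)), so it may be worth checking whether passing the bare task objects is what produces the unexpected edge.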
09/03/2021, 1:12 PM
Shyam
09/03/2021, 3:45 PM
Kevin Mullins
09/03/2021, 4:57 PM
Kevin Mullins
09/03/2021, 7:52 PM
default service account in the prefect namespace I’m attempting to use:
system:serviceaccount:prefect:default
(403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '0cb3f7e4-3156-4e9e-b025-5c9deb274813', 'X-Kubernetes-Pf-Prioritylevel-Uid': '96071337-eda2-47ce-9026-95ed0ab85b02', 'Date': 'Fri, 03 Sep 2021 19:49:12 GMT', 'Content-Length': '311'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:prefect:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
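A small sketch, assuming only the response body shown above, of how the Status object can be decoded to see exactly which identity and permission the API server rejected. The `body`, `summary`, and `denied_user` names are illustrative:

```python
import json
import re

# Response body copied from the error above (raw strings keep the \" escapes intact).
body = (
    r'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure",'
    r'"message":"jobs.batch is forbidden: User \"system:serviceaccount:prefect:default\" '
    r'cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"",'
    r'"reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}'
)

status = json.loads(body)

# The structured fields name the resource and API group that were denied.
summary = {
    "code": status["code"],
    "reason": status["reason"],
    "api_group": status["details"]["group"],
    "resource": status["details"]["kind"],
}

# The identity that made the request only appears inside the message text.
denied_user = re.search(r'User "([^"]+)"', status["message"]).group(1)
```

Read this way, the request was made as the default service account of the prefect namespace, and that account lacks permission to create jobs in the batch API group.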
I specified SERVICE_ACCOUNT_NAME in the Kubernetes manifest, but it doesn’t seem to be honoring it. Am I perhaps missing another configuration?
Jessica Smith
09/03/2021, 9:29 PM
Vincent
09/03/2021, 10:07 PM
Josiah Berkebile
09/04/2021, 5:24 PM
Tim Chklovski
09/04/2021, 7:18 PM
Julie Sturgeon
09/05/2021, 12:37 AM
Omar Sultan
09/05/2021, 12:19 PM
Omar Sultan
09/05/2021, 1:51 PM
Hari Krishna Sunkari
09/06/2021, 8:14 AM
Benjamin Rousselière
09/06/2021, 12:24 PM
Benjamin Rousselière
09/06/2021, 12:24 PM
Michael
09/06/2021, 12:57 PM
Michael Hadorn
09/06/2021, 1:36 PM
requests.exceptions.ConnectionError: HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f43cca967c0>: Failed to establish a new connection: [Errno 111] Connection refused'))
First time I got this was in 0.14.19, but not in 0.15.4.
Tim Pörtner
09/06/2021, 3:34 PM
:4200/graphql were refused. After some trying we gave up and downgraded back to 0.14.15 (it worked again, but we lost the data: flows, flow runs, etc.; no big deal).
I was curious and thought that if connections were just refused, it shouldn't be too difficult to get it running, so I created my own docker-compose.yaml file from docker prefect config and changed some values. I ended up changing the interfaces that the services listen on (graphql, apollo, ui) to 0.0.0.0; after that it worked again.
This makes it impossible for us to update our Prefect server 😞
I couldn't find anything in the prefecthq/server repo that could cause those connection problems (at least not in the most recent changes). Could someone please look into this?
If you need any more information, please let me know 👍
Thanks in advance!
Fabio Grätz
09/06/2021, 4:16 PM
@task
def load_image_from_bucket(fn):
    # Load image img from filepath fn in bucket
    return img, fn

@task
def rotate_image(x):
    img, fn = x
    return img.rotate(90), fn

@task
def save_image_to_bucket(x):
    # Save rotated image to new bucket
    ...
The flow looks as follows:
with Flow(
    'Load images from bucket',
    run_config=KubernetesRun(),
    executor=DaskExecutor(...)
):
    # Get list of image uris `imgs`
    loaded_images = load_image_from_bucket.map(imgs)
    rotated_images = rotate_image.map(loaded_images)
    save_image_to_bucket.map(rotated_images)
According to the docs, this should create parallel pipelines (without reducing after getting e.g. loaded_images):
However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.
From what I observe when using the `DaskExecutor(…)` and applying the above flow to 500 images, all workers only download images at first and only start rotating once all images have been loaded. Either it takes multiple minutes until they start rotating and uploading, or, often, the flow just crashes before it reaches that point.
I would want depth-first execution: a worker downloads an image, rotates it, pushes it to the bucket, forgets about this image and moves on to the next one.
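One way to picture the depth-first behaviour described here is to fuse the three steps into a single unit of work per image. The sketch below uses only the standard library; `load_image`, `rotate_image`, `save_image`, `process_one`, and `run` are hypothetical stand-ins, and it says nothing about how the DaskExecutor actually schedules mapped tasks:

```python
from concurrent.futures import ThreadPoolExecutor

saved = {}  # stand-in for the destination bucket


def load_image(fn):
    # Stand-in for downloading from a bucket; returns fake pixel data.
    return [1, 2, 3], fn


def rotate_image(item):
    img, fn = item
    # Stand-in for img.rotate(90): reverse the fake pixel list.
    return img[::-1], fn


def save_image(item):
    img, fn = item
    # Stand-in for uploading to the target bucket.
    saved[fn] = img
    return fn


def process_one(fn):
    # Fusing the steps keeps one image on one worker from load to save,
    # so the intermediate image is never handed from one task to another.
    return save_image(rotate_image(load_image(fn)))


def run(filenames, workers=4):
    # Each worker picks a filename and carries it through all three steps.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(process_one, filenames))


done = run([f"img_{i}.png" for i in range(10)])
```

In Prefect terms this would correspond to mapping a single task that loads, rotates, and saves one image, which trades per-step visibility for lower memory pressure and less data movement between workers.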
Depth-first execution appears to be (roughly) what happens when running with the following config, using executor=LocalDaskExecutor() instead of an ephemeral dask cluster in k8s:
with Flow("name") as flow:
    ...
state = flow.run(executor=LocalDaskExecutor())
Transforming 500 images in a bucket then takes only ~50s.
Is there a way to influence the order in which the tasks are executed when using the DaskExecutor with an ephemeral dask cluster in k8s?
Thanks a lot!
Fabio Grätz
09/06/2021, 5:25 PM
//anaconda3/envs/sandbox/lib/python3.8/site-packages/distributed/worker.py:3862: UserWarning: Large object of size 1.65 MiB detected in task graph:
{'task': <Task: load_image_from_bucket>, 'state': ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
Is it a bad idea to pass e.g. images around between Prefect tasks?
Jeffery Newburn
09/06/2021, 5:29 PM
with Flow("ROI Report Generator") as flow:
    start_date = Parameter(name="start_date")
    end_date = Parameter(name="end_date")
    to_list = Parameter(name="to_list")
    email_domain = Parameter(name="email_domain", default="")
    account_uuid = Parameter(name="account_uuid")
Run:
state = flow.run(
    account_uuid=527132950,
    start_date="2020-11-01",
    email_domain="",
    end_date=None,
    to_list=["jeff.newburn@logikcull.com"],
)
Error:
TypeError: run() got an unexpected keyword argument 'account_uuid'
Can someone help me understand what is going on here? Why do Parameters have to be used in a flow for the flow not to explode?
Blake List
09/07/2021, 2:09 AM
a, b, and c started within parent flow p using startflowrun, how can I trigger (or not trigger) flow c, given the states of flows (startflowrun tasks) a and b? Within flows a and b, there is a task to skip downstream tasks (raising a SKIP signal) based on a condition. Thanks in advance.
Omar Sultan
09/07/2021, 6:17 AM
livy_url = f"https://{host}:{port}/gateway/default/livy/v1"
host and port refer to variables set with os.environ.get('LIVY_HOST') and os.environ.get('LIVY_PORT'). The livy_url is getting the variables from my local machine when the flow is being built, and thus the values for host and port are incorrect.
Trevor Campbell
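The difference between reading an environment variable when a module is imported and reading it when a function runs can be sketched as follows. This is a plain-Python illustration; `BUILD_TIME_HOST` and the `livy_url()` helper are hypothetical names, and the "unset" default is only there to keep the sketch runnable:

```python
import os

# Evaluated once, when this module is imported
# (for a flow file, that is when the flow is built or registered).
BUILD_TIME_HOST = os.environ.get("LIVY_HOST", "unset")


def livy_url():
    # Evaluated on every call, in whatever environment executes the function.
    host = os.environ.get("LIVY_HOST", "unset")
    port = os.environ.get("LIVY_PORT", "unset")
    return f"https://{host}:{port}/gateway/default/livy/v1"
```

Moving the os.environ.get calls inside the code that executes at run time (for a flow, inside a task body) should make them pick up the values of the machine that runs the flow rather than the one that built it.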
09/07/2021, 6:34 AM
Сергей Романкевич
09/07/2021, 8:48 AM
Chhaya Vankhede
09/07/2021, 8:54 AM
deploying-->completed deployment-->returned with exit code -11!, attaching the screenshot. What is the exit code -11! here, and how can I solve this issue?
Marko Herkaliuk
09/07/2021, 9:18 AM
image=f"project_name/prefect:etl_tools-{sha}" where sha is a git commit SHA. I also have a registration bash script that calls prefect register --project etl -p "/flows/etl/**/**.py". Obviously, every commit to the repo changes the SHA, which changes the image, so the registration process bumps the version of all flows. Is there a way to NOT bump all versions and exclude the image name from the idempotency key?
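A minimal sketch of the idea being asked about: computing a key that ignores fields which change on every commit. The `stable_key` helper and the dictionary layout are hypothetical, and this is not how prefect register derives its key:

```python
import hashlib
import json


def stable_key(flow_definition, ignore=("image",)):
    """Hash a flow description while skipping fields that change on every commit."""
    filtered = {k: v for k, v in flow_definition.items() if k not in ignore}
    # sort_keys makes the serialization, and therefore the hash, deterministic.
    payload = json.dumps(filtered, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


# Two hypothetical flow descriptions that differ only in the image tag.
before = {"name": "etl", "tasks": ["extract", "load"],
          "image": "project_name/prefect:etl_tools-abc123"}
after = {"name": "etl", "tasks": ["extract", "load"],
         "image": "project_name/prefect:etl_tools-def456"}
```

If I recall correctly, flow.register in Prefect 0.x/1.x accepts an idempotency_key argument, so a custom key of this kind could be passed from a Python registration script; that is worth confirming against the documentation for the installed version.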