itamar
05/31/2021, 9:51 AM

Amit Gal
05/31/2021, 10:43 AM
flow_a = StartFlowRun(flow_name="A", project_name="examples", wait=True)
flow_b = StartFlowRun(flow_name="B", project_name="examples", wait=True)

# Flow of flows
with Flow("parent-flow") as flow:
    flow_b.set_upstream(flow_a)

flow.run()
As far as I can see, running this script waits to schedule flow_b until flow_a is completed. Is there a way to schedule them both, but wait to run flow_b until flow_a is completed?
My use case: I use a VM to run the flows, but my local machine (or phone) to run a small script that schedules them ad hoc. flow_a is often long, while flow_b is short - so as it currently stands, any issue with my local machine might interrupt the scheduling of flow_b.
There are a couple of ways I can imagine to solve this, but I wanted to make sure my understanding is correct, or find out whether there is a way to schedule both at once and create a dependency for the running of the flow?
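One hedged way to decouple this from the local machine (a minimal sketch, assuming the parent flow itself is registered to the backend so the agent on the VM, not the laptop, enforces the dependency):

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

start_a = StartFlowRun(flow_name="A", project_name="examples", wait=True)
start_b = StartFlowRun(flow_name="B", project_name="examples", wait=True)

with Flow("parent-flow") as parent:
    # B is started only after A finishes; both run via the agent on the VM
    start_b(upstream_tasks=[start_a])

parent.register(project_name="examples")  # one-time registration

# From the laptop/phone: fire-and-forget the parent run and disconnect
StartFlowRun(flow_name="parent-flow", project_name="examples", wait=False).run()

With the parent registered, the local script only triggers the run; the A-before-B ordering is enforced server-side even if the local machine drops off.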
Jacob Blanco
05/31/2021, 10:51 AM

fabian wolfmann
05/31/2021, 3:19 PM

Nikola Lusic
05/31/2021, 10:05 PM
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def process_key(key: int):
    import time
    time.sleep(3)
    return key

with Flow("process_keys") as flow:
    processed_keys = process_key.map([1, 2, 3])

if __name__ == '__main__':
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
    flow.run()
This is the output:
[2021-06-01 00:01:36+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
However, if I change the scheduler from threads to processes, the parallel execution seems to be replaced with serial execution:
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def process_key(key: int):
    import time
    time.sleep(3)
    return key

with Flow("process_keys") as flow:
    processed_keys = process_key.map([1, 2, 3])

if __name__ == '__main__':
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=3)
    flow.run()
This is the new output:
[2021-06-01 00:04:09+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
[2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
[2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
[2021-06-01 00:04:18+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:18+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
I cannot get any scheduler="processes" flow to actually run in parallel - is there something I'm missing?
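One thing worth trying (a hedged sketch, not a confirmed fix): an explicit DaskExecutor backed by a local cluster of single-threaded worker processes, which requests process-based parallelism directly.

from prefect.executors import DaskExecutor

# Sketch: 3 single-threaded worker processes; mapped tasks should then
# fan out across processes instead of running serially.
flow.executor = DaskExecutor(
    cluster_kwargs={"n_workers": 3, "threads_per_worker": 1}
)
flow.run()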
Jason Prado
05/31/2021, 10:57 PM
import pendulum
from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

schedule = Schedule(
    clocks=[
        IntervalClock(
            start_date=pendulum.datetime(2021, 1, 1, 9, tz="America/New_York"),
            interval=timedelta(days=1),
        ),
    ],
)
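For context, a schedule like this only takes effect once attached to a flow (a minimal sketch; the flow name and task body are assumptions):

from prefect import Flow, task

@task
def daily_job():
    print("running")

with Flow("daily-flow", schedule=schedule) as flow:
    daily_job()

flow.run()  # with a schedule attached, this waits and fires on each clock event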
Aurélien Vallée
06/01/2021, 4:52 AM
13:58 -> Collect data
13:59 -> Collect data
14:00 -> Collect data + generate analytics
14:01 -> Collect data
Aurélien Vallée
06/01/2021, 4:54 AM
If I do a FlowStartRun of "collect data" before "generate analytics", they would be collected two times, right?
Aurélien Vallée
06/01/2021, 4:55 AM

Aurélien Vallée
06/01/2021, 4:56 AM

Nikola Lusic
06/01/2021, 10:25 AM
Does the ECSRun configuration combined with LocalDaskExecutor(scheduler="processes", num_workers=4) support parallel execution of mapped tasks?
Currently I'm unable to get the ECS task to spawn any additional processes - all are run in sequence (first image).
When running the same flow on the local Prefect environment, the tasks are all done in parallel (second image).
If I use LocalDaskExecutor(scheduler="threads", num_workers=4), the flow tasks are executed in parallel, but a threaded flow only covers part of our use cases.
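One hedged thing to check (a sketch, not a confirmed fix): an ECS task only gets the CPU it reserves, so several worker processes may be contending for a single vCPU. The cpu/memory values below are illustrative assumptions.

from prefect.executors import LocalDaskExecutor
from prefect.run_configs import ECSRun

# Sketch: reserve enough vCPUs for 4 concurrent worker processes
flow.run_config = ECSRun(cpu="2 vcpu", memory="4 GB")
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)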
Howard Cornwell
06/01/2021, 1:45 PM
Failed to load and execute Flow's environment: TypeError("default() got an unexpected keyword argument 'default_scopes'")
I tried out some already-deployed flows, they run fine. But if I re-deploy them they start raising the same error.
Running 0.13.19 in the container and on the server. Any advice would be great!
Zach Schumacher
06/01/2021, 3:18 PM

Carlos Gutierrez
06/01/2021, 3:21 PM
I'm using flow.serialized_hash() for flow change detection in automated flow registration processes. I found out that whenever a flow is registered with a particular task, let's say task_A, and I then update the values of the parameters passed to the task (imagine for instance task_A(var='typo_string') --> task_A(var='correct_string')), the serialized_hash() remains invariant, so the flow will not pick up the latest changes because no new version is bumped to the server. I would like to know if there is a better way to do this, or whether I'm using the wrong approach.
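One hedged workaround (a sketch, assuming the flow is defined in a single file): derive the registration idempotency key from the source file's bytes instead of the serialized flow, so edits to task argument values also bump the version.

import hashlib

# Hash the defining module rather than the serialized flow metadata;
# any edit to the file (including argument values) changes the key.
with open(__file__, "rb") as f:
    source_key = hashlib.sha256(f.read()).hexdigest()

flow.register(project_name="examples", idempotency_key=source_key)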
Raúl Mansilla
06/01/2021, 4:11 PM
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'ecs_test'")
Shea O'Rourke
06/01/2021, 5:05 PM

Alex Furrier
06/01/2021, 8:13 PM
test-flow-of-flows:
	@echo 'Running flow A'
	@python flows/flow_a.py \
		param1=foo \
		param2=bar
	@python flows/flow_b.py \
		param1=foo \
		param2=bar
	@python flows/flow_c.py \
		param1=baz \
		param2=bar
I would like to combine these into a single Flow run with shared parameters passed to the flow runs. That seems to be what's described in this documentation, which mentions registering flows to specific projects using the orchestration API. So far I've been running flows without that. Is there any way to create a flow of flows by importing flows locally, or do they have to be registered with the orchestration API?
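For reference, the registered-flow version of that Makefile looks roughly like this (a sketch: StartFlowRun does, as far as I know, require the child flows to be registered, and the project name "examples" is an assumption):

from prefect import Flow, Parameter
from prefect.tasks.prefect import StartFlowRun

# One StartFlowRun task per child flow, all assuming prior registration
run_a = StartFlowRun(flow_name="flow_a", project_name="examples", wait=True)
run_b = StartFlowRun(flow_name="flow_b", project_name="examples", wait=True)
run_c = StartFlowRun(flow_name="flow_c", project_name="examples", wait=True)

with Flow("test-flow-of-flows") as parent:
    param1 = Parameter("param1", default="foo")
    param2 = Parameter("param2", default="bar")
    a = run_a(parameters={"param1": param1, "param2": param2})
    b = run_b(parameters={"param1": param1, "param2": param2}, upstream_tasks=[a])
    run_c(parameters={"param1": "baz", "param2": param2}, upstream_tasks=[b])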
06/01/2021, 10:11 PMafter
into the schedule as part of flow.run()
. I have hacked flow._run()
to support this.
2. Something past my Python knowledge is causing CronClock.events()
to do something weird when the yield returns. This causes the execution to drop directly out of the while loop and exit the method. Each new call to schedule.next()
winds up creating a new croniter
and running the same event start_date again.
If I build the clock directly and call it’s next on the iterable returned events()
it works as I would expect.Ben Collier
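The generator behavior described above can be checked in isolation (a minimal sketch; the cron string and dates are made up): keep the single generator returned by events() and call next() on it, rather than re-entering through schedule.next().

import pendulum
from prefect.schedules.clocks import CronClock

clock = CronClock("0 9 * * *", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))

# Reusing one generator advances through events; building a fresh croniter
# on every call restarts from the same start date (the symptom above).
events = clock.events(after=pendulum.datetime(2021, 6, 1, tz="UTC"))
first = next(events)
second = next(events)  # one cron interval later, as expected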
Ben Collier
06/02/2021, 4:24 AM

Ben Collier
06/02/2021, 4:36 AM
requests in Python uses its own CA bundle, so set REQUESTS_CA_BUNDLE. All now good.
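For anyone hitting the same thing, the override can be set before requests is used (a sketch; the bundle path is an assumption):

import os

# Point requests at the system CA bundle instead of certifi's bundled one
os.environ["REQUESTS_CA_BUNDLE"] = "/etc/ssl/certs/ca-certificates.crt"

import requests
requests.get("https://example.com")  # now verified against the bundle above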
Amanda Wee
06/02/2021, 9:19 AM
It looks like this PR added a file named = to the root of the prefect repository:
https://github.com/PrefectHQ/prefect/pull/4499/files
Is that intentional?
Snehotosh
06/02/2021, 11:59 AM

ash
06/02/2021, 12:31 PM
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured
Romain
06/02/2021, 1:07 PM

Tomasz Szuba
06/02/2021, 2:20 PM

Tomasz Szuba
06/02/2021, 2:21 PM

Tomasz Szuba
06/02/2021, 2:23 PMdef run(self, parameter):
result="hello"
raise signals.SUCCESS(
message=f'{parameter}',
result=result
)
But this make result to be None in another task that depends on this resultPedro Henrique
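Since the result attached to the raised SUCCESS signal arrives as None downstream (as described above), a plain return is the safer way to hand a value to dependent tasks - a minimal sketch:

from prefect import Flow, task

@task
def produce(parameter):
    # Return the value instead of raising signals.SUCCESS with result=...
    return "hello"

@task
def consume(value):
    print(value)  # "hello", not None

with Flow("signal-result") as flow:
    consume(produce("param"))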
Pedro Henrique
06/02/2021, 2:38 PM

Peter Roelants
06/02/2021, 2:57 PM
flow_a = FlowRunTask(flow_name='flow_a', wait=True)

@task
def task_b(param_b):
    ...

with Flow('flow_c') as flow:
    param_a = Parameter('param_a')
    result_a = flow_a(parameters={'param_a': param_a})
    result_b = task_b(result_a)
However, I noticed people creating workarounds for this. I want to avoid setting a persisted result, and instead share a serializable result directly with the next task.
I was wondering: is there a canonical way to add a result to a flow, get that result from a StartFlowRun, and use that result in a following task?
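One hedged direction (a sketch, assuming Prefect 0.15+, where create_flow_run and get_task_run_result are available): give the child task a PrefectResult, which serializes a small JSON-serializable value into the backend itself rather than external storage, then fetch it in the parent. The task slug "task_a-1" is a made-up placeholder.

from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run, get_task_run_result

@task
def task_b(param_b):
    print(param_b)

with Flow("flow_c") as flow:
    # Start the child flow, then wait for and fetch the named task's result
    child_run_id = create_flow_run(flow_name="flow_a", project_name="examples")
    result_a = get_task_run_result(child_run_id, task_slug="task_a-1")
    task_b(result_a)

In flow_a, the producing task would be declared with @task(result=PrefectResult()) (from prefect.engine.results) so its return value is stored in the backend and retrievable by slug.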
itay livni
06/02/2021, 3:19 PM

itay livni
06/02/2021, 3:19 PM

Kevin Kho
06/02/2021, 3:58 PM

itay livni
06/02/2021, 4:23 PM