Nejc Vesel
09/25/2020, 7:18 AM
from prefect import Flow, Parameter
from prefect.tasks.prefect import FlowRunTask

with Flow('flow-a') as flowA:
    paramsFlowA = Parameter('flow-a-param', default=...)
    # < Does something here >

with Flow('flow-b') as flowB:
    paramsFlowB = Parameter('flow-b-param', default=...)
    # < Does something here >

with Flow('combined-flow') as combined_flow:
    flow_a = FlowRunTask(flow_name='flow-a', project_name='test')
    flow_b = FlowRunTask(flow_name='flow-b', project_name='test')
    result = flow_b(upstream_tasks=[flow_a])
When I deploy the combined_flow to the server, I can't set the parameters for flow-a and flow-b. Is it possible to do so, and how?
Thanks!

emre
09/25/2020, 8:47 AM
FlowRunTask accepts a parameters argument, both at init time and at run time. You could provide parameters from the combined flow that way.
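For example (a minimal sketch assuming the Prefect 0.x FlowRunTask API; the parameter values here are placeholders):

from prefect import Flow, Parameter
from prefect.tasks.prefect import FlowRunTask

# Parameters fixed at init time:
flow_a = FlowRunTask(
    flow_name='flow-a',
    project_name='test',
    parameters={'flow-a-param': 42},  # placeholder value
)

# Parameters supplied at run time, wired from the combined flow's own Parameter:
flow_b = FlowRunTask(flow_name='flow-b', project_name='test')

with Flow('combined-flow') as combined_flow:
    b_param = Parameter('flow-b-param', default=1)
    a = flow_a()
    b = flow_b(parameters={'flow-b-param': b_param}, upstream_tasks=[a])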
Also keep in mind that all FlowRunTask does is schedule a flow run on the prefect server/cloud. It will not run flow_a or flow_b within the combined flow. If you want to do that, you can just define and run your subflows in a task.

Nejc Vesel
09/25/2020, 11:30 AM
from prefect import Flow, Parameter, task

@task
def run_flow1(params):
    flow1.run(params={'params': params})

with Flow('flow-running-flow') as flow:
    p = Parameter('flow1_param')
    result = run_flow1(p)
While this works when I run it locally (i.e. with flow.run()), when I deploy it to the server and run it there, it doesn't seem to work: the tasks inside flow1 don't seem to run, even though no error is returned.
Furthermore, unless I specify executor=LocalExecutor() in the flow1.run() call, I get an error saying: AssertionError: daemonic processes are not allowed to have children
My question is thus twofold:
1. What could be the reason that this works locally but not when deployed?
2. Is this compatible with executors other than the LocalExecutor?
Thanks again for the help.

emre
09/25/2020, 12:27 PM
I'd still suggest sticking with FlowRunTask and letting prefect run and monitor flow1 separately.
1. My first instinct is to ask: where is flow1 defined? When running on core (locally), you can define things outside of the flow definition and use them in tasks, much like normal python. But when running with server, your main flow and its tasks will be serialized and executed in a different python context. Variables that aren't initialized in tasks are left as references. I assume flow1 is unavailable, or empty, when the flow is deployed and executed by the server.
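If you do want to keep the subflow-in-a-task approach, one workaround (a rough sketch, not something I've tested against server) would be to build flow1 inside the task, so that it exists in the execution-time python context rather than being a dangling reference:

from prefect import Flow, Parameter, task

def build_flow1():
    # Constructing the subflow here means it is created at execution
    # time, not captured as an outside reference at registration time.
    with Flow('flow1') as flow1:
        params = Parameter('params')
        # ... subflow tasks go here ...
    return flow1

@task
def run_flow1(params):
    flow1 = build_flow1()
    flow1.run(params={'params': params})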
2. Dask probably doesn't permit the processes it manages to create child processes. The LocalExecutor() works since it runs the flow in the calling process, but DaskExecutor() creates a dask cluster, i.e. some processes of its own.
As you can see, running flows inside tasks can get hairy pretty quickly; I didn't know that either 😅
One thing you could try is DaskExecutor(address=...) (when calling flow1.run()). This config uses an existing cluster rather than creating a new one.
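Something like this (a sketch; the scheduler address is a placeholder for wherever your existing Dask cluster is running):

from prefect.engine.executors import DaskExecutor

# Connect to an already-running Dask scheduler instead of spawning
# a new cluster inside the (daemonic) worker process.
executor = DaskExecutor(address='tcp://my-dask-scheduler:8786')  # placeholder address
flow1.run(params={'params': params}, executor=executor)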
Just some ideas, I don’t know how well they will work.