
Nejc Vesel

09/25/2020, 7:18 AM
Hi, I have a question about combining parametrized flow runs into one flow. Assume this pseudocode:
with Flow('flow-a') as flowA:
    paramsFlowA = Parameter('flow-a-param', default=...)
    # does something here

with Flow('flow-b') as flowB:
    paramsFlowB = Parameter('flow-b-param', default=...)
    # does something here

with Flow('combined-flow') as combined_flow:
    flow_a = FlowRunTask(flow_name='flow-a', project_name='test')
    flow_b = FlowRunTask(flow_name='flow-b', project_name='test')

    result = flow_b(upstream_tasks=[flow_a])
When I deploy the combined_flow to the server, I can't set the parameters for FlowA and FlowB. Is it possible to do so and how? Thanks!

emre

09/25/2020, 8:47 AM
https://docs.prefect.io/api/latest/tasks/prefect.html#flowruntask The FlowRunTask accepts a parameters argument, both at init time and at run time. You could provide parameters from the combined flow that way. Also keep in mind that all FlowRunTask does is schedule a flow run on the Prefect server/cloud; it will not run flow_a or flow_b within the combined flow. If you want to do that, you can just define and run your subflows in a task.

Nejc Vesel

09/25/2020, 11:30 AM
Thanks for the explanation, but I have a follow-up question. I constructed the following flow, sketched in the pseudocode below:
@task
def run_flow1(params):
    flow1.run(parameters={'params': params})

with Flow('flow-running-flow') as flow:
    p = Parameter('flow1_param')
    result = run_flow1(p)
While this works when I run it locally (i.e. flow.run()), when I deploy it to the server and run it there, it doesn't seem to work: the tasks inside flow1 don't seem to run, even though no error is returned. Furthermore, unless I specify executor=LocalExecutor() in the flow1.run() call, I get an error saying AssertionError: daemonic processes are not allowed to have children. My question is thus twofold: 1. What could be the reason that this works locally but not when deployed? 2. Is this compatible with executors other than the LocalExecutor? Thanks again for the help.

emre

09/25/2020, 12:27 PM
So first off, I kind of assumed you were not using server. Since you are, I encourage you to just roll with FlowRunTask and let Prefect run and monitor flow1 separately. 1. My first instinct is to ask: where is flow1 defined? When running on core (locally), you can define things outside of the flow definition and use them in tasks, much like normal Python. But when running with server, your main flow and its tasks are serialized and executed in a different Python context; variables that aren't initialized inside tasks are left as references. I assume flow1 is unavailable, or empty, when the flow is deployed and executed by the server. 2. Dask probably doesn't permit the processes it manages to create child processes. The LocalExecutor() works since it runs the flow in the calling process, but DaskExecutor() creates a Dask cluster, i.e. some processes of its own. As you can see, running flows inside tasks can get hairy pretty quickly; I didn't know that either 😅
If you still want to execute within the flow, here are some ideas: 1. Build the flow explicitly in the task, or in an upstream task. That way you can make sure the flow is populated with tasks and task dependencies during the flow run. 2. You can't create a new Dask cluster, but your main flow is already running on one. You can probably get the dask_address of the existing cluster (for the parent flow) and provide it as DaskExecutor(address=...) when calling flow1.run(); this config uses the existing cluster rather than creating a new one. Just some ideas, I don't know how well they will work.