hey, is it possible to data between flows using dependent flows?
Copy code
def fetch_result(data):
# data ?
raise
class GetData(Task):
def run(self):
return {'data': 7}
recipe_flow = StartFlowRun(flow_name="batch", project_name="proj", wait=True)
with Flow(name='schedule') as sched:
recipe_flow.set_upstream(GetData(), key='parameters')
FunctionTask(fetch_result)(data=recipe_flow)
but the only thing I get as
data
is
prefect.engine.signals.SUCCESS
object
calling
data.state
isn’t really helpfull, and
data.state.result
I get the
prefect.engine.signals.SUCCESS
all over again
is there any way to get the real tasks/flowrunid/data of the the inner flow?
k
Kevin Kho
10/26/2021, 3:05 PM
It is not possible without using the GraphQL API which is what the
get_task_run_result
does
e
Eric Feldman
10/26/2021, 3:07 PM
how do i get the flow run id in
fetch_result
? it flow run id exists only as part of string
with what values I can call
get_task_run_result
?
Eric Feldman
10/26/2021, 3:13 PM
the only way I think of is getting the flow run id using regex from
data.state.message
z
Zanie
10/26/2021, 3:20 PM
This is the reason we introduced the
create_flow_run
/
wait_for_flow_run
tasks to replace
StartFlowRun
--
create_flow_run
just returns the flow run id of the child flow run.
e
Eric Feldman
10/26/2021, 3:21 PM
so in order to have flow A that schedules flow B, i need to have a task in flow A that creates a flow run and wait for its response?
k
Kevin Kho
10/26/2021, 3:22 PM
Yes. create_flow_b -> wait_for_flow_b as tasks in Flow A
e
Eric Feldman
10/26/2021, 3:25 PM
got it
but if in
fetch_result
i’ll do
Copy code
def fetch_result(data):
flow_run_id = b.state.message[:36] # this is the only place the id exists in
return get_task_run_result.run(flow_run_id, '<get reference task somehow>')
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.