Eric Feldman
10/26/2021, 3:04 PMdef fetch_result(data):
# data ?
raise
class GetData(Task):
def run(self):
return {'data': 7}
recipe_flow = StartFlowRun(flow_name="batch", project_name="proj", wait=True)
with Flow(name='schedule') as sched:
recipe_flow.set_upstream(GetData(), key='parameters')
FunctionTask(fetch_result)(data=recipe_flow)
but the only thing I get as data
is prefect.engine.signals.SUCCESS
object
calling data.state
isn’t really helpfull, and data.state.result
I get the prefect.engine.signals.SUCCESS
all over again
is there any way to get the real tasks/flowrunid/data of the the inner flow?Kevin Kho
get_task_run_result
doesEric Feldman
10/26/2021, 3:07 PMfetch_result
? it flow run id exists only as part of string
with what values I can call get_task_run_result
?Eric Feldman
10/26/2021, 3:13 PMdata.state.message
Zanie
create_flow_run
/ wait_for_flow_run
tasks to replace StartFlowRun
-- create_flow_run
just returns the flow run id of the child flow run.Eric Feldman
10/26/2021, 3:21 PMKevin Kho
Eric Feldman
10/26/2021, 3:25 PMfetch_result
i’ll do
def fetch_result(data):
flow_run_id = b.state.message[:36] # this is the only place the id exists in
return get_task_run_result.run(flow_run_id, '<get reference task somehow>')
it will workZanie
create_flow_run
to get the id without parsing it like thatZanie
Eric Feldman
10/26/2021, 4:14 PMKevin Kho