Sanjay Patel
12/11/2020, 10:50 PMwith Flow('first') as flow:
a = task_a()
b = task_b(a)
c = task_c(b)
StartFlowRun(flow_name='second', project_name=...)(parameters={"input"=c})
this is the closest to what I am trying to do but the response ended with using state_result.
with Flow('first') as flow:
a = task_a()
b = task_b(a)
c = task_c(b)
with Flow('second') as flow:
param = Parameter('input')
d = task_d(param)
# How to do something like this:
flow_a = StartFlowRun(flow_name="first", project_name="examples", wait=True)
flow_b = StartFlowRun(flow_name="second", project_name="examples", wait=True)
with Flow('total') as flow:
a = flow_a()
b = flow_b()(upstream_tasks=[a], parameters={'input': a.d})
I have hte option of changing one of my flows so it produces the output I need for the next flow. so 2 questions
• how does the output of flow_a get referenced into flow b
• how do you access the parameter input that comes from flowa into flowb
thank you!nicholas
StartFlowRun
as equivalent to flow.run
or pressing the Run
button in the UI. It boils down to: anything you can do with the latter, you can do with StartFlowRun
, which is a fancy wrapper where the above are instead represented in a task. That task returns something just like any other task, it's just that that task happens to be a State
object. So you could build a flow that orchestrates flows to look something like this:
@task
def task_a():
return "hello"
task_b = StartFlowRun(flow_name="flow_b", project_name="examples", wait=True)
task_c = StartFlowRun(flow_name="flow_c", project_name="examples", wait=True)
with Flow("flow_a") as flow:
result_a = task_a()
result_b = task_b(upstream_tasks=[result_a], parameters={'result_a': result_a})
result_c = task_c(upstream_tasks=[result_b], parameters={'result_b': result_b.result}) # We access the .result of result_b because StartFlowRun returns a State object, which includes the state message, result, context, and any cached inputs
Given that StartFlowRun
returns a State
object, any results you want to access from that can be accessed as you would any other data on an object. For more information on the state object, check out the State docs.
Hopefully that clears it up a bit!Sanjay Patel
12/12/2020, 12:47 AMnicholas
Sanjay Patel
12/12/2020, 3:19 AMwith Flow("flow_a") as flow:
result_a = task_a.map(some_iterable)
result_b = task_b.map(upstream_tasks=[result_a], parameters={'result_a': result_a}) # map
nicholas
unmapped
utility function.Justin Chavez
03/10/2021, 10:04 PMAttributeError: 'StartFlowRun' object has no attribute 'result'