https://prefect.io logo
s

Sanjay Patel

12/11/2020, 10:50 PM
Hi, I'm trying to understand how to use StartFlowRun as per Running dependent flows | Prefect Docs but can't get my head around the concept of passing parameters from one flow into the next. the 'Running a parametrized flow' example here doesn't use a flow as an input. I've done a few searches but can't find exactly what I am looking for. this is one example I was looking at but it doesn't give enough detail
Copy code
with Flow('first') as flow:
    a = task_a()
    b = task_b(a)
    c = task_c(b)
    StartFlowRun(flow_name='second', project_name=...)(parameters={"input"=c})
this is the closest to what I am trying to do but the response ended with using state_result.
Copy code
with Flow('first') as flow:
    a = task_a()
    b = task_b(a)
    c = task_c(b)
    
with Flow('second') as flow:
    param = Parameter('input')
    d = task_d(param)
    
# How to do something like this:
flow_a = StartFlowRun(flow_name="first", project_name="examples", wait=True)
flow_b = StartFlowRun(flow_name="second", project_name="examples", wait=True)
Copy code
with Flow('total') as flow:
    a = flow_a()
    b = flow_b()(upstream_tasks=[a], parameters={'input': a.d})
I have hte option of changing one of my flows so it produces the output I need for the next flow. so 2 questions • how does the output of flow_a get referenced into flow b • how do you access the parameter input that comes from flowa into flowb thank you!
n

nicholas

12/12/2020, 12:33 AM
Hi @Sanjay Patel - for what you're trying to achieve, think about
StartFlowRun
as equivalent to
flow.run
or pressing the
Run
button in the UI. It boils down to: anything you can do with the latter, you can do with
StartFlowRun
, which is a fancy wrapper where the above are instead represented in a task. That task returns something just like any other task, it's just that that task happens to be a
State
object. So you could build a flow that orchestrates flows to look something like this:
Copy code
@task
def task_a():
  return "hello"

task_b = StartFlowRun(flow_name="flow_b", project_name="examples", wait=True)
task_c = StartFlowRun(flow_name="flow_c", project_name="examples", wait=True)

with Flow("flow_a") as flow:
  result_a = task_a()
  result_b = task_b(upstream_tasks=[result_a], parameters={'result_a': result_a})
  result_c = task_c(upstream_tasks=[result_b], parameters={'result_b': result_b.result}) # We access the .result of result_b because StartFlowRun returns a State object, which includes the state message, result, context, and any cached inputs
Given that
StartFlowRun
returns a
State
object, any results you want to access from that can be accessed as you would any other data on an object. For more information on the state object, check out the State docs. Hopefully that clears it up a bit!
s

Sanjay Patel

12/12/2020, 12:47 AM
oh thank you so much. this helps a lot. Let me give this a try and get back to you
n

nicholas

12/12/2020, 12:51 AM
Happy to help! 😄
s

Sanjay Patel

12/12/2020, 3:19 AM
Just one follow up please. How would I then include the mapping function as part of this? will task_b.map work the same way where result_a will just be an array instead?
Copy code
with Flow("flow_a") as flow:
  result_a = task_a.map(some_iterable)
  result_b = task_b.map(upstream_tasks=[result_a], parameters={'result_a': result_a}) # map
n

nicholas

12/14/2020, 3:06 PM
@Sanjay Patel - yes, that's correct! And to unclude any parameters that you don't want to map over, you can use the
unmapped
utility function.
j

Justin Chavez

03/10/2021, 10:04 PM
Hi @nicholas, I am trying to perform a similar parameterized flow of flows like Sanjay, except when I run my flow using the result of a StartFlowRun as a parameter I get the error
AttributeError: 'StartFlowRun' object has no attribute 'result'
When I look at the return value of StartFlowRun, it is a Task, but in the code example above it looks like StartFlowRun is supposed to return a State object. Did something change?