# ask-community
d
The question that brought me here: how do I create dynamic dependencies inside tasks? It seems like the Prefect way is to run subflows, but flows don't have return values that other flows can take as parameters. I'm trying to do something like a = flow_a.run() followed by b = flow_b.run(a), or alternatively call a task from other tasks.
Seems like in Orion this is straightforward btw, but we want to start with the production ready version
k
So in Orion it was made straightforward because it was a pain point in current Prefect.
flow.run() is not meant for production, so you need to register the subflows and then trigger them with the StartFlowRun or create_flow_run task like this
Calling a task from another task can be done like this:
```python
from prefect import task

@task
def abc():
    return 1

@task
def bcd():
    # .run() calls the function wrapped by abc directly
    abc.run()
```
but the abc in bcd is no longer a task. It's like a regular Python function, so you don't get observability
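To make the observability point concrete, here is a toy model in plain Python. It is not Prefect code: `ToyTask` and `ToyRunner` are hypothetical stand-ins, assuming only that an orchestrator records the tasks it executes itself and sees nothing of calls made inside a task body.

```python
class ToyTask:
    """Hypothetical stand-in for a task object; not Prefect's Task class."""

    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__

    def run(self, *args, **kwargs):
        # Direct call: only the wrapped function runs, nothing is recorded.
        return self.fn(*args, **kwargs)


class ToyRunner:
    """Hypothetical orchestrator that records every task it executes."""

    def __init__(self):
        self.observed = []

    def execute(self, task, *args):
        self.observed.append(task.name)
        return task.run(*args)


@ToyTask
def abc():
    return 1


@ToyTask
def bcd():
    # abc runs here as an ordinary function call inside bcd.
    return abc.run() + 1


runner = ToyRunner()
result = runner.execute(bcd)
print(result, runner.observed)
```

In this sketch the runner only ever learns about bcd; the nested abc call still produces its value but leaves no record of its own.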
d
Thanks @Kevin Kho! Then I have the right understanding here. Is there an estimate on when Orion will be production ready?
It may be an overly simple question, but I'm wondering if I'm trying to force Prefect onto my use case: was Prefect designed to run deeply dependent dynamic pipelines? It seems like running a task in a task or a flow in a flow are pretty obvious requirements of ETLs, but maybe my view is narrow here
k
No public timeline for that, but a lot of current users do run this subflow pattern. Task in a task is a bit more vague. Could you give me an example of what you are thinking?
d
So in Airflow I can declare that task A is dependent on task B. This tells the engine to run the B node before the A node. I do that on a task; I'd call that "deep dependency declaration". It seems like in Prefect I can only do that within a Flow, so does that mean that I need to serialize the logic that creates the dependencies?
operator* not task sorry
k
Ah ok so in Prefect upstream and downstream dependencies are on the Flow level because tasks can be reused with different dependencies each time. Example:
```python
from prefect import Flow, task

@task
def plus_one(x):
    return x + 1

with Flow("name") as flow:
    a = plus_one(1)
    b = plus_one(a)  # b depends on a because a's result is its input
```
This will implicitly build the dependencies for you and pass the value of a in memory
You can also set it with:
```python
with Flow(...) as flow:
    a = first_task()
    b = second_task()
    c = third_task(c_inputs, upstream_tasks=[a, b])
```
or
```python
with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])
```
if there is no data dependency
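As a rough illustration of why the dependencies live on the flow rather than on the task, here is a small plain-Python model. It is not Prefect's engine: the `run_flow` helper and the `nodes` layout are assumptions made up for this sketch, and ordering comes from the standard library's `graphlib`. The same `plus_one` function is reused twice with different upstream edges, and a third node has only a state dependency, similar in spirit to `upstream_tasks`.

```python
from graphlib import TopologicalSorter


def plus_one(x):
    return x + 1


def finish():
    return "done"


def run_flow(nodes):
    """Run a toy flow.

    nodes maps a name to (function, args, upstream). An arg that is the name
    of another node is a data dependency; upstream lists nodes that must
    finish first without passing any data.
    """
    graph = {}
    for name, (_, args, upstream) in nodes.items():
        data_deps = {a for a in args if isinstance(a, str) and a in nodes}
        graph[name] = data_deps | set(upstream)

    order = list(TopologicalSorter(graph).static_order())
    results = {}
    for name in order:
        fn, args, _ = nodes[name]
        # Swap node names for the results computed earlier in the order.
        resolved = [results[a] if isinstance(a, str) and a in nodes else a for a in args]
        results[name] = fn(*resolved)
    return order, results


# Declared out of order on purpose; the edges decide the run order.
nodes = {
    "c": (finish, [], ["b"]),     # no data dependency, only ordering
    "b": (plus_one, ["a"], []),   # data dependency on a
    "a": (plus_one, [1], []),
}

order, results = run_flow(nodes)
print(order, results)
```

The function itself carries no knowledge of what comes before or after it; only the flow-level declaration does, which is what allows one task to appear in several places with different dependencies.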
d
Thanks again @Kevin Kho. Sounds good except the slightly awkward way to fetch subflow results. I think we can use this for now and migrate later. Looking forward to the public release date 🙂
k
Yes for sure. In case you haven’t seen it, this blog post shows how to retrieve results.