Thread
#prefect-community

    Daniel Suissa

    9 months ago
    The question that brought me here: how to create dynamic dependencies inside tasks? Seems like the Prefect way is to run subflows, but flows don't have return values that can be taken as parameter by other flows.. I'm trying to do something like: a = flow_a.run() b = flow_b.run(a) or alternatively call task from other tasks
    Seems like in Orion this is straightforward btw, but we want to start with the production ready version
    Kevin Kho

    9 months ago
    So in Orion it was made straightforward because it was a pain point in current Prefect.
    flow.run() is not meant for production, so you need to register the subflows and then trigger them with the StartFlowRun or create_flow_run task like this.
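    A minimal sketch of that pattern, assuming the child flow is already registered (the flow and project names here are placeholders):
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("parent") as flow:
        # kick off the registered child flow by name
        child_run_id = create_flow_run(flow_name="child", project_name="my-project")
        # block until the child run finishes; raise if it ended in a failed state
        wait_for_flow_run(child_run_id, raise_final_state=True)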
    Calling a task from another task can be done like this:
    from prefect import task

    @task
    def abc():
        return 1

    @task
    def bcd():
        # invokes abc's underlying function directly
        abc.run()
    but the abc in bcd is no longer a task; it's like a regular Python function, so you don't get observability.

    Daniel Suissa

    9 months ago
    Thanks @Kevin Kho! Then I have the right understanding here. Is there an estimate on when Orion will be production-ready?
    It may be an overly simple question, but I'm wondering if I'm trying to force Prefect onto my use case: was Prefect designed to run deeply dependent dynamic pipelines? It seems like running a task in a task or a flow in a flow are pretty obvious requirements of ETLs, but maybe my view is narrow here.
    Kevin Kho

    9 months ago
    No public timeline for that, but a lot of current users do run this subflow pattern. Task in a task is a bit more vague. Could you give me an example of what you are thinking?

    Daniel Suissa

    9 months ago
    So in Airflow I can declare that task A is dependent on task B. This tells the engine to run the B node before the A node. I do that on a task - I'd call that "deep dependency declaration". It seems like in Prefect I can only do that within a Flow, so does that mean I need to serialize the logic that creates the dependencies?
    operator* not task sorry
    Kevin Kho

    9 months ago
    Ah ok, so in Prefect upstream and downstream dependencies are set at the Flow level because tasks can be reused with different dependencies each time. Example:
    from prefect import task, Flow

    @task
    def plus_one(x):
        return x + 1

    with Flow("name") as flow:
        a = plus_one(1)
        b = plus_one(a)
    This will implicitly build the dependencies for you and pass the value of a in memory.
    You can also set it explicitly with:
    with Flow(...) as flow:
        a = first_task()
        b = second_task()
        c = third_task(c_inputs, upstream_tasks=[a, b])
    or, if there is no data dependency:
    with Flow("ex") as flow:
        a = first_task()
        b = second_task(upstream_tasks=[a])
        c = third_task(upstream_tasks=[b])

    Daniel Suissa

    9 months ago
    Thanks again @Kevin Kho. Sounds good, except for the slightly awkward way to fetch subflow results. I think we can use this for now and migrate later. Looking forward to the public release date 🙂
    Kevin Kho

    9 months ago
    Yes for sure. In case you haven’t seen it, this blog post shows how to retrieve results.
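    A rough sketch of what that retrieval looks like, assuming the child flow persists its results and has a task whose slug is get-data-1 (the flow, project, and slug names here are placeholders):
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, get_task_run_result

    with Flow("parent") as flow:
        # start the registered child flow run
        child_run_id = create_flow_run(flow_name="child", project_name="my-project")
        # wait for the child run to finish and load the named task's stored result
        data = get_task_run_result(child_run_id, task_slug="get-data-1")
    Note that get_task_run_result reads the task's persisted Result, so the child flow needs result storage configured.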