Thread
#prefect-community
    Mahesh

    Mahesh

    1 year ago
    Hello Team, I have a query regarding upstream tasks,
    Am looping tasks in flow like below,
    sql_list=get_values()
        tasks = [
            shell_exec(val)
            for val in sql_list
        ]
        for i in range(1, len(tasks)):
            show_output(tasks[i].set_upstream(tasks[i - 1]))
    Now i want to run another shell task once above looping tasks executed successfully. Let me know how to do it.
    emre

    emre

    1 year ago
    You could define your final shell task before the loop, and use
    set_upstream
    inside your loop to place it downstream to each task in the loop.
    sql_list=get_values()
        tasks = [
            shell_exec(val)
            for val in sql_list
        ]
        final_task = some_task()
        for i in range(1, len(tasks)):
            out = show_output(tasks[i].set_upstream(tasks[i - 1]))
            final_task.set_upstream(out)
    On another side note, this won't work if
    get_values()
    is a task that returns a list. You are currently building a workflow. If
    get_values()
    is a task, that means its value, and length, are not known in build time. So your for loop won't know what to do.
    Mahesh

    Mahesh

    1 year ago
    get_values()
    is normal function not a task, Let me try this.
    @emre The above example works fine, But now i want to add a new task before looping tasks.
    emre

    emre

    1 year ago
    Check out task mapping:https://docs.prefect.io/core/concepts/mapping.html#prefect-approach This is the best way to run multiple copies of the same task, for each element of an upstream tasks output.
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey @Mahesh! I think you can add it at the start of the list like follows. I think he needs sequential tasks @emre.
    from prefect import Flow, flatten, task
    from prefect.engine import signals
    import prefect
    from prefect.triggers import all_successful, all_failed, always_run, any_failed, any_successful
    
    @task(trigger = always_run, skip_on_upstream_skip = False)
    def train(x):
        logger = prefect.context.get("logger")
        <http://logger.info|logger.info>(x)
        return
    
    with Flow("test") as flow:
        samples = [1,2,3,4,5,6]
        all_tasks = []
    
        all_tasks.append(train(0))
    
        for i in samples:
            all_tasks.append(train(i))
    
        for i in range(1, (len(samples)+1)):
            all_tasks[i].set_upstream(all_tasks[i - 1])
    
    flow.run()
    Mahesh

    Mahesh

    1 year ago
    thanks @Kevin Kho, Will try this
    @Kevin Kho I have inserted my first task and final task as 0th and last element in list. It works fine. This is what i expected. Thanks @emre @Kevin Kho