# ask-community
m
Hello Team, I have a query regarding upstream tasks.
I am looping over tasks in a flow like below:
sql_list = get_values()
tasks = [
    shell_exec(val)
    for val in sql_list
]
for i in range(1, len(tasks)):
    show_output(tasks[i].set_upstream(tasks[i - 1]))
Now I want to run another shell task once the looping tasks above have executed successfully. How can I do this?
e
You could define your final shell task before the loop, and use `set_upstream` inside your loop to place it downstream of each task in the loop.
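sql_list = get_values()
tasks = [
    shell_exec(val)
    for val in sql_list
]
final_task = some_task()
for i in range(1, len(tasks)):
    # set_upstream() only adds the dependency edge (it returns None),
    # so pass the shell task itself to show_output
    tasks[i].set_upstream(tasks[i - 1])
    out = show_output(tasks[i])
    # the final task waits for every show_output, and thus the whole chain
    final_task.set_upstream(out)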
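To see why the final task only starts after the loop, here is a minimal pure-Python sketch of the dependency graph this pattern is meant to produce, using the standard library's `graphlib` rather than Prefect. The node names (`shell_0`, `show_1`, `final`) are hypothetical stand-ins for the tasks in the snippet, and the sketch assumes each `show_output` node consumes the output of its shell task.

```python
from graphlib import TopologicalSorter


def build_graph(n):
    """Map each hypothetical node to the set of nodes it depends on."""
    shells = [f"shell_{i}" for i in range(n)]
    graph = {name: set() for name in shells}
    graph["final"] = set()
    for i in range(1, n):
        # tasks[i].set_upstream(tasks[i - 1]): chain the shell tasks in order
        graph[shells[i]].add(shells[i - 1])
        # show_output is assumed to consume the output of shell_i
        graph[f"show_{i}"] = {shells[i]}
        # final_task.set_upstream(out)
        graph["final"].add(f"show_{i}")
    return graph


order = list(TopologicalSorter(build_graph(3)).static_order())
print(order)
```

Every node is an ancestor of `final`, so any valid execution order puts it last, which is the guarantee asked for.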
On a side note, this won't work if `get_values()` is a task that returns a list. At this point you are building the workflow, not running it: if `get_values()` is a task, its value (and therefore its length) is not known at build time, so your for loop won't know how many tasks to create.
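The build-time versus run-time distinction can be illustrated with a toy, Prefect-free sketch. The hypothetical `Deferred` class stands in for what a task call gives you while a flow is being built: a placeholder for a result that does not exist yet, so there is nothing to take `len()` of or loop over.

```python
class Deferred:
    """Hypothetical placeholder for a result that only exists at run time."""

    def __init__(self, fn):
        self.fn = fn

    def run(self):
        # Only at run time is the wrapped function actually called
        return self.fn()


def get_values():
    return ["select 1", "select 2"]


# Build time: wrapping get_values yields a placeholder, not a list
values = Deferred(get_values)
try:
    len(values)
except TypeError as exc:
    print("build time:", exc)

# Run time: the real list, and therefore its length, is now known
print("run time:", len(values.run()))
```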
m
`get_values()` is a normal function, not a task. Let me try this.
@emre The above example works fine, but now I want to add a new task before the looping tasks.
e
Check out task mapping, which is the best way to run multiple copies of the same task, one for each element of an upstream task's output: https://docs.prefect.io/core/concepts/mapping.html#prefect-approach
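Mapping sidesteps the build-time problem because the number of copies is decided only once the upstream result exists. The toy engine below is a hypothetical, Prefect-free illustration of that idea, not Prefect's API: a `Mapped` node records which function to fan out, and `run` computes the upstream result first, then makes one call per element.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Mapped:
    """Hypothetical graph node: apply `fn` to each element of `upstream`'s output."""

    fn: Callable[[Any], Any]
    upstream: Callable[[], List[Any]]

    def run(self) -> List[Any]:
        # Run time: the upstream result is computed first...
        items = self.upstream()
        # ...and only now is the number of copies known (one per element)
        return [self.fn(item) for item in items]


def get_values() -> List[str]:
    return ["q1", "q2", "q3"]


def shell_exec(val: str) -> str:
    # Stand-in for running one shell command per value
    return f"ran {val}"


# Build time: the node is defined without knowing how many values there are
node = Mapped(fn=shell_exec, upstream=get_values)
results = node.run()
print(results)
```

In Prefect 1.x the corresponding call is `shell_exec.map(...)`; mapped copies are independent of one another rather than chained, so they do not by themselves give a sequential order.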
k
Hey @Mahesh! I think you can add it at the start of the list, as follows. @emre, I think he needs sequential tasks.
from prefect import Flow, task
import prefect
from prefect.triggers import always_run

@task(trigger=always_run, skip_on_upstream_skip=False)
def train(x):
    logger = prefect.context.get("logger")
    logger.info(x)

with Flow("test") as flow:
    samples = [1, 2, 3, 4, 5, 6]
    all_tasks = []

    # the extra task goes first, at index 0
    all_tasks.append(train(0))

    for i in samples:
        all_tasks.append(train(i))

    # chain each task to the one before it so they run sequentially
    for i in range(1, len(all_tasks)):
        all_tasks[i].set_upstream(all_tasks[i - 1])

flow.run()
m
Thanks @Kevin Kho, will try this.
@Kevin Kho I inserted my first task and final task as the 0th and last elements of the list. It works fine and does what I expected. Thanks @emre @Kevin Kho
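The approach that ended up working (first task at index 0, final task at the end of one list, then chaining neighbours) can be sketched without Prefect. `build_chain` and the task names are hypothetical; each `(upstream, downstream)` pair corresponds to one `all_tasks[i].set_upstream(all_tasks[i - 1])` call in Kevin's loop.

```python
from typing import List, Tuple


def build_chain(first: str, looped: List[str], final: str) -> List[Tuple[str, str]]:
    """Place `first` at index 0 and `final` last, then link each neighbour pair."""
    all_tasks = [first, *looped, final]
    # Index 0 gets no upstream; every later task depends on the one before it
    return [(all_tasks[i - 1], all_tasks[i]) for i in range(1, len(all_tasks))]


edges = build_chain("setup", ["sql_1", "sql_2", "sql_3"], "cleanup")
for upstream, downstream in edges:
    print(f"{upstream} -> {downstream}")
```

The final task's only upstream is the last looped task, so it cannot start until the whole chain has finished. Whether it also requires success depends on the trigger: Prefect 1.x defaults to `all_successful`, while Kevin's example uses `always_run`, which runs regardless of upstream failures.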