Mahesh
04/29/2021, 7:21 AM
Mahesh
04/29/2021, 7:22 AM
sql_list = get_values()
tasks = [
    shell_exec(val)
    for val in sql_list
]
for i in range(1, len(tasks)):
    show_output(tasks[i].set_upstream(tasks[i - 1]))
Now I want to run another shell task once all of the looped tasks above have executed successfully.
Let me know how to do it.
emre
04/29/2021, 7:42 AM
Call set_upstream inside your loop to place the new task downstream of each task in the loop.
sql_list = get_values()
tasks = [
    shell_exec(val)
    for val in sql_list
]
final_task = some_task()
for i in range(1, len(tasks)):
    out = show_output(tasks[i].set_upstream(tasks[i - 1]))
    final_task.set_upstream(out)
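To illustrate the dependency shape only, here is a minimal plain-Python sketch, not Prefect code. `Step` and `run_order` are hypothetical stand-ins for tasks and the scheduler. It assumes the looped tasks form a single chain, in which case attaching the final step to the last element of the chain is enough to make it run after all of them.

```python
class Step:
    """Hypothetical stand-in for a task; only tracks its upstream steps."""

    def __init__(self, name):
        self.name = name
        self.upstream = []

    def set_upstream(self, other):
        self.upstream.append(other)


def run_order(steps):
    """Return step names in an order that respects every upstream edge."""
    done, order = set(), []

    def visit(step):
        if step.name in done:
            return
        for up in step.upstream:
            visit(up)
        done.add(step.name)
        order.append(step.name)

    for step in steps:
        visit(step)
    return order


steps = [Step(f"sql_{n}") for n in range(3)]
final_step = Step("final")

# Chain the looped steps: each one waits for the previous one.
for i in range(1, len(steps)):
    steps[i].set_upstream(steps[i - 1])

# The chain is linear, so depending on its last element covers all of it.
final_step.set_upstream(steps[-1])

print(run_order([final_step] + steps))
```

Even though the final step is listed first, it comes out last because every step in the chain is visited before it.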
emre
04/29/2021, 7:45 AM
Note that this will not work if get_values() is a task that returns a list.
You are currently building a workflow. If get_values() is a task, its value and its length are not known at build time, so your for loop won't know what to do.
Mahesh
04/29/2021, 8:53 AM
get_values() is a normal function, not a task. Let me try this.
Mahesh
04/29/2021, 10:54 AM
emre
04/29/2021, 11:09 AM
Kevin Kho
from prefect import Flow, flatten, task
from prefect.engine import signals
import prefect
from prefect.triggers import all_successful, all_failed, always_run, any_failed, any_successful

@task(trigger=always_run, skip_on_upstream_skip=False)
def train(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return

with Flow("test") as flow:
    samples = [1, 2, 3, 4, 5, 6]
    all_tasks = []
    all_tasks.append(train(0))
    for i in samples:
        all_tasks.append(train(i))
    for i in range(1, len(samples) + 1):
        all_tasks[i].set_upstream(all_tasks[i - 1])

flow.run()
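The trigger=always_run setting above lets each train task run regardless of how its upstream task finished. A task that should run only after the looped tasks succeed would instead rely on all_successful, which is Prefect's default trigger. The following is a simplified plain-Python model of the two rules. The function names mirror Prefect's triggers, but the string states and should_run are assumptions for illustration, not the library's implementation.

```python
def all_successful(upstream_states):
    # Toy rule: run only if every upstream state is "Success".
    return all(state == "Success" for state in upstream_states)


def always_run(upstream_states):
    # Toy rule: run no matter what happened upstream.
    return True


def should_run(trigger, upstream_states):
    """Hypothetical helper: ask the trigger whether the task may run."""
    return trigger(upstream_states)


chain_ok = ["Success", "Success", "Success"]
chain_bad = ["Success", "Failed", "Success"]

for name, states in [("chain_ok", chain_ok), ("chain_bad", chain_bad)]:
    print(name, "all_successful:", should_run(all_successful, states))
    print(name, "always_run:", should_run(always_run, states))
```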
Mahesh
04/29/2021, 1:38 PM
Mahesh
04/29/2021, 2:24 PM