DiffyBron
12/10/2019, 12:25 PM#!/usr/bin/env python
import random
from prefect.triggers import all_successful, all_failed
from prefect import task, Flow
@task(name="Task A")
def task_a():
rand_num = float(random.random())
curr_limits = float(0.5)
if rand_num < curr_limits:
raise ValueError(f'{rand_num} is less than {curr_limits}')
return rand_num
@task(name="Task B", trigger=all_successful)
def task_b():
pass
@task(name="Task C", trigger=all_failed)
def task_c():
pass
if __name__ == '__main__':
with Flow('My triggers') as flow:
success = task_b(upstream_tasks=[task_a])
fail = task_c(upstream_tasks=[task_a])
flow.set_reference_tasks([success])
flow_state = flow.run()
josh
12/10/2019, 3:07 PM@task(name="Task B", trigger=all_successful)
def task_b(value):
print(value)
with Flow('My triggers') as flow:
rand_num = task_a()
success = task_b(rand_num)
fail = task_c(rand_num)
etc...
DiffyBron
12/10/2019, 3:11 PMjosh
12/10/2019, 3:14 PMwith Flow as flow...
rand_num1 = task_a()
rand_num2 = tasl_a()
success = task_b([rand_num1, rand_num2])
And then task_b can do something based on that listDiffyBron
12/10/2019, 3:23 PMjosh
12/10/2019, 3:24 PMDiffyBron
12/10/2019, 3:28 PM