Louis Burtz
04/27/2021, 5:20 AMLouis Burtz
04/27/2021, 5:24 AMKevin Kho
with Flow("schedule flow") as schedule_flow:
    # Build one sub_flow_task call per entry in `dates`, keeping their order.
    tasks = []
    for date in dates:
        tasks.append(sub_flow_task(parameters={"date": date}))

    # Declare every task's immediate predecessor in the list as its upstream,
    # so the calls form a single linear chain.
    for earlier, later in zip(tasks, tasks[1:]):
        later.set_upstream(earlier)
Louis Burtz
04/28/2021, 12:39 AMKevin Kho
Kevin Kho
any_successful
, but I’m not entirely sure it’ll work. I’d have to check if this only applies to the direct upstream or to all degrees of upstream tasks.
Kevin Kho
Louis Burtz
04/28/2021, 1:17 AMKevin Kho
Louis Burtz
04/28/2021, 1:25 AMKevin Kho
Kevin Kho
from prefect import Flow, flatten, task
from prefect.engine import signals
import prefect
from prefect.triggers import all_successful, all_failed, always_run, any_failed, any_successful
@task(trigger=always_run, skip_on_upstream_skip=False)
def train(x):
    """Log ``x`` and signal SKIP when training "fails".

    The task is registered with the ``always_run`` trigger and
    ``skip_on_upstream_skip=False``. ``x == 3`` is used to simulate a
    failing sample: the resulting error is converted into a SKIP signal
    instead of a failure.

    Args:
        x: The sample value to log.

    Raises:
        signals.SKIP: If anything inside the body raises (including the
            simulated ``ValueError`` for ``x == 3``).
    """
    try:
        logger = prefect.context.get("logger")
        logger.info(x)
        if x == 3:
            # Simulated training failure for one specific sample.
            raise ValueError
    except Exception:
        # Turn any ordinary error into a skip; unlike a bare ``except:``
        # this lets KeyboardInterrupt / SystemExit propagate.
        raise signals.SKIP
@task(trigger=any_successful, skip_on_upstream_skip=True)
def inference(x):
    """Log ``x`` to the logger taken from the Prefect context.

    The task is registered with the ``any_successful`` trigger and
    ``skip_on_upstream_skip=True``.

    Args:
        x: The value to log.
    """
    logger = prefect.context.get("logger")
    logger.info(x)
with Flow("test") as flow:
    samples = [1, 2, 3, 4, 5, 6]

    # Interleave the tasks as train(1), inference(1), train(2), inference(2), ...
    all_tasks = []
    for sample in samples:
        all_tasks += [train(sample), inference(sample)]

    # Chain the whole list: each task gets its predecessor as upstream.
    for before, after in zip(all_tasks, all_tasks[1:]):
        after.set_upstream(before)

flow.run()
Louis Burtz
04/29/2021, 1:10 AMLouis Burtz
04/29/2021, 1:12 AMKevin Kho
Kevin Kho
always_run
trigger
Louis Burtz
04/29/2021, 1:28 AMKevin Kho
Louis Burtz
05/07/2021, 5:33 AMLouis Burtz
05/07/2021, 5:53 AM
from prefect import Flow, flatten, task
from prefect.engine import signals
import prefect
from prefect.triggers import all_successful, all_failed, always_run, any_failed, any_successful
from prefect.executors import LocalDaskExecutor, LocalExecutor
# Sample text; the train task below picks one character from it by index.
string = "prefectisawesome"

# Quick check at import time: show the first character.
print(string[0])
@task(trigger=always_run, skip_on_upstream_skip=False)
def train(x):
    """Return the character of the module-level ``string`` at index ``x``.

    The task is registered with the ``always_run`` trigger and
    ``skip_on_upstream_skip=False``. ``x == 3`` is used to simulate a
    failing sample: the resulting error is converted into a SKIP signal
    instead of a failure.

    Args:
        x: Index into ``string``; also the value that gets logged.

    Returns:
        The single character ``string[x]``.

    Raises:
        signals.SKIP: If anything inside the body raises (an out-of-range
            index, or the simulated ``ValueError`` for ``x == 3``).
    """
    try:
        s = string[x]
        logger = prefect.context.get("logger")
        logger.info(x)
        if x == 3:
            # Simulated training failure for one specific sample.
            raise ValueError
    except Exception:
        # Turn any ordinary error into a skip; unlike a bare ``except:``
        # this lets KeyboardInterrupt / SystemExit propagate.
        raise signals.SKIP
    return s
@task(trigger=any_successful, skip_on_upstream_skip=True)
def inference(s):
    """Log ``s`` to the logger taken from the Prefect context.

    The task is registered with the ``any_successful`` trigger and
    ``skip_on_upstream_skip=True``.

    Args:
        s: The value to log (in this script, the result of a train task).
    """
    logger = prefect.context.get("logger")
    logger.info(s)
with Flow("test") as flow:
    samples = [1, 2, 3, 4, 5, 6]

    # all_tasks alternates: train, inference, train, inference, ...
    all_tasks = []
    for x in samples:
        trained = train(x)
        # Passing the train result as an argument already makes this
        # inference task depend on its train task.
        all_tasks.extend([trained, inference(trained)])

    # Only the inference -> next-train links are missing, so add just those
    # (train tasks sit at even indices). Calling set_upstream for every
    # consecutive pair would declare the train -> inference dependency a
    # second time.
    for i in range(2, len(all_tasks), 2):
        all_tasks[i].set_upstream(all_tasks[i - 1])

flow.executor = LocalExecutor()
flow.visualize()
Louis Burtz
05/07/2021, 6:07 AMKevin Kho
Kevin Kho
set_upstream
. I suggest setting the upstream only for every inference -> train edge, and not for train -> inference, because those are already set.
Kevin Kho
Kevin Kho
# all_tasks alternates train, inference, train, inference, ... and each
# inference already depends on its own train through its argument. Start at
# index 2 so that every *train* task (even index) is linked to the inference
# task just before it; starting at 1 would instead re-link each inference to
# its own train and leave the pairs unchained.
for i in range(2, len(all_tasks), 2):
    all_tasks[i].set_upstream(all_tasks[i - 1])
flow.executor = LocalExecutor()
Kevin Kho
Louis Burtz
05/08/2021, 11:39 AM