EmGarr
09/22/2020, 4:19 PMimport prefect
from prefect import Flow, task
@task
def build_job():
return {'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}
@task(log_stdout=True)
def run_step(step):
print(step)
with Flow('test') as flow:
config = build_job()
tmp_task = None
for step_name in ['a', 'b', 'c']:
upstream_tasks = None if tmp_task is None else [tmp_task]
tmp_task = run_step(
config['steps'][step_name], upstream_tasks=upstream_tasks
)
flow.run()
2. Use loop to be more dynamic
import prefect
from prefect.engine.signals import LOOP
from prefect import Flow, task
@task
def build_job():
return {'order': ['a', 'b', 'c'], 'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}
@task(log_stdout=True)
def run_step(job_file):
# we extract the accumulated task loop result from context
loop_payload = prefect.context.get("task_loop_result", {})
step_name = loop_payload.get("step_name", job_file['order'][0])
step = job_file['steps'][step_name]
print(step)
pos = job_file['order'].index(step_name) + 1
if pos < len(job_file['order']):
next_step = job_file['order'][pos]
raise LOOP(message=f"Fib {step_name}", result=dict(step_name=next_step))
with Flow('test') as flow:
config = build_job()
run_step(config)
Is there a plan to support something like ?
with Flow('test') as flow:
config = build_job()
tmp_task = None
for name in config['order']:
config['steps'][name]
Thanks !Jim Crist-Harif
09/22/2020, 4:38 PMIs there a plan to support something like ?There are not currently plans to support something like that. We can't loop over something at flow build time because we don't know how long that thing is or what it's composed of (we don't have the results of
build_config
yet). There's no straightforward way to make that pattern work. Looping over a runtime result has to happen at runtime.EmGarr
09/23/2020, 8:38 AM