Hi everyone, I am currently running a dynamic job ...
# prefect-community
e
Hi everyone, I am currently running a dynamic job after a call to a db. I receive a configuration that tells me the number of steps to run. It looks like this:
• build the config
• for each part of the config, run a step
I have two implementations. The first one is mostly static, while the second is more dynamic. Which would you recommend?
1. Static implementation
import prefect
from prefect import Flow, task

@task
def build_job():
    return {'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}


@task(log_stdout=True)
def run_step(step):
    print(step)


with Flow('test') as flow:
    config = build_job()

    tmp_task = None
    for step_name in ['a', 'b', 'c']:
        upstream_tasks = None if tmp_task is None else [tmp_task]
        tmp_task = run_step(
            config['steps'][step_name], upstream_tasks=upstream_tasks
        )

flow.run()
2. Use a loop to be more dynamic
import prefect
from prefect.engine.signals import LOOP
from prefect import Flow, task

@task
def build_job():
    return {'order': ['a', 'b', 'c'], 'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}

@task(log_stdout=True)
def run_step(job_file):
    # we extract the accumulated task loop result from context
    loop_payload = prefect.context.get("task_loop_result", {})
    step_name = loop_payload.get("step_name", job_file['order'][0])

    step = job_file['steps'][step_name]

    print(step)

    pos = job_file['order'].index(step_name) + 1

    if pos < len(job_file['order']):
        next_step = job_file['order'][pos]
        # hand the next step name to the next loop iteration
        raise LOOP(message=f"Step {step_name} done", result=dict(step_name=next_step))

with Flow('test') as flow:
    config = build_job()
    run_step(config)

flow.run()
Is there a plan to support something like this?
with Flow('test') as flow:
    config = build_job()

    tmp_task = None
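    # hypothetical: iterate over a task result while the flow is being built
    for name in config['order']:
        upstream_tasks = None if tmp_task is None else [tmp_task]
        tmp_task = run_step(config['steps'][name], upstream_tasks=upstream_tasks)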
Thanks!
j
Hi @EmGarr, the trade-off here is really up to you. A few assorted thoughts that might help you make your decision:
• If you're using an orchestration backend (Prefect Cloud or Prefect Server), each flow configuration in option 1 would need a new registration step, as the different configurations result in different flow layouts. In contrast, option 2 could use the same registered flow and parametrize the input at runtime to run each configuration.
• Option 2 uses more complicated features and is harder (IMO) to understand. Option 1 is a lot more straightforward.
If you're not using an orchestration layer and are just running locally, I'd recommend option 1.
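To make the registration point a bit more concrete, here is a toy sketch in plain Python. It does not use Prefect at all, and `run_job` is a hypothetical helper, not part of any library. It only mimics the shape of option 2: a single fixed definition walks whatever `order` it receives at runtime, so a different configuration does not require a different layout.

```python
def run_job(job_file):
    """Walk the steps in job_file['order'] one at a time; return what ran."""
    executed = []
    step_name = job_file['order'][0]        # first iteration starts at the head
    while True:
        executed.append(job_file['steps'][step_name])
        pos = job_file['order'].index(step_name) + 1
        if pos >= len(job_file['order']):   # nothing left, stop looping
            return executed
        step_name = job_file['order'][pos]  # payload carried to the next iteration


# The same definition handles configurations of different lengths and orders.
short = run_job({'order': ['a'], 'steps': {'a': 'a'}})
longer = run_job({'order': ['c', 'a', 'b'], 'steps': {'a': 'A', 'b': 'B', 'c': 'C'}})
```

With option 1, by contrast, each of those two configurations would produce a different set of tasks when the flow is built.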
> Is there a plan to support something like this?
There are currently no plans to support something like that. We can't loop over something at flow build time because we don't know how long it is or what it's composed of (we don't have the result of `build_job` yet). There's no straightforward way to make that pattern work: looping over a runtime result has to happen at runtime.
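A rough way to picture the build-time versus runtime distinction, as a toy model in plain Python rather than Prefect's actual implementation: the `Deferred` class below is a made-up stand-in for "a task whose result does not exist yet".

```python
class Deferred:
    """Hypothetical placeholder for a value that only exists once the pipeline runs."""

    def __init__(self, fn):
        self.fn = fn

    def run(self):
        return self.fn()


def build_job():
    return {'order': ['a', 'b', 'c'], 'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}


# Build time: we hold a placeholder, not the dict, so there is nothing to iterate.
config = Deferred(build_job)
try:
    for name in config:
        pass
    build_time_error = None
except TypeError as exc:
    build_time_error = exc

# Run time: the value exists, so looping over it is possible.
runtime_order = [name for name in config.run()['order']]
```

The loop over the placeholder fails because its length and contents are unknown until `run()` is called, which is the same reason the loop has to live inside a task.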
If the number of steps is always the same and you're only enabling/disabling them, or changing their input parameters, then I'd recommend writing a static pipeline, and parametrizing each task to configure it to do what you want at runtime.
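As a minimal sketch of that last suggestion, again in plain Python rather than Prefect: the layout below is always a, then b, then c, and the runtime configuration only switches steps on or off or changes their input. The `enabled` and `value` keys are assumptions made up for illustration.

```python
def run_step(name, settings):
    """Run one step, or skip it when the runtime config disables it."""
    if not settings.get('enabled', True):
        return f"{name}: skipped"
    return f"{name}: ran with {settings.get('value')}"


def run_pipeline(config):
    # The layout is fixed: always a, then b, then c.
    return [run_step(name, config.get(name, {})) for name in ('a', 'b', 'c')]


results = run_pipeline({
    'a': {'value': 1},
    'b': {'enabled': False},
    'c': {'value': 3},
})
```

Because the set of steps never changes, the same pipeline definition can be reused for every configuration.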
e
Thanks for the answer 👍