Thread
#prefect-community
    EmGarr

    2 years ago
    Hi everyone, I am currently running a dynamic job driven by a configuration fetched from a database. The configuration tells me how many steps to run. The job looks like this:
    • build the config
    • for each part of the config, run a step
    I have two implementations. The first is mostly static, while the second is more dynamic. Which would you recommend?
    1. Static implementation
    import prefect
    from prefect import Flow, task
    
    @task
    def build_job():
        return {'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}
    
    
    @task(log_stdout=True)
    def run_step(step):
        print(step)
    
    
    with Flow('test') as flow:
        config = build_job()
    
        tmp_task = None
        for step_name in ['a', 'b', 'c']:
            upstream_tasks = None if tmp_task is None else [tmp_task]
            tmp_task = run_step(
                config['steps'][step_name], upstream_tasks=upstream_tasks
            )
    
    flow.run()
    2. Use the LOOP signal to be more dynamic
    import prefect
    from prefect.engine.signals import LOOP
    from prefect import Flow, task
    
    @task
    def build_job():
        return {'order': ['a', 'b', 'c'], 'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}
    
    @task(log_stdout=True)
    def run_step(job_file):
        # we extract the accumulated task loop result from context
        loop_payload = prefect.context.get("task_loop_result", {})
        step_name = loop_payload.get("step_name", job_file['order'][0])
    
        step = job_file['steps'][step_name]
    
        print(step)
    
        pos = job_file['order'].index(step_name) + 1
    
        if pos < len(job_file['order']):
            next_step = job_file['order'][pos]
            raise LOOP(message=f"Step {step_name}", result=dict(step_name=next_step))
    
    with Flow('test') as flow:
        config = build_job()
        run_step(config)
    Is there a plan to support something like this?
    with Flow('test') as flow:
        config = build_job()
    
        tmp_task = None
        for name in config['order']:
            config['steps'][name]
    Thanks !
    Jim Crist-Harif

    2 years ago
    Hi @EmGarr, the trade-off here is really up to you. A few assorted thoughts that might help you make your decision:
    • If you're using an orchestration backend (Prefect Cloud or Prefect Server), each flow configuration in option 1 would need a new registration step, as the different configurations result in different flow layouts. In contrast, option 2 could use the same registered flow and parametrize the input at runtime to run each configuration.
    • Option 2 uses more complicated features and is harder (IMO) to understand. Option 1 is a lot more straightforward.
    If you're not using an orchestration layer and are just running locally, I'd recommend option 1.
    Is there a plan to support something like this?
    There are currently no plans to support something like that. We can't loop over something at flow build time because we don't know how long it is or what it's composed of (we don't have the result of `build_job` yet). There's no straightforward way to make that pattern work: looping over a runtime result has to happen at runtime.
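To make the build-time versus runtime distinction concrete, here is a toy sketch in plain Python. It does not use Prefect; the `Deferred` class and `run_all_steps` function are hypothetical stand-ins for "a task result that does not exist yet" and "a step that loops once the result is available".

```python
class Deferred:
    """Hypothetical placeholder for a result that only exists once the flow runs."""

    def __init__(self, fn):
        self.fn = fn

    def run(self):
        # Executing the wrapped function is what "runtime" means here.
        return self.fn()


def build_job():
    return {"order": ["a", "b", "c"], "steps": {"a": "a", "b": "b", "c": "c"}}


# "Build time": the job is only described, nothing has executed yet.
config = Deferred(build_job)

# Looping over the placeholder fails, because it has no contents to iterate.
try:
    for name in config:
        pass
    build_time_loop_worked = True
except TypeError:
    build_time_loop_worked = False


def run_all_steps(job):
    """Loop over the steps once the real config value is available."""
    return [job["steps"][name] for name in job["order"]]


# "Runtime": the value now exists, so the loop can run.
executed = run_all_steps(config.run())
```

The loop only works once `config.run()` has produced an actual dictionary, which is the same reason the looping in option 2 lives inside the task body rather than in the flow definition.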
    If the number of steps is always the same and you're only enabling/disabling them, or changing their input parameters, then I'd recommend writing a static pipeline, and parametrizing each task to configure it to do what you want at runtime.
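A minimal sketch of that static-pipeline idea, again in plain Python rather than Prefect. The names `STEPS`, `run_step`, `run_pipeline`, and the `enabled` / `value` settings keys are all assumptions made for illustration; in a real flow each call would be a task and the config would arrive as a runtime parameter.

```python
# The set of steps is fixed when the pipeline is defined.
STEPS = ("a", "b", "c")


def run_step(name, settings):
    """Run one step, or skip it if the runtime settings disable it."""
    if not settings.get("enabled", True):
        return None
    # The step's input is also taken from the runtime settings.
    return f"{name}:{settings.get('value', name)}"


def run_pipeline(config):
    """Run every step in the fixed order, configured by a runtime dict."""
    results = []
    for name in STEPS:
        output = run_step(name, config.get(name, {}))
        if output is not None:
            results.append(output)
    return results


# Step "a" gets a custom input, "b" is disabled, "c" uses its defaults.
example = run_pipeline({"a": {"value": "x"}, "b": {"enabled": False}})
```

The layout never changes between runs, so only the config passed in at runtime differs from one run to the next.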
    EmGarr

    2 years ago
    Thanks for the answer 👍