Will Hayes

10/04/2022, 10:25 PM
Hi all, I'm having trouble dynamically building flows from a list of tasks. For example, I have tasks 1, 2, and 3 that all modify the same dataframe. However, they might need to run in different orders, or not at all, depending on user input. Ideally I'd like to build a list of these tasks to run later. For instance, if I want to run tasks 1, 3, and 2 in that order (with their associated parameters), I'd like to call a task runner with a list of tuples like: [(Task_1, Config_1), (Task_3, Config_3), (Task_2, Config_2)]. I'd appreciate it if anyone could point me in the right direction. Thanks!


10/04/2022, 11:25 PM
Hi @Will Hayes! Are you using Prefect 1 or 2?

Will Hayes

10/05/2022, 4:56 AM
I am trying to use Prefect 2 but could use either. Thanks!
Is there a different approach I should be taking? I'm not finding much about dynamically generated pipelines in Prefect.


10/05/2022, 5:51 PM
Hmm, is there a reason you couldn't just define a flow that calls the tasks (conditionally, with some logic) in the correct order? That is essentially a dynamic "list of tasks to run later", with the added benefit that you wouldn't have a new flow / deployment to worry about for each possible order / series of tasks:
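from typing import Dict

import pandas as pd
from prefect import flow

# task_1, task_3, condition_a, condition_b: placeholders for your own tasks and logic
@flow
def my_flow(param_dict: Dict[str, Dict]):
    df = pd.read_csv("...")
    if condition_a:
        # do some series of existing tasks
        task_1(df, **param_dict['task_1'])
        if condition_b:
            task_3(df, **param_dict['task_3'])
        # some other order / series of tasks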
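
For illustration, here is a dependency-free sketch of the conditional approach above. Plain functions stand in for Prefect tasks and a list of dicts stands in for the dataframe. The parameters `scale` and `offset`, and the rule that a step runs only when its key is present in `param_dict`, are assumptions made for this example and not part of the original suggestion. In a real Prefect 2 flow the functions would presumably carry the `@task` and `@flow` decorators.

```python
from typing import Dict, List

Rows = List[Dict[str, float]]


def task_1(rows: Rows, scale: float = 1.0) -> Rows:
    # Hypothetical step: multiply every value by `scale`.
    return [{**row, "value": row["value"] * scale} for row in rows]


def task_3(rows: Rows, offset: float = 0.0) -> Rows:
    # Hypothetical step: add `offset` to every value.
    return [{**row, "value": row["value"] + offset} for row in rows]


def my_flow(param_dict: Dict[str, Dict[str, float]]) -> Rows:
    # Stand-in for loading the real data.
    rows: Rows = [{"value": 1.0}, {"value": 2.0}]
    # Assumed condition: a step runs only if the caller supplied parameters for it.
    if "task_1" in param_dict:
        rows = task_1(rows, **param_dict["task_1"])
        # task_3 is only considered after task_1, mirroring the nested `if` above.
        if "task_3" in param_dict:
            rows = task_3(rows, **param_dict["task_3"])
    return rows
```

Each step returns the updated data instead of relying on in-place mutation, which makes it easier to see what every branch contributes.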

Ryan Peden

10/05/2022, 6:19 PM
Or if you want to dispatch dynamically based on flow parameters, you could try something like this:
import asyncio
from pandas import DataFrame
from prefect import flow, Task, task
from typing import Dict, List, Tuple

@task
async def task_one(df: DataFrame = None, param_one: str = "", param_two: str = ""):
    print(f"task one operating on DataFrame with params {param_one} and {param_two}")

@task
async def task_two(df: DataFrame = None, param_one: str = "", param_two: str = ""):
    print(f"task two operating on DataFrame with params {param_one} and {param_two}")

@task
async def task_three(df: DataFrame = None, param_one: str = "", param_two: str = ""):
    print(f"task three operating on DataFrame with params {param_one} and {param_two}")

def tasks(*args: Task):
    # map each task's name to the task itself
    # (assumes default task names, which match the function names)
    flows_dict = {}
    for task_fn in args:
        flows_dict[task_fn.name] = task_fn

    return flows_dict

@flow
async def dynamic_flow(tasks_to_run: List[Tuple[str, Dict[str, str]]]):
    available_tasks = tasks(task_one, task_two, task_three)

    # presumably you'd load/create a dataframe from a datasource here
    df = DataFrame()

    # run the requested tasks in the order given
    for task_to_run in tasks_to_run:
        (task_name, task_params) = task_to_run
        await available_tasks[task_name](df, **task_params)

async def main():
    run_one = [
        ("task_three", {"param_one": "test", "param_two": "test"}),
        ("task_two", {"param_one": "test", "param_two": "test"}),
        ("task_one", {"param_one": "test", "param_two": "test"}),
    ]

    run_two = [
        ("task_two", {"param_one": "test2", "param_two": "test2"}),
        ("task_one", {"param_one": "test2", "param_two": "test2"}),
    ]

    await dynamic_flow(run_one)
    await dynamic_flow(run_two)

if __name__ == "__main__":
    asyncio.run(main())
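
Since the original question has every task modifying the same dataframe, a possible variation on the name-based dispatch above is to have each step return the updated data and feed it into the next one. The sketch below is dependency-free so the pattern can be tried in isolation: plain functions stand in for Prefect tasks, a list of dicts stands in for the dataframe, and `add_column`, `drop_column`, `rename_column`, `build_registry`, and `run_steps` are all hypothetical names invented for this example.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]
Step = Tuple[str, Dict[str, Any]]


def add_column(rows: List[Row], name: str, value: Any) -> List[Row]:
    # Hypothetical step: add a constant column to every row.
    return [{**row, name: value} for row in rows]


def drop_column(rows: List[Row], name: str) -> List[Row]:
    # Hypothetical step: remove a column from every row.
    return [{k: v for k, v in row.items() if k != name} for row in rows]


def rename_column(rows: List[Row], old: str, new: str) -> List[Row]:
    # Hypothetical step: rename a column in every row.
    return [{(new if k == old else k): v for k, v in row.items()} for row in rows]


def build_registry(*fns: Callable[..., List[Row]]) -> Dict[str, Callable[..., List[Row]]]:
    # Key each step by its function name, like the tasks() helper above.
    return {fn.__name__: fn for fn in fns}


REGISTRY = build_registry(add_column, drop_column, rename_column)


def run_steps(rows: List[Row], steps: List[Step]) -> List[Row]:
    # Apply the named steps in order, passing each result to the next step.
    for name, params in steps:
        if name not in REGISTRY:
            raise KeyError(f"unknown step: {name!r}")
        rows = REGISTRY[name](rows, **params)
    return rows
```

A caller would pass the steps in whatever order the user chose, for example `run_steps(rows, [("add_column", {"name": "b", "value": 2}), ("drop_column", {"name": "a"})])`. Rejecting unknown step names up front gives a clearer error than a bare dictionary lookup failure halfway through a run.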

Will Hayes

10/06/2022, 3:01 AM
Ah, I think I got it. Thank you both, really appreciate the help!!