Will Hayes
10/04/2022, 10:25 PM
Nate
10/04/2022, 11:25 PM
Will Hayes
10/05/2022, 4:56 AM
Nate
10/05/2022, 5:51 PM
@flow
def my_flow(param_dict: Dict[str, Dict]):
    """Sketch of a flow that chooses which tasks to call from runtime conditions.

    Args:
        param_dict: Maps a task name (e.g. ``'task_1'``) to the keyword
            arguments that are unpacked into that task's call.

    NOTE(review): the pasted snippet lost its indentation. The nesting used
    here (``condition_b`` checked inside the ``condition_a`` branch, ``else``
    paired with ``condition_a``) is the most plausible reading -- confirm
    with the author. ``condition_a``, ``condition_b``, ``task_1`` and
    ``task_3`` are not defined anywhere in the snippet.
    """
    df = pd.read_csv("...")
    if condition_a:
        # do some series of existing tasks
        task_1(df, **param_dict['task_1'])
        if condition_b:
            task_3(df, **param_dict['task_3'])
        ...
    else:
        # some other order / series of tasks
        ...  # placeholder: a suite containing only a comment is a SyntaxError
Ryan Peden
10/05/2022, 6:19 PM
import asyncio
from pandas import DataFrame
from prefect import flow, Task, task
from typing import Dict, List, Tuple
@task
async def task_one(df: DataFrame = None, param_one: str = "", param_two: str = ""):
    """Demo task: print the two parameters it was called with.

    Args:
        df: DataFrame passed in by the caller; it is not read here.
        param_one: First value echoed in the printed message.
        param_two: Second value echoed in the printed message.
    """
    message = f"task one operating on DataFrame with params {param_one} and {param_two}"
    print(message)
@task
async def task_two(df: DataFrame = None, param_one: str = "", param_two: str = ""):
    """Demo task: print the two parameters it was called with.

    Args:
        df: DataFrame passed in by the caller; it is not read here.
        param_one: First value echoed in the printed message.
        param_two: Second value echoed in the printed message.
    """
    message = f"task two operating on DataFrame with params {param_one} and {param_two}"
    print(message)
@task
async def task_three(df: DataFrame = None, param_one: str = "", param_two: str = ""):
    """Demo task: print the two parameters it was called with.

    Args:
        df: DataFrame passed in by the caller; it is not read here.
        param_one: First value echoed in the printed message.
        param_two: Second value echoed in the printed message.
    """
    message = f"task three operating on DataFrame with params {param_one} and {param_two}"
    print(message)
def tasks(*args: Task):
    """Build a name -> task lookup table from the given tasks.

    Args:
        *args: Task objects to register; each one is keyed by its ``name``
            attribute.

    Returns:
        A dict mapping every task's name to the task object, in the order
        the tasks were passed. If two tasks share a name, the later one wins.
    """
    return {registered.name: registered for registered in args}
@flow()
async def dynamic_flow(tasks_to_run: List[Tuple[str, Dict[str, str]]]):
    """Run the requested tasks one after another, each with its own parameters.

    Args:
        tasks_to_run: Ordered ``(task_name, task_params)`` pairs.
            ``task_name`` must be the name of one of the registered tasks;
            ``task_params`` is unpacked as keyword arguments for that task.

    Raises:
        KeyError: If a requested task name is not registered. Tasks listed
            before the unknown name have already run by then.
    """
    available_tasks = tasks(task_one, task_two, task_three)

    # presumably you'd load/create a dataframe from a datasource here
    df = DataFrame()

    for task_name, task_params in tasks_to_run:
        try:
            task_fn = available_tasks[task_name]
        except KeyError:
            # Same exception type as a plain dict lookup, but the message
            # says which name was wrong and what the valid choices are.
            known = ", ".join(sorted(available_tasks))
            raise KeyError(
                f"Unknown task {task_name!r}; available tasks: {known}"
            ) from None
        await task_fn(df, **task_params)
async def main():
    """Invoke ``dynamic_flow`` twice, each time with a different task sequence."""
    # Each entry is (task name, keyword arguments for that task).
    first_sequence = [
        ("task_three", {"param_one": "test", "param_two": "test"}),
        ("task_two", {"param_one": "test", "param_two": "test"}),
        ("task_one", {"param_one": "test", "param_two": "test"}),
    ]
    second_sequence = [
        ("task_two", {"param_one": "test2", "param_two": "test2"}),
        ("task_one", {"param_one": "test2", "param_two": "test2"}),
    ]

    # Runs are sequential: the second starts only after the first finishes.
    for sequence in (first_sequence, second_sequence):
        await dynamic_flow(sequence)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
Will Hayes
10/06/2022, 3:01 AM