thomas
10/20/2023, 12:31 PMflow.add_task
and task.bind()
. I would let my users construct their flows like they wanted and then I created the flows dynamically. I’ve looked at the new prefect and it seems its no longer possible to do this.
The following code is basically how I went about it, I just stripped away the dynamic part and removed some for loops.
Any ideas how/if this can be done now?
Btw The tasks themselves would come from an imported library which the flow runner would also have.
from prefect import Flow, task
@task
def task1():
return 'hi'
@task
def task2(data):
return data
# Everything below this line i'd do dynamically depending on user input.
flow_task_1 = task1
flow_task_2 = task2
flow_task_3 = task2._prefect_task.copy() # another task2
flow = Flow('a-flow')
flow.add_task(flow_task_1)
flow_task_1.bind(
flow=flow,
)
flow.add_task(flow_task_2)
flow_task_2.bind(
flow,
data=flow_task_1
)
flow.add_task(flow_task_3)
flow_task_3.bind(
flow,
data=flow_task_1
)
flow.register('prefect-cloud-test')