Sam Gibson
06/22/2021, 4:40 AM

@task()
def a():
    # do stuff...

@task()
def b():
    # do stuff...

@task(requires=[a, b])
def c(a, b):
    # do stuff...
The benefit of such an approach is that your dependencies are defined in the same place you need them (instead of at the bottom of the file in a flow block??). This is especially useful when your dependencies involve making queries to various data sources and you parameterise the queries, e.g.
@task(requires=[
    query_table("trades"),
    query_table("orders"),
])
def targets(trades, orders):
    # do stuff...
In the prefect model, from what I gather, for the above example I would instead write something like...
@task
def targets(trades, orders):
    # do stuff...

# maybe lots of other code goes here...

with Flow("targeting") as flow:
    trades = query_table("trades")(**params)
    orders = query_table("orders")(**params)
    targets(trades, orders)
From a code author's point of view the prefect model is (subjectively, I admit) awkward, especially as the complexity of your pipelines grows (with tasks depending on tasks, depending on queries, etc).
I've played around with implementing some DSL on top of prefect to achieve something similar to the above, but it feels like I'm really trying to pound a square peg into a round hole.
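Roughly, the kind of wrapper I've been experimenting with looks something like this. It's a simplified sketch, not my actual code, and the names (dsl_task, build_flow, _REGISTRY) are just illustrative; it only assumes the standard Prefect 1.x task/Flow API:

from prefect import Flow, task

_REGISTRY = []  # (prefect task, upstream prefect tasks), in definition order

def dsl_task(requires=()):
    """Register a task together with its upstream tasks at definition time."""
    def decorator(fn):
        t = task(fn)
        _REGISTRY.append((t, list(requires)))
        return t
    return decorator

def build_flow(name):
    """Replay the registry inside an ordinary Flow block to build the DAG."""
    with Flow(name) as flow:
        bound = {}
        for t, deps in _REGISTRY:
            # Upstream tasks appear earlier in the file, so they're already bound.
            bound[t] = t(*[bound[d] for d in deps])
    return flow

So the decorator just records the edges next to the task definition, and the flow block still exists, it's just generated in one place instead of hand-written.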
What I'd like to understand is what patterns or idioms are used for complicated pipelines, especially those spanning multiple modules/files, with multiple flows depending on one another. Are there advantages to declaring the DAG and your logic separately that I'm not understanding? Especially for pipelines where a "start" and "end" time are implicit in every task when operating on time series data.

Kevin Kho
For the example above, you can call targets.map() and pass in a list of inputs for parallel execution, without having to modify the task.
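For example (a rough sketch; the table names here are made up):

from prefect import Flow, task

@task
def query_table(name):
    ...

@task
def targets(trades, orders):
    ...

with Flow("targeting") as flow:
    # targets runs once per element, in parallel given a suitable executor,
    # without changing the task definition itself.
    trades = query_table.map(["trades_us", "trades_eu"])
    orders = query_table.map(["orders_us", "orders_eu"])
    targets.map(trades, orders)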
I think the explicit defining of requirements at a task level prevents reusability of a Task. Thinking out loud, but if for example I have a Slack message task and I want to reuse it at different parts of my Flow, wouldn’t it be easier to handle those dependencies in the Flow block as opposed to coupling those settings at the task level? I could insert that Slack Task in multiple places. I think our Task Library shows that these Tasks can be modular and recycled in bigger projects.
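As a rough sketch of what I mean (using a plain notify task as a stand-in for the task library's Slack task):

from prefect import Flow, task

@task
def notify(message):
    # stand-in for posting to Slack
    print(message)

@task
def extract():
    ...

@task
def load(data):
    ...

with Flow("etl") as flow:
    started = notify("ETL starting")
    data = extract(upstream_tasks=[started])
    loaded = load(data)
    notify("ETL finished", upstream_tasks=[loaded])

The same task is bound twice with different upstream dependencies, which would be awkward if the dependencies were baked into the task itself.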
If you have multiple dependencies spread across multiple files, it makes sense to package them as a Python package inside a Docker image.
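For instance, something along these lines (the registry and image names are placeholders):

from prefect.storage import Docker

flow.storage = Docker(
    registry_url="registry.example.com",
    image_name="targeting-flows",
    python_dependencies=["pandas"],
    # or point at your own Dockerfile that pip-installs the project package:
    # dockerfile="Dockerfile",
)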
Could you elaborate more on the “start” and “end” time? Do you mean that every time the pipeline runs, you have a new “start” and “end” time and you want to keep track of state? We released a new feature around this called the KV Store that lets you persist small pieces of data and update them during your flows.
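Something like this (a hedged sketch; KV Store is a Cloud feature, and the key name "targets_last_end" is just an example):

import pendulum
from prefect import Flow, task
from prefect.backend import get_key_value, set_key_value

@task
def compute_window():
    # the previous run's end time becomes this run's start time
    start = pendulum.parse(get_key_value("targets_last_end"))
    end = pendulum.now("UTC")
    return (start, end)

@task
def run_queries(window):
    start, end = window
    ...  # query trades/orders between start and end

@task
def save_window(window):
    _, end = window
    set_key_value("targets_last_end", end.isoformat())

with Flow("incremental-targets") as flow:
    window = compute_window()
    data = run_queries(window)
    save_window(window, upstream_tasks=[data])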
I think you’re potentially coming from another workflow orchestration tool that does things differently? If so, I’d just like to know so I can understand your thoughts better.

Sam Gibson
06/22/2021, 11:50 PM

Sam Gibson
06/22/2021, 11:51 PM

Kevin Kho
import datetime

from prefect import Flow, Parameter
from prefect.engine.results import PrefectResult
from prefect.tasks.prefect import StartFlowRun

run_a = StartFlowRun(flow_name="flow_a", project_name="subflows", wait=True)
run_b = StartFlowRun(flow_name="flow_b", project_name="subflows", wait=True)
run_c = StartFlowRun(flow_name="flow_c", project_name="subflows", wait=True)

with Flow("main", result=PrefectResult()) as main_flow:
    x = Parameter(name='x', default=10)
    runone = run_a(parameters={'x': x}, idempotency_key=str(datetime.datetime.now()))
    runtwo = run_b(parameters={'x': x}, idempotency_key=str(datetime.datetime.now()))
    runthree = run_c(parameters={'x': x}, idempotency_key=str(datetime.datetime.now()))
    runtwo.set_upstream(runone)
    runthree.set_upstream(runtwo)