# ask-community
**Sam Gibson:**
Hi everyone. Very new to Prefect, and just starting to evaluate whether it could replace our internally built tooling. There are lots of really good things that we see in the design, but it's different enough from our current approach that we're struggling to understand some usage. One big conceptual challenge is that in our current implementation the DAG is created at declaration time by stating what your dependencies are, e.g.
```python
@task()
def a():
    # do stuff...
    ...

@task()
def b():
    # do stuff...
    ...

@task(requires=[a, b])
def c(a, b):
    # do stuff...
    ...
```
The benefit of such an approach is that your dependencies are defined in the same place you need them (instead of at the bottom of the file in a flow block??). This is especially useful when your dependencies involve making queries to various data sources and you parameterise the queries, e.g.
```python
@task(requires=[
    query_table("trades"),
    query_table("orders")
])
def targets(trades, orders):
    # do stuff...
    ...
```
In the prefect model, from what I gather, for the above example I would instead write something like...
```python
@task
def targets(trades, orders):
    # do stuff...
    ...

# maybe lots of other code goes here...

with Flow("targeting") as flow:
    trades = query_table("trades")(**params)
    orders = query_table("orders")(**params)

    targets(trades, orders)
```
From a code author's point of view the Prefect model is (subjectively, I admit) awkward, especially as the complexity of your pipelines grows (with tasks depending on tasks, depending on queries, etc). I've played around with implementing a DSL on top of Prefect to achieve something similar to the above, but it feels like I'm really trying to pound a square peg into a round hole. What I'd like to understand is what patterns or idioms are used for complicated pipelines, especially those spanning multiple modules/files, with multiple flows depending on one another? Are there advantages to declaring the DAG and your logic separately that I'm not understanding? This is especially relevant for pipelines where a "start" and "end" time are implicit in every task while operating on time series data.
**Kevin Kho:**
Hi @Sam Gibson, there are a couple of things here to answer, so I'll try to cover them. Your code example looks good. For complicated pipelines, people use a Flow of Flows, where a main Flow triggers various subflows. This lets people modularize their logic, and the main Flow can determine whether or not to fire the subflows. It also allows heterogeneous execution environments (think a high-memory Dask cluster for memory-bound processes, or a GPU machine for deep learning training). This is something enabled by the separation of Flow configuration and logic. Mapping is also enabled by this separation: you can call `targets.map()` and pass in a list of inputs for parallel execution, without having to modify the task (a sketch follows after this message).

I think explicitly defining requirements at the task level prevents reusability of a Task. Thinking out loud, but if for example I have a Slack message task and I want to reuse it at different parts of my Flow, wouldn't it be easier to handle those dependencies in the Flow block as opposed to coupling those settings at the task level? I could insert that Slack Task in multiple places. I think our Task Library shows that these Tasks can be modular and recycled in bigger projects. If you have multiple dependencies spread across multiple files, it makes sense to package them as a Python package in a Docker image.

Could you elaborate more on the "start" and "end" time? Do you mean that every time the pipeline runs you have a new "start" and "end" time, and you want to keep track of state? We released a new feature for this called the KV Store that lets you persist small pieces of data and update them during your flows. I think you're potentially coming from another workflow orchestration tool that does things differently? If so, I'd just like to know so I can understand your thoughts better.
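For reference, here is a minimal sketch of the mapping pattern in Prefect 1.x. The `query_table` and `summarize` tasks are illustrative placeholders (not tasks from the Prefect task library), but the `.map()` call itself is the real API:

```python
# Sketch of mapping in Prefect 1.x: the same task runs once per element of the
# mapped list, without any change to the task definition itself.
from prefect import Flow, task

@task
def query_table(name):
    # placeholder: pretend this returns rows for the named table
    return [f"{name}-row-{i}" for i in range(3)]

@task
def summarize(rows):
    # placeholder: some per-table computation
    return len(rows)

with Flow("mapping-example") as flow:
    # runs query_table once per table name, in parallel if the executor allows
    tables = query_table.map(["trades", "orders", "fills"])
    counts = summarize.map(tables)
```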
**Sam Gibson:**
Hey @Kevin Kho thanks so much for the detailed reply. You're right that we're coming from a tool that models things differently, but as I said there are a number of compelling things about Prefect that have led us to evaluate its suitability. And of course, I'm open to changing our approach (otherwise I wouldn't be here :-))

To answer your question about start and end times: yes, the "start" and "end" would change every run of the pipeline, and some state is tracked between runs. You can imagine this as a continuous process that runs throughout the day, incrementally appending to output datasets from a variety of constantly updating inputs. Most of the pipeline logic isn't easily translatable to a "streaming" framework (e.g. ksql, spark, faust, etc) and is mostly very pandas/dataframe centric.

What's the technical reason for packaging modules in Docker? Especially as pipelines grow in complexity, surely the deployment complexity increases dramatically? It could be because of the nature of our systems allowing it, but our pipelines are often very complex, split across many tens of files.

I'm not sure I understand your point about the Slack task. In a model where a task defines its own dependencies, any task that wants to use it just declares it as a dependency (thus inheriting the entire sub-DAG as its own upstream dep.). Do you mean you have some generic "message to Slack" task that you want to parameterise? That just seems like a hard thing to justify to me, since the majority of tasks (that exist in our pipelines) are focused on producing an output dataset or some intermediary (viz. producing dataframes and passing them around).
(sorry for the long time between replies - APAC timezones make this challenging)
**Kevin Kho:**
Replying about the Slack task first because it's the lowest-hanging fruit. Yes, we do have a generic message-to-Slack task that is parameterized. But let me show you another example. Some people use Prefect to orchestrate Prefect subflows, as mentioned above. This is a generic task that is used a lot. Example below:
```python
import datetime

from prefect import Flow, Parameter
from prefect.engine.results import PrefectResult
from prefect.tasks.prefect import StartFlowRun

run_a = StartFlowRun(flow_name="flow_a", project_name="subflows", wait=True)
run_b = StartFlowRun(flow_name="flow_b", project_name="subflows", wait=True)
run_c = StartFlowRun(flow_name="flow_c", project_name="subflows", wait=True)

with Flow("main", result=PrefectResult()) as main_flow:
    x = Parameter(name="x", default=10)
    runone = run_a(parameters={"x": x}, idempotency_key=str(datetime.datetime.now()))
    runtwo = run_b(parameters={"x": x}, idempotency_key=str(datetime.datetime.now()))
    runthree = run_c(parameters={"x": x}, idempotency_key=str(datetime.datetime.now()))
    runtwo.set_upstream(runone)
    runthree.set_upstream(runtwo)
```
In this case, the StartFlowRun tasks would have different dependencies, which get set in the Flow definition. For a SlackTask, you can end up with different dependencies depending on which conditional path your flow took.
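As a rough illustration of that reuse point, here is a sketch assuming the `SlackTask` from the Prefect 1.x task library; the `extract`/`transform` tasks are made-up placeholders, and the webhook configuration is whatever your Slack setup uses:

```python
# Sketch only: reusing one Slack notification task at different points in a
# flow, with dependencies set in the Flow block rather than on the task.
from prefect import Flow, task
from prefect.tasks.notifications import SlackTask

notify = SlackTask()  # pulls the webhook URL from a Prefect Secret by default

@task
def extract():
    return [1, 2, 3]

@task
def transform(data):
    return [x * 10 for x in data]

with Flow("slack-reuse") as flow:
    data = extract()
    extracted_msg = notify(message="extract finished")
    extracted_msg.set_upstream(data)

    result = transform(data)
    done_msg = notify(message="transform finished")  # second use, new upstream
    done_msg.set_upstream(result)
```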
The start and end time handling is a prime use case for the KV Store, which can hold that state for you. Storing Flows in Git repos that hold interdependent files is something on the roadmap, so we're not intending to limit ourselves to Docker. That said, Docker keeps the Flow consistent rather than relying on the execution environment (agent) to have the same settings and versions of packages. Prefect as a tool tries to be flexible, and you can avoid Docker if your agent is set up similarly to the environment you registered from.
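Here is a minimal sketch of that start/end bookkeeping with the KV Store (a Prefect Cloud feature in 1.x). The key name, the fallback window, and the task split are illustrative choices, not anything prescribed by Prefect:

```python
# Sketch: persisting a watermark between runs with the Prefect 1.x KV Store.
import pendulum
from prefect import Flow, task
from prefect.backend import get_key_value, set_key_value

@task
def get_window():
    # read the end time of the previous run; fall back to "one hour ago"
    try:
        start = pendulum.parse(get_key_value("last_processed_ts"))
    except Exception:  # key not set yet on the very first run
        start = pendulum.now("UTC").subtract(hours=1)
    return start, pendulum.now("UTC")

@task
def process(window):
    start, end = window
    # ... incremental work against [start, end) goes here ...
    set_key_value("last_processed_ts", end.isoformat())

with Flow("incremental") as flow:
    process(get_window())
```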
**Sam Gibson:**
So with the KV store for scheduling a repeating set of runs, is an external application typically written that triggers the runs, or is this something that's easily expressible using "schedules"?
**Kevin Kho:**
I think they are conceptually independent, though there is some overlap. For a Flow that repeats on a consistent basis, a schedule will do. The pain point was that people wanted to store small pieces of data (think the last timestamp retrieved). The KV Store was just released last month to address this problem. Before then, users had to persist state in a file or somewhere external.
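For reference, a schedule is attached to the Flow itself rather than driven by an external trigger application. A minimal Prefect 1.x sketch (the 15-minute interval is arbitrary):

```python
# Sketch: attaching an interval schedule directly to a flow in Prefect 1.x.
from datetime import timedelta
from prefect import Flow, task
from prefect.schedules import IntervalSchedule

@task
def run_pipeline():
    pass  # placeholder for the incremental pipeline body

schedule = IntervalSchedule(interval=timedelta(minutes=15))

with Flow("scheduled-incremental", schedule=schedule) as flow:
    run_pipeline()
```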
**Sam Gibson:**
Does a schedule guarantee that a new run won't be started until the last run is finished?
**Kevin Kho:**
It doesn't. We have a few mechanisms with Prefect Cloud that let you work with this. First, we have flow concurrency limits that only let a certain number of Flows run at a given time. The second is Flow SLAs. Think "if this run is late by 5 minutes, cancel it". This is doable without these, though: all you need to do with your first task is hit our GraphQL API endpoint, see if the Flow has ongoing runs, and then just end the current run.
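A rough sketch of that guard-task idea is below. The exact GraphQL query shape is an assumption about the Cloud API (and the `do_work` task is a placeholder), so treat it as a starting point rather than a recipe:

```python
# Sketch: a first task that checks for another ongoing run of the same flow
# and ends the current run early if one exists.
from prefect import Flow, task
from prefect.client import Client
from prefect.engine import signals

@task
def guard_against_overlap(flow_name):
    # assumed query shape; adjust to the actual Prefect Cloud schema
    query = """
    query {
      flow_run(where: {flow: {name: {_eq: "%s"}}, state: {_eq: "Running"}}) {
        id
      }
    }
    """ % flow_name
    running = Client().graphql(query).data.flow_run
    # the current run also shows up as "Running", hence the > 1 check
    if len(running) > 1:
        # SKIP propagates to downstream tasks by default, ending the run early
        raise signals.SKIP("Another run of this flow is already in progress")

@task
def do_work():
    pass  # placeholder for the real pipeline

with Flow("guarded") as flow:
    ok = guard_against_overlap("guarded")
    work = do_work()
    work.set_upstream(ok)  # skipped whenever the guard raises SKIP
```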
**Sam Gibson:**
I see. Thanks for your help @Kevin Kho. I'll let you know if I have any more questions