
Antony Southworth

07/28/2020, 2:15 AM
Hi all, been reading through the docs the last day or two; had a few questions. I'm coming from Airflow so my mental model of things might just be out-of-whack.

1. Is there any best-practice recommendation about task sizing? I know the docs say "anything you can write in Python can be a task", but what are the trade-offs there? You could say the same thing about Airflow tasks (via `PythonOperator`), but generally you don't want them too small or else things get sluggish due to scheduler overhead.
2. How does Prefect handle data for mapped tasks? For example, if I have a flow `t = lambda x: x + 1`, `r1 = t.map(range(10))`, `r2 = t.map(r1)` (see the sketch at the end of this message), my understanding is that Prefect would distribute the computation of `r1` across the workers, then collect the results into a list on the "central" machine (where `flow.run()` was invoked), then serialise each element again and send the elements to the workers in order to distribute the computation of `r2`. This seems a bit inefficient (we compute the results on the worker, serialise, send back to the central scheduler, deserialise, then serialise, send back to a worker, deserialise, and then do more work).
3. How do folks use `Task.map` in practice? For example, would it be weird to run `Task.map` for each row in a dataframe? I guess this is related to the first question; basically, "how small is too small for tasks/`Task.map`?"
4. Is there any way to "re-run" old instances of tasks? E.g. if I had a scheduled process running daily, and I need to re-run the one from Wednesday last week, is there a convenient way of doing that in the UI? I guess I'm basically asking "can you do `airflow backfill`?"
5. How do people handle returning large results from Tasks? For example, if I have a task in my Flow that returns a 1GB dataframe, I don't really want that to be serialised and sent back to the "central" machine (where `flow.run()` was invoked), because that machine might have other work to do (and I don't want to waste time on ser/de, or loading the entire thing into memory just to serialise it and send it to another worker). For Airflow, I usually store things on S3 and use XCom to tell other tasks where the result was stored. Would the "Prefect Way" of doing this be for my task to upload its results to S3 and return the path as the return value from the Task?
6. Is there any way to run Flows in a "streaming" or asynchronous fashion? For example, if my task could `yield` rows from a dataframe, and the downstream tasks accepted an `Iterator`. Again, just thinking in terms of memory usage, it would be nice to not require the entire dataframe loaded in memory.

Apologies if these are all covered in the docs; I thought I got through them pretty thoroughly, but it's possible I may have missed some parts.
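For concreteness, here's roughly the shape of the flow I mean in (2); a minimal sketch using Prefect's functional API (`@task` / `Flow`), not anything I'm actually running:

```python
from prefect import task, Flow

@task
def t(x):
    # toy transformation; imagine something heavier per element
    return x + 1

with Flow("mapping-example") as flow:
    r1 = t.map(list(range(10)))  # first mapped layer
    r2 = t.map(r1)               # second layer, mapped over r1's results

flow.run()  # where do the intermediate results live between r1 and r2?
```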
Whaddya know, I just stumbled onto this part, which seems to answer the questions about sending data around: https://docs.prefect.io/core/concepts/persistence.html#persisting-output Still curious about the other parts though 🙂
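If I'm reading that page right, the knob is a task-level `Result`; something like this sketch (assuming the 0.12-era `S3Result` API, a placeholder bucket name, and checkpointing enabled, e.g. `PREFECT__FLOWS__CHECKPOINTING=true`):

```python
from prefect import task, Flow
from prefect.engine.results import S3Result

# persist this task's output to S3 rather than only holding it in memory
@task(checkpoint=True, result=S3Result(bucket="my-bucket"))
def build_dataframe():
    ...
```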

Chris White

07/28/2020, 2:37 AM
Hi Antony and welcome! I wrote about many of the differences between Airflow and Prefect here that you might find useful: https://medium.com/the-prefect-blog/why-not-airflow-4cfa423299c4 I’ll give a brief response to each of your questions here still:

1. This is completely up to you depending on what you find useful; the main trade-offs are code organization and how useful the UI is. I suggest considering Prefect tasks individual units of business logic for maximum clarity, but there’s no technical reason for that recommendation.
2. Prefect is not a swap-in replacement for a system like Dask, so there are going to be some inefficiencies lurking around. Ultimately all returned data from all tasks will eventually get collected in memory on the flow runner process. Any additional memory management you need, you’ll have to factor into how you write your tasks.
3. While you technically can map each row of a dataframe, my question to you is: why? If each row of the dataframe needs to undergo some transformation that requires retries / stateful visibility in the UI, then by all means go for it, but I suspect you don’t need full state management for each row.
4. Yup, there are many ways - Prefect doesn’t have an implicit dependence on time (all dependencies are explicit), so it depends on how you structured your code; most people use either Parameters or Context for time dependencies, both of which can be overridden when creating new flow runs (see the first sketch below).
5. Returning the path makes sense; Prefect doesn’t perform any additional memory management on top of Dask, so returning large objects is fine if your setup can handle the memory overhead (see the second sketch below).
6. Not currently; tasks must finish before downstream tasks can begin, which any type of stateful `yield` would violate. If you search Slack / GitHub you’ll find many instances of people talking about how to incorporate streaming into Prefect - it’s a hot topic of discussion right now.
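For (4), a minimal sketch of the Parameter approach (the flow and parameter names are illustrative): the scheduled run uses the default, and a backfill is just a new run with the date overridden.

```python
from prefect import task, Flow, Parameter

@task
def process(run_date: str):
    # do the day's work for `run_date` instead of implicitly using "today"
    print(f"processing {run_date}")

with Flow("daily-job") as flow:
    run_date = Parameter("run_date", default="2020-07-28")
    process(run_date)

# re-run last Wednesday's instance by overriding the parameter:
flow.run(parameters={"run_date": "2020-07-22"})
```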
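And for (5), the pattern you described - upload to S3, pass the path - looks roughly like this (bucket name and keys are hypothetical; this uses boto3/pandas directly rather than any Prefect-specific storage):

```python
import io

import boto3
import pandas as pd
from prefect import task, Flow

@task
def build_dataframe() -> str:
    # write the large result to S3 and hand downstream tasks only its path
    df = pd.DataFrame({"x": range(1_000_000)})
    buf = io.BytesIO()
    df.to_parquet(buf)
    boto3.client("s3").put_object(
        Bucket="my-bucket", Key="results/df.parquet", Body=buf.getvalue()
    )
    return "s3://my-bucket/results/df.parquet"

@task
def consume(path: str):
    # downstream task loads the data only when it needs it
    # (reading s3:// paths with pandas requires s3fs installed)
    df = pd.read_parquet(path)
    ...

with Flow("large-results") as flow:
    consume(build_dataframe())
```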
👍 3

Antony Southworth

07/28/2020, 3:04 AM
thank you so much @Chris White!
👍 1