
Yannick

01/04/2021, 10:08 AM
Hi everyone, I am evaluating whether I can build on top of Prefect (as a backend for Orchest) and am left with a few questions about serialization, caching, and data passing between steps in a distributed setting. Any help would be greatly appreciated :)
• I understand that serialization is done using cloudpickle so that Dask workers can pass the data. Does that mean that data passing between tasks is completely in the hands of Dask? Meaning that the data is passed over the network in case Dask schedules the tasks on different machines in the cluster?
• About large data, from the docs: "Don't worry about passing large data objects between tasks. As long as it fits in memory, Prefect can handle it with no special settings." What exactly should fit in memory here: the sum of all output data in the flow, or is there some sort of eviction going on? Example: when building a flow like A --> B, A --> C, and B --> D (sketched after this message), should the output from A plus the output from B fit completely in memory? Secondly, from the docs: "(...) If it doesn't, there are ways to distribute the operation across a cluster." How would I go about doing such a thing?
• For input caching, is there any way to configure how it works? The docs state that "Input caching is an automatic caching," but I would like additional control over it.
Many, many thanks! 🙏
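To make the flow example concrete, something like this minimal sketch (using Prefect's 0.x functional API; task names and return values are placeholders):
```python
from prefect import task, Flow

@task
def a():
    return "output of A"

@task
def b(upstream):
    return f"B received: {upstream}"

@task
def c(upstream):
    return f"C received: {upstream}"

@task
def d(upstream):
    return f"D received: {upstream}"

with Flow("example") as flow:
    out_a = a()
    out_b = b(out_a)  # A --> B
    out_c = c(out_a)  # A --> C
    out_d = d(out_b)  # B --> D
```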

Kyle Moon-Wright

01/04/2021, 7:19 PM
Hey @Yannick, lots of questions here that I'll try to answer, though I'm sure others will have more succinct answers:
1. Dask implementations are built into Prefect, and the Dask scheduler will handle the passage of data across your cluster, whether it's distributed or not, dynamic or static. This is typically accomplished by specifying an Executor for FlowRun execution.
2. It is recommended that an entire FlowRun fit within the memory of your environment, including its tasks and any caching you've configured. To distribute across a cluster, your choice of Executor determines the computation: a DaskExecutor for a distributed Dask cluster, or a LocalDaskExecutor for multiple threads/processes on a single machine (see the sketch below).
3. What sort of control do you need here? Information that a task needs to run is stored for future runs, but usually this is limited to task dependencies and States.
I may have misunderstood your questions, so let me know if I can clarify.
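A minimal sketch of wiring up an executor (assuming Prefect 0.14+, where executors live in prefect.executors; the flow contents and the scheduler address are placeholders):
```python
from prefect import task, Flow
from prefect.executors import DaskExecutor, LocalDaskExecutor

@task
def say_hi():
    return "hi"

with Flow("executor-example") as flow:
    say_hi()

# Distributed execution on an existing Dask cluster:
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")

# Or parallelism on a single machine (threads or processes):
# flow.executor = LocalDaskExecutor(scheduler="threads")

flow.run()
```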

Yannick

01/05/2021, 7:33 AM
Thanks @Kyle Moon-Wright! Sounds to me like Prefect has made a well-isolated abstraction for the execution of Flows 💪 Regarding the caching (as mentioned in 3), I would like/need full control over when input data is cached and when it is evicted. Say, for example, A --> B and the computation in A takes a "long" time; then I would like to be able to configure that its result should be cached (in memory) so that for future Flows the result can be reused and does not have to be computed again. Is such granular control of caching supported?

Kyle Moon-Wright

01/05/2021, 4:51 PM
Yeah! I think cache_validators are what you're after here, though these specifically pertain to the outputs of your tasks (Result objects). You can cache for a cache_for duration for future Flow Runs.
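A minimal sketch of output caching (assuming Prefect 0.x, where cache validators live in prefect.engine.cache_validators; the task body and duration are placeholders):
```python
from datetime import timedelta

from prefect import task, Flow
from prefect.engine.cache_validators import all_inputs

# Cache this task's Result for one day; the cached value is only
# re-used on future runs if the task is called with the same inputs.
@task(cache_for=timedelta(days=1), cache_validator=all_inputs)
def expensive_a():
    # stand-in for a long-running computation
    return 42

with Flow("cached-flow") as flow:
    x = expensive_a()
```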

Yannick

01/05/2021, 5:19 PM
Yea that definitely seems like what I am after. @Kyle Moon-Wright You're the best ;)