# ask-community
w
Hey everyone, I hope you're doing well! I'm building a batch pipeline, and I'm seeking some insights and other perspectives so I can make an informed decision about how to properly structure a Prefect flow with subflows and tasks. My pipeline consists of five significant steps: Prepare Data, Feature Engineering, Model Training, Evaluation, and Delivery of the results. I've structured it with one main flow for the entire process and an individual subflow for each step, to separate concerns. All code within these subflows is plain Python, and I've used Prefect tasks only a couple of times inside them.

However, this approach lacks visibility on the main flow's page regarding logs, which is a crucial aspect of using Prefect: I need to dig through multiple screens to get any insight into what happened. Additionally, I want to run these subflows in parallel, which would require invoking asyncio machinery that is potentially executed on the main process rather than on external infrastructure the way tasks are.

To address this, I'm considering converting each step into a task. That would improve observability on the main page and enable parallel execution using map. However, it comes with downsides, like having to call .fn for inner tasks and creating very large tasks, potentially deviating from Prefect's design principles. I'd appreciate any input or suggestions on which approach achieves the most efficient and maintainable design for my Prefect pipeline.
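For reference, the current structure looks roughly like this (a simplified sketch; the names are illustrative, not my real code):
Copy code
from prefect import flow

@flow
def prepare_data_flow(): ...

@flow
def feature_engineering_flow(data): ...

@flow
def main_flow():
    # One subflow per pipeline step, called sequentially from the main flow.
    data = prepare_data_flow()
    features = feature_engineering_flow(data)
    # ... training, evaluation, and delivery follow the same pattern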
k
The first thing I'm curious about is what your goal is when you mention parallelizing tasks, whether with async functionality or .map. Are you looking to gain performance by completing multiple simultaneous computations, or looking to take advantage of time otherwise spent waiting on IO like network transfers and reads/writes? Both is a valid answer too, but your needs will have a large influence over the design choices here.
w
Thank you for your response, I appreciate your attention! Regarding the question, my primary objective is to leverage all available workers or agents to achieve multiple simultaneous computations. In my pipeline, each subflow handles relatively simple parameters, which are essentially hyperparameters. So I have something similar to this:
Copy code
import itertools

possibilities_a = ["x", "y"]
possibilities_b = [1, 2, 3]

for my_parameters in itertools.product(possibilities_a, possibilities_b):
    result = feature_engineering_flow(my_parameters)
    training(result)
And I want to run these simultaneously (feature engineering with (x, 1), another with (x, 2), and so on).
k
What type of infrastructure are you executing on? Is there a reason you'd need many workers versus one worker that distributes flow runs to other containers on scaling infrastructure and monitors them, like with Kubernetes/ECS/Cloud Run/ACI?
w
> And I want to run these simultaneously (feature engineering with (x, 1), another with (x, 2), and so on).
Currently we have 3 agents running on our internal servers (process work type) while we evaluate the solution.
> Is there a reason you'd need many workers versus one worker that distributes flow runs to other containers
We have plans to leverage Kubernetes / Docker, but the environment these jobs currently run in doesn't have that infra ready to use. (But I will certainly request it if it's needed to prove out this experiment 🙂)
In terms of my specific use case, please correct me if I'm wrong. If I have flow A running on worker/agent A with a work type of 'process', and flow A calls subflow B using asyncio.TaskGroup, subflow B will execute on the same process/worker/agent A. Is that correct? However, if I use Task.map, the next available agent/worker will be picked up. Is that also correct?
"one worker that distributes flow runs"
Thank you for bringing up that point. Just to clarify, when you mentioned "one worker that distributes flow runs," you were referring to the Docker/Kubernetes work type? In this case, when a flow is executed, the image will be executed as resources become available, right? So I'm safe to assume that the asyncio.TaskGroup, will run it independently, which is interesting. But how the logs would work in this scenario? I would need to "hunt" warnings and other types of logs from the sub flows as the current approach, right? I mean it will not show logs like when I use task.map
k
> If I have flow A running on worker/agent A with a work type of 'process', and flow A calls subflow B using asyncio.TaskGroup, subflow B will execute on the same process/worker/agent A. Is that correct?
This is correct.
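For example, here's a rough sketch of what that in-process pattern looks like (illustrative names; asyncio.TaskGroup needs Python 3.11+):
Copy code
import asyncio
from prefect import flow

@flow
async def subflow_b(n: int) -> int:
    await asyncio.sleep(n)  # stand-in for real work
    return n

@flow
async def flow_a():
    # Both subflow runs execute here, inside flow A's own process,
    # interleaved on the event loop -- no other agent/worker is involved.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(subflow_b(1))
        tg.create_task(subflow_b(2))

if __name__ == "__main__":
    asyncio.run(flow_a())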
> However, if I use Task.map, the next available agent/worker will be picked up. Is that also correct?
No, map submits task runs simultaneously inside the current execution environment using the selected TaskRunner. From the map docstring:
Copy code
Will create as many task runs as the length of the iterable(s) in the backing API and submit the task runs to the flow's task runner.
So mapped tasks still exist only within the context of the current flow run.
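To illustrate (a minimal sketch with made-up names), a mapped task always runs on the parent flow run's task runner:
Copy code
from prefect import flow, task

@task
def train(params: tuple) -> str:
    # Placeholder for the real training step.
    return f"trained with {params}"

@flow
def training_flow():
    grid = [("x", 1), ("x", 2), ("y", 1)]
    # One task run per element, all submitted to this flow run's
    # task runner -- same process/infrastructure as the flow itself.
    futures = train.map(grid)
    return [f.result() for f in futures]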
> In that case, when a flow is executed, the image runs as resources become available, right? So I'm safe to assume that asyncio.TaskGroup will run each subflow independently, which is interesting.
Only if you're running deployments as your subflows. The deployment is the level at which infrastructure is allocated for flow runs, so you'd need to create a separate deployment for each unit of work you want to parallelize, then begin each of those runs using run_deployment inside the TaskGroup; a sketch of that pattern is below. Only then will each subflow get its own compute resources to run on. An alternative is the Dask task runner, but Dask has its own learning curve and is a somewhat different approach to distributed compute than the one we've been talking about so far.
> But how would the logs work in that scenario? I would need to "hunt" warnings and other logs from the subflows, as in my current approach, right?
Yeah, the deployments-as-subflows model still manifests as separate flow runs in the UI. We know this isn't ideal for tracking down errors at times, and your feedback on that is something we're thinking about.
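Here's the sketch I mentioned (the deployment name and parameters are placeholders; asyncio.TaskGroup needs Python 3.11+):
Copy code
import asyncio
from prefect import flow
from prefect.deployments import run_deployment

@flow
async def orchestrator():
    param_grid = [{"a": "x", "b": 1}, {"a": "x", "b": 2}, {"a": "y", "b": 1}]
    async with asyncio.TaskGroup() as tg:
        for params in param_grid:
            # Each call creates a separate flow run from the deployment,
            # so each one gets its own infrastructure (container, pod, ...).
            tg.create_task(
                run_deployment(
                    name="feature-engineering/prod",  # placeholder name
                    parameters=params,
                )
            )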
w
Thank you for these detailed responses! If I understand correctly, you're suggesting that in my scenario it would be best to stick with subflows and run them as separate deployments with Docker to achieve parallelization, since tasks won't provide the same level of parallel execution across workers. As for the logs, is there any kind of workaround? Perhaps a "hackish" way to inject logs from the subflow into the main flow, given that we have the flow run ID?
Okay, looks like I can pass the logger as a parameter and it works! All the logs are shown on the main page. The subflow's page will not have the logs, which is fine for me for now, since I can add the flow run ID as a parameter in the message.
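Roughly what I did, in case it helps anyone else (a simplified sketch of what worked in my setup; names are placeholders):
Copy code
from prefect import flow, get_run_logger
from prefect.context import get_run_context

@flow
def feature_engineering_flow(params, parent_logger=None):
    logger = parent_logger or get_run_logger()
    # Tag messages with this subflow's run ID so they can be traced
    # back from the parent flow's log page.
    run_id = get_run_context().flow_run.id
    logger.info("[%s] feature engineering with %s", run_id, params)

@flow
def main_flow():
    logger = get_run_logger()
    for params in [("x", 1), ("x", 2)]:
        feature_engineering_flow(params, parent_logger=logger)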
k
Awesome to hear you got this figured out!