Hans Lellelid
07/14/2023, 8:02 PM
... (process_file.submit(path)) to process each file. My understanding is that this means that agents can do that part in parallel for however many files match the glob pattern.
• What I would like to do is also parallelize the processing of batches of rows within these files, but attempting to have the process_file task call another task using process_batch.submit() results in an error about tasks not being callable outside flows.
So, questions:
1. What's the best way to architect this [ideally without reinventing the worker/concurrency]?
2. Are we pushing Prefect beyond what it was envisioned for by implementing our pipeline directly in flows/tasks like this?

Dominic Tarro
07/14/2023, 8:15 PM
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
def worker_flow(path):
    """Do stuff to an individual file."""
    # Load the data from the path
    data = ...
    # Do your individual file processing
    ...


@flow
async def orchestrator_flow():
    """Submit a worker flow run for each globbed path."""
    paths = ...
    # Each run_deployment call starts a separate flow run of the worker
    # deployment; gather awaits all of them concurrently.
    results = await asyncio.gather(
        *[
            run_deployment("worker_flow/deployment-name", parameters=dict(path=path))
            for path in paths
        ]
    )
It requires you to create a deployment for the worker.

Hans Lellelid
07/14/2023, 8:17 PM

Dominic Tarro
07/14/2023, 8:21 PM

Hans Lellelid
07/14/2023, 8:21 PM

Dominic Tarro
07/14/2023, 8:21 PM
... prefect_aws.ECSTask for your infra?

Hans Lellelid
07/14/2023, 8:21 PM

Hans Lellelid
07/14/2023, 8:22 PM

Dominic Tarro
07/14/2023, 8:24 PM

Hans Lellelid
07/14/2023, 8:24 PM

Hans Lellelid
07/14/2023, 8:24 PM

Hans Lellelid
07/14/2023, 8:24 PM

Dominic Tarro
07/14/2023, 8:25 PM

Hans Lellelid
07/14/2023, 8:26 PM

Dominic Tarro
07/14/2023, 8:26 PM

Hans Lellelid
07/14/2023, 8:26 PM

Hans Lellelid
07/14/2023, 8:26 PM

Dominic Tarro
07/14/2023, 8:27 PM
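
A minimal, Prefect-free sketch of the fan-out shape in the orchestrator_flow snippet above, extended one level down to batches of rows. Everything here is an assumption for illustration: iter_batches, process_batch, process_file, orchestrate, and the limit parameter are hypothetical names, and process_batch only stands in for whatever awaitable does the real work (for example a run_deployment call). It shows how nested asyncio.gather calls compose; it does not involve Prefect scheduling or agents.

```python
import asyncio
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def iter_batches(rows: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield successive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


async def process_batch(batch: List[int]) -> int:
    """Hypothetical stand-in for one unit of awaited work on a batch."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return sum(batch)


async def process_file(rows: Iterable[int], batch_size: int, limit: int = 4) -> List[int]:
    """Fan out one coroutine per batch, with at most `limit` running at once."""
    sem = asyncio.Semaphore(limit)

    async def guarded(batch: List[int]) -> int:
        async with sem:
            return await process_batch(batch)

    # gather preserves input order, so results line up with the batches
    return await asyncio.gather(
        *(guarded(b) for b in iter_batches(rows, batch_size))
    )


async def orchestrate(files: Dict[str, Iterable[int]], batch_size: int) -> Dict[str, List[int]]:
    """Outer fan-out: one process_file coroutine per file, all awaited together."""
    results = await asyncio.gather(
        *(process_file(rows, batch_size) for rows in files.values())
    )
    return dict(zip(files.keys(), results))
```

Calling asyncio.run(orchestrate({"a.csv": range(5)}, 2)) splits the five rows into batches of two, two, and one. The semaphore is one way to cap concurrent batches per file; whether a cap is needed depends on the infrastructure behind the real work.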