pavel hrablis
08/12/2025, 5:10 PM
asyncio (for example):
import asyncio
import random
from typing import AsyncGenerator, Callable, Awaitable


async def run_jobs_async(jobs: list[Callable[[], Awaitable]]) -> AsyncGenerator:
    tasks = [asyncio.create_task(job()) for job in jobs]
    for completed in asyncio.as_completed(tasks):
        result = await completed
        yield result


async def sample_job(i: int) -> str:
    await asyncio.sleep(random.random() * 2)  # simulate work
    return f"Job {i} done"


async def main():
    jobs = [lambda i=i: sample_job(i) for i in range(5)]
    async for outcome in run_jobs_async(jobs):
        print(outcome)


if __name__ == "__main__":
    asyncio.run(main())
If I implement my logic in this way, will I lose key Prefect benefits — such as automatic logging, dependency tracking, the ability to restart only parts of the pipeline, etc.?
Or is there a recommended way to implement my use case in Prefect 3 so that I can keep those benefits?
Brendan Dalpe
08/13/2025, 6:49 PM
1. Use persist_result=True with a stable cache_key_fn such as task_input_hash for both parse_dataset and convert_dataset. On subsequent flow runs, Prefect will read completed results from storage rather than re-executing, so only newly successful parsers will trigger converters. If a parser previously failed, only that task will run again (see the sketches after this list).
2. Express dependencies with wait_for. Converters wait on their parser future. Jobs wait on the exact set of converter futures they need. Prefect tracks this in the UI so you get full visibility and logs.
3. Keep parsers and converters idempotent and include a version or checksum in their inputs so the cache key reflects real work. For example, include a source file ETag or snapshot timestamp in the task inputs for cache_key_fn.
4. If you prefer deployments per stage, call them from the coordinator using prefect.deployments.run_deployment(...) instead of convert_dataset.submit(...). You still keep dependencies by awaiting futures and you still get caching if the called flow uses persisted results (sketch below).
5. If you want cross-run decoupling, emit a small artifact or event from convert_dataset like {"dataset": ds, "version": v, "batch": run_id} and have a lightweight “jobs” flow read the artifact store to decide what to run. You can also add Automations that trigger the jobs deployment when conversion events arrive, but for most teams the single coordinator flow with persisted results is the simplest and most reliable (sketch below).
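A minimal sketch of points 1-3, assuming a single coordinator flow; parse_dataset and convert_dataset are the task names from the points above, while run_job, the placeholder task bodies, and the cache_expiration window are assumptions:

from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(persist_result=True, cache_key_fn=task_input_hash,
      cache_expiration=timedelta(days=7))
def parse_dataset(dataset: str, source_version: str) -> str:
    # source_version (ETag / snapshot timestamp) is part of the inputs,
    # so the cache key only changes when the source actually changes
    return f"parsed://{dataset}@{source_version}"


@task(persist_result=True, cache_key_fn=task_input_hash,
      cache_expiration=timedelta(days=7))
def convert_dataset(parsed_ref: str) -> str:
    return f"converted://{parsed_ref}"


@task
def run_job(datasets: list[str]) -> str:
    # hypothetical downstream job that needs specific converted datasets
    return f"job over {datasets} done"


@flow
def coordinator(datasets: dict[str, str]):
    # datasets maps name -> version/ETag (hypothetical shape)
    parsed = {
        name: parse_dataset.submit(name, version)
        for name, version in datasets.items()
    }
    # each converter waits on exactly its parser future
    converted = {
        name: convert_dataset.submit(fut, wait_for=[fut])
        for name, fut in parsed.items()
    }
    # a job waits on exactly the converter futures it needs; on a re-run,
    # previously successful parse/convert results are read from storage
    job_future = run_job.submit(["a", "b"],
                                wait_for=[converted["a"], converted["b"]])
    return job_future.result()


if __name__ == "__main__":
    coordinator({"a": "etag-1", "b": "etag-2", "c": "etag-3"})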
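For point 4, a sketch of calling a per-stage deployment from the coordinator with run_deployment instead of convert_dataset.submit(...); the deployment name and parameters are made up:

from prefect import flow
from prefect.deployments import run_deployment


@flow
def coordinator():
    # with the default timeout, run_deployment blocks until the child run
    # reaches a final state and returns that FlowRun object
    child_run = run_deployment(
        name="convert-dataset/production",                  # hypothetical "flow/deployment" name
        parameters={"dataset": "a", "version": "etag-1"},   # hypothetical parameters
    )
    print(child_run.state)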
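For point 5, a sketch of emitting an artifact and an event from convert_dataset with a {"dataset": ds, "version": v, "batch": run_id} payload; the event name, artifact key format, and resource id are assumptions:

from prefect import task
from prefect.artifacts import create_table_artifact
from prefect.events import emit_event
from prefect.runtime import flow_run


@task
def convert_dataset(ds: str, v: str) -> str:
    converted = f"converted://{ds}@{v}"
    batch = flow_run.id  # current flow run id, used as the batch marker
    # artifact a lightweight "jobs" flow can read later to decide what to run
    create_table_artifact(
        key=f"converted-{ds}",  # artifact keys allow lowercase letters, numbers, dashes
        table=[{"dataset": ds, "version": v, "batch": batch}],
    )
    # event an Automation can match to trigger the jobs deployment
    emit_event(
        event="dataset.converted",  # hypothetical event name
        resource={"prefect.resource.id": f"dataset.{ds}"},
        payload={"dataset": ds, "version": v, "batch": batch},
    )
    return converted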