# ask-community
p
Hi! I'm just starting to work with Prefect 3 and I've run into a task for which, as far as I can tell, there is no built-in mechanism: defining dependencies between deployments. My pipeline has three stages:

1. Parsers are launched for a set of datasets (there can be ~40). The result is a list of which datasets were collected successfully and which were not. Failed ones need to be retried (run the parser again for those datasets), and successful ones move to the next step.
2. For each successfully collected dataset, a converter (a separate flow) is run.
3. "Jobs" are launched that may depend on several datasets from different providers. For example, a single job may depend on datasets #1 and #30: only when both are converted should the job be launched.

The problem: how do I restart this without rerunning everything?
• I need to be able to re-run only the failed parsers.
• Converters should start only for newly successful parsers.
• Jobs should run only when all their dependencies are ready.

In Prefect 3 I haven't found a built-in way to define such dependencies between deployments. Right now I'm thinking about writing my own "orchestrator" flow that triggers deployments via the API and monitors their status, but that feels rather heavy. I considered something like this with `asyncio` (for example):

```python
import asyncio
import random
from typing import AsyncGenerator, Callable, Awaitable


async def run_jobs_async(jobs: list[Callable[[], Awaitable]]) -> AsyncGenerator:
    tasks = [asyncio.create_task(job()) for job in jobs]
    for completed in asyncio.as_completed(tasks):
        result = await completed
        yield result


async def sample_job(i: int) -> str:
    await asyncio.sleep(random.random() * 2)  # simulate work
    return f"Job {i} done"


async def main():
    jobs = [lambda i=i: sample_job(i) for i in range(5)]
    async for outcome in run_jobs_async(jobs):
        print(outcome)


if __name__ == "__main__":
    asyncio.run(main())
```

If I implement my logic this way, will I lose key Prefect benefits, such as automatic logging, dependency tracking, and the ability to restart only parts of the pipeline? Or is there a recommended way to implement my use case in Prefect 3 so that I can keep those benefits?
b
Here's an AI-generated example that might be able to accomplish what you're looking for.

1. Use `persist_result=True` with a stable `cache_key_fn` such as `task_input_hash` for both `parse_dataset` and `convert_dataset`. On subsequent flow runs, Prefect will read completed results from storage rather than re-executing, so only newly successful parsers will trigger converters. If a parser previously failed, only that task will run again.
2. Express dependencies with `wait_for`. Converters wait on their parser future, and jobs wait on the exact set of converter futures they need. Prefect tracks this in the UI, so you get full visibility and logs. (Points 1–3 are illustrated in the first sketch after this list.)
3. Keep parsers and converters idempotent and include a version or checksum in their inputs so the cache key reflects real work. For example, include a source file ETag or snapshot timestamp in the task inputs for `cache_key_fn`.
4. If you prefer deployments per stage, call them from the coordinator using `prefect.deployments.run_deployment(...)` instead of `convert_dataset.submit(...)`. You still keep dependencies by awaiting futures, and you still get caching if the called flow uses persisted results. (See the second sketch below.)
5. If you want cross-run decoupling, emit a small artifact or event from `convert_dataset`, like `{"dataset": ds, "version": v, "batch": run_id}`, and have a lightweight "jobs" flow read the artifact store to decide what to run. You can also add Automations that trigger the jobs deployment when conversion events arrive, but for most teams the single coordinator flow with persisted results is the simplest and most reliable. (See the third sketch below.)
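Here's a minimal coordinator-flow sketch of points 1–3, assuming Prefect 3 with `task_input_hash` imported from `prefect.tasks`. The task names (`parse_dataset`, `convert_dataset`, `run_job`), the dataset ids, and the job-to-dataset mapping are placeholders, not anything from your actual pipeline:

```python
from prefect import flow, task
from prefect.tasks import task_input_hash

DATASETS = [f"dataset-{i}" for i in range(40)]        # placeholder dataset ids
JOB_DEPS = {"job-a": ["dataset-1", "dataset-30"]}     # placeholder job -> dataset deps


@task(retries=2, persist_result=True, cache_key_fn=task_input_hash)
def parse_dataset(dataset: str, snapshot: str) -> str:
    ...  # run the parser; `snapshot` ties the cache key to the real source data
    return dataset


@task(persist_result=True, cache_key_fn=task_input_hash)
def convert_dataset(dataset: str, snapshot: str) -> str:
    ...  # run the converter
    return dataset


@task
def run_job(job: str) -> None:
    ...  # run the job once all of its converted inputs are ready


@flow
def coordinator(snapshot: str):
    # Stage 1: parsers. Re-running the flow with the same `snapshot` reuses
    # cached results for parsers that already succeeded.
    parsed = {ds: parse_dataset.submit(ds, snapshot) for ds in DATASETS}

    # Stage 2: each converter waits only on its own parser.
    converted = {
        ds: convert_dataset.submit(ds, snapshot, wait_for=[fut])
        for ds, fut in parsed.items()
    }

    # Stage 3: each job waits on exactly the converters it depends on.
    job_futures = [
        run_job.submit(job, wait_for=[converted[ds] for ds in deps])
        for job, deps in JOB_DEPS.items()
    ]

    # Resolve all submitted futures before the flow ends.
    for fut in list(converted.values()) + job_futures:
        fut.wait()
```

To retry only what failed, re-run `coordinator` with the same `snapshot` value: parse/convert tasks that already succeeded resolve from the cache, failed ones execute again, and each job runs only once all of its `wait_for` dependencies have completed.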
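And a sketch of point 4, where the coordinator triggers per-stage deployments instead of local tasks. The deployment names and parameters are made up, and this assumes `run_deployment` blocks until the child flow run finishes (its default behavior unless you pass a `timeout`):

```python
from prefect import flow
from prefect.deployments import run_deployment


@flow
def coordinator_with_deployments(datasets: list[str]):
    converted = []
    for ds in datasets:                                # sequential for brevity
        flow_run = run_deployment(
            name="converter/convert-dataset",          # made-up deployment name
            parameters={"dataset": ds},
        )
        if flow_run.state and flow_run.state.is_completed():
            converted.append(ds)

    # Launch a job only once every dataset it depends on has been converted.
    if {"dataset-1", "dataset-30"}.issubset(converted):
        run_deployment(
            name="jobs/run-job",                       # made-up deployment name
            parameters={"datasets": ["dataset-1", "dataset-30"]},
        )
```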
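Finally, a sketch of the event-based option in point 5: `convert_dataset` could emit a Prefect event when it finishes, and an Automation configured in the UI or via the API could trigger the jobs deployment once the events it cares about have arrived. The event name, resource id scheme, and helper function below are illustrative only:

```python
from prefect.events import emit_event


def report_conversion(dataset: str, version: str, batch: str) -> None:
    # Call this at the end of convert_dataset, after the converted output is stored.
    emit_event(
        event="dataset.converted",                                  # example event name
        resource={"prefect.resource.id": f"converter.{dataset}"},   # example resource id
        payload={"dataset": dataset, "version": version, "batch": batch},
    )
```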