# ask-community
b
Hey community and @Marvin - I have a question about running tasks concurrently. I have a flow in which I want to execute tasks concurrently, but I also need to access the data returned from those tasks in downstream tasks. My current solution is a synchronous `for` loop, e.g.
```python
for object in objects:
  data = tasks.operation1(data=object)
  tasks.operation2(data=data)
```
I'm reading the docs, which mention using either `.submit()` or `.map()` to run tasks concurrently; however, I'm not sure how to access the returned data and pass it to a secondary task (which could also run concurrently, if possible).
m
thought for 277.9 seconds
You're on the right track: in Prefect 3.x you can fan out work with either `task.submit()` or `task.map()` and then "fan in" the results by passing the returned futures directly to downstream tasks. Prefect automatically resolves futures passed as inputs, so you don't need to call `.result()` before wiring tasks together.

Quick rules of thumb:
- `task.submit(...)` → submits one task run and returns a `PrefectFuture`
- `task.map(...)` → submits many task runs over iterables and returns a `PrefectFutureList`
- You can pass futures (or lists of futures) to other tasks; Prefect will resolve them to their values
- To collect Python values at the end, call `.result()` on the future or future list

Example 1: End-to-end with map (both stages concurrent)
```python
from prefect import flow, task, unmapped
from prefect.task_runners import ThreadPoolTaskRunner

@task
def operation1(data):
    # ...do work...
    return f"op1({data})"

@task
def operation2(data, factor=1):
    # ...do more work...
    return f"op2({data}) x{factor}"

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow(objects: list[str], factor: int = 2):
    # Fan-out stage 1
    step1 = operation1.map(data=objects)  # PrefectFutureList

    # Fan-out stage 2 using results from stage 1
    # Use unmapped() to broadcast constants to each mapped run
    step2 = operation2.map(data=step1, factor=unmapped(factor))  # PrefectFutureList

    # Collect Python results (if you need them)
    return step2.result()

if __name__ == "__main__":
    print(my_flow(["a", "b", "c"]))
```

Example 2: submit + list comprehension (also fully concurrent)

```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def operation1(x): return x * 2

@task
def operation2(y): return y + 1

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow(objects: list[int]):
    f1 = [operation1.submit(o) for o in objects]            # list[PrefectFuture]
    f2 = [operation2.submit(data=f) for f in f1]            # depend on stage 1 futures
    return [f.result() for f in f2]                         # collect values

print(my_flow([1, 2, 3]))
```

Streaming a second stage as soon as first-stage items finish:

```python
from prefect import flow, task
from prefect.futures import as_completed

@task
def operation1(x): ...
@task
def operation2(y): ...

@flow
def my_flow(objects):
    f1 = [operation1.submit(o) for o in objects]
    f2 = []
    for fut in as_completed(f1):                # yields futures as they complete
        f2.append(operation2.submit(data=fut))  # schedule dependent work immediately
    return [f.result() for f in f2]
```

A few tips:
- Task runner choice:
  - IO-bound tasks: `ThreadPoolTaskRunner` (the default)
  - CPU-bound tasks: consider `ProcessPoolTaskRunner`
  - Swap easily: `@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))` (a minimal sketch follows below)
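For instance, a minimal sketch of that swap, assuming a CPU-heavy `crunch` task of my own invention (note that process-based runners need picklable tasks and arguments):

```python
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def crunch(n: int) -> int:
    # stand-in for CPU-bound work
    return sum(i * i for i in range(n))

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def cpu_flow(sizes: list[int]) -> list[int]:
    futures = [crunch.submit(n) for n in sizes]   # runs in separate processes
    return [f.result() for f in futures]
```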
- You can pass a list of futures to a single downstream task (fan-in); Prefect will resolve the list to values:

```python
@task
def aggregate(items):
    return sum(items)

@flow
def my_flow(objects):
    f1 = operation1.map(objects)        # PrefectFutureList
    total = aggregate.submit(f1)        # f1 auto-resolves to list of values
    return total.result()
```

- To broadcast constants in `map`, use `unmapped()` (shown above).

Docs for deeper dives:
- Run work concurrently (how-to)
- Task runners and when to use each
- PrefectFuture and PrefectFutureList API
- Blog: Beyond loops, task mapping at scale

Would your tasks be more IO-bound or CPU-bound? I can suggest the best runner configuration and a pattern for error handling if you share a bit about the operations.
b
@Marvin to give you more context, my flow is a data extraction and load pipeline. The first task extracts the data from the source, and the following task loads the extracted data into storage. Ideally I want to load each instance of the extracted data as soon as it becomes available; I do not want to wait for every iteration of the extraction to finish before starting the data loading. What I want is for each data extraction task to run concurrently and, as each task completes, for the resulting extracted data to be passed immediately to the data loading task, which is also configured to run concurrently, so that I have multiple extraction and corresponding loading tasks running at the same time.
m
thought for 166.8 seconds
Got it: you want a streaming "pipeline" where many extract tasks run concurrently and each load starts as soon as its corresponding extract finishes. Good news: in Prefect 3.x, if you pass a future as an input to another task, Prefect schedules the downstream task immediately, and it runs as soon as its upstream completes. You don't need to wait for the whole extract stage to finish. Two clean patterns you can use:

Pattern A: Pre-submit both stages (simplest)
- Submit all extracts.
- Immediately submit all loads using the extract futures as inputs.
- Loads will start automatically as their matching extract completes.

```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def extract(item):
    # pull data from source
    return f"data({item})"

@task
def load(data):
    # write to storage
    return f"loaded({data})"

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))
def el_flow(items):
    # fan-out extract
    extract_fs = [extract.submit(i) for i in items]
    # downstream loads will start as each extract completes
    load_fs = [load.submit(f) for f in extract_fs]
    # collect final results if needed
    return [f.result() for f in load_fs]

# el_flow(["a","b","c"])
```

Pattern B: Stream with as_completed (if you want to react per completion)
- Same concurrency as A, but you submit each load only when its extract finishes (handy if you need per-item routing, metrics, etc.).

```python
from prefect import flow, task
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

@task
def extract(item): ...
@task
def load(data): ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))
def el_flow(items):
    extract_fs = [extract.submit(i) for i in items]
    load_fs = []
    for f in as_completed(extract_fs):
        load_fs.append(load.submit(f))
    return [f.result() for f in load_fs]
```

Notes and tips:
- IO-bound work (APIs, DB, object storage) → `ThreadPoolTaskRunner` is ideal; use `max_workers` to tune concurrency.
- CPU-bound transforms → consider `ProcessPoolTaskRunner` instead.
- If an extract can fail but you still want to run a load that handles failures (e.g., dead-lettering), use `allow_failure`:

```python
from prefect import allow_failure

# ...
for f in as_completed(extract_fs):
    load_fs.append(load.submit(allow_failure(f)))
```

- Large payloads: if results are big or you're using processes/distributed runners, consider enabling persistence on tasks or configuring `result_storage` for the flow so data passes via storage instead of memory.

Docs for reference:
- Run work concurrently: https://docs-3.prefect.io/v3/how-to-guides/workflows/run-work-concurrently
- Task runners (Thread vs Process, etc.): https://docs-3.prefect.io/v3/concepts/task-runners
- Futures API: https://docs-3.prefect.io/v3/api-ref/python/prefect-futures

Quick sanity check: are your extract/load steps mostly IO-bound, and roughly how many items do you expect per run? I can suggest runner settings and any guardrails (timeouts, retries, persistence) tailored to that scale.
b
@Marvin my data extraction tasks depend heavily on APIs and web scraping, while my data load stores data in both a Cloudflare object storage bucket and MongoDB. Can you share an example of how I can enable persistence or configure `result_storage` for the flow so data passes via storage instead of memory?
m
thought for 299.9 seconds
Great use case. Since you're doing lots of IO (APIs, scraping, object storage, MongoDB), I'd recommend:
- Use `ThreadPoolTaskRunner` for concurrency
- Persist results to remote storage so downstream tasks fetch from storage rather than relying on in-memory references
- Prefer JSON for structured scraped data; use Pickle for arbitrary Python objects or binary data

Below are two ways to configure persistence and remote result storage in Prefect 3.x.

Option 1: Configure a RemoteFileSystem block for Cloudflare R2 and use it in your flow/tasks. `RemoteFileSystem` supports any fsspec backend (S3/R2) via s3fs.

Install dependencies:

```bash
pip install prefect s3fs
```

One-time: create and save a RemoteFileSystem block pointing to your R2 bucket (use environment variables for secrets):

```python
from prefect.filesystems import RemoteFileSystem

# R2 is S3-compatible; pass s3fs settings including endpoint_url
r2 = RemoteFileSystem(
    basepath="<s3://my-r2-bucket/prefect-results/>",
    settings={
        # s3fs credentials – prefer setting via env: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
        # "key": "...",
        # "secret": "...",
        "client_kwargs": {
            "endpoint_url": "https://<account-id>.<http://r2.cloudflarestorage.com|r2.cloudflarestorage.com>"
        },
        # Optional: region_name if your SDK requires it
        # "client_kwargs": {"endpoint_url": "...", "region_name": "auto"},
    },
)
r2.save("r2-results", overwrite=True)
print("Saved RemoteFileSystem block 'r2-results'")
```

Use the saved block in your flow. Setting `persist_result`/`result_storage` ensures that:
- Each task's result is written to R2
- When you pass a future to `load(...)`, Prefect loads the value from R2 automatically

```python
from prefect import flow, task
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

# Load the storage block you saved previously
r2_storage = RemoteFileSystem.load("r2-results")

# Choose serializer for your payloads
json_ser = JSONSerializer()

@task(
    persist_result=True,
    result_serializer=json_ser,
)
def extract(item):
    # make API calls / scraping
    # return structured data (dict/list) that JSON can handle
    return {"item": item, "payload": f"data-for-{item}"}

@task(
    persist_result=True,
    result_serializer=json_ser,
)
def load(doc):
    # Persist to MongoDB and/or Cloudflare R2 as your destination
    # In this example, the 'doc' value is fetched from R2 (result storage),
    # deserialized by Prefect, then passed in here as a Python object.
    # ... write to MongoDB and/or additional storage ...
    return {"status": "ok", "id": doc["item"]}

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=32),
    persist_result=True,
    result_storage=r2_storage,       # critical: results live in R2
    result_serializer=json_ser,
)
def etl(items: list[str]):
    # Fan-out extracts
    extract_fs = [extract.submit(i) for i in items]

    # As each extract finishes, schedule the corresponding load
    load_fs = []
    for f in as_completed(extract_fs):
        load_fs.append(load.submit(f))

    # Optionally collect final results at the end
    return [f.result() for f in load_fs]

# etl(["a","b","c"])
```

Notes:
- If your scraped payloads are large or not JSON-serializable, switch to Pickle:

```python
from prefect.serializers import PickleSerializer

pickle_ser = PickleSerializer()

@task(persist_result=True, result_serializer=pickle_ser)
def extract(item):
    ...
```

- You can override result storage per task if needed by passing `result_storage=r2_storage` on the task decorator. If omitted, tasks use the flow's storage when defined.
- For very large values, consider `cache_result_in_memory=False` on tasks to avoid in-process caching.
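As a minimal sketch of those two per-task knobs (reusing the `r2_storage` and `json_ser` objects defined in Option 1; the task name is illustrative):

```python
from prefect import task

@task(
    persist_result=True,
    result_storage=r2_storage,        # per-task override of the flow-level storage
    result_serializer=json_ser,
    cache_result_in_memory=False,     # large values: re-read from R2 instead of RAM
)
def extract_large(item):
    # stand-in for a task that returns a big payload
    return {"item": item, "payload": "..."}
```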
Option 2: Set global defaults via a Prefect profile (env vars). Useful when you always want persistence and a default storage/serializer. Set these environment variables in your Prefect profile:

```
PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
PREFECT_RESULTS_DEFAULT_SERIALIZER=json  # or pickle
```

Then use the flow-level `result_storage` to point to your R2 block (as in Option 1). You can omit `persist_result=True` on tasks/flows if you use the global default.

What about MongoDB as a result store?
- Prefect doesn’t ship a MongoDB result storage block out-of-the-box.
- If you want to store task results in MongoDB instead of R2, you can implement a custom WritableFileSystem by subclassing `prefect.filesystems.WritableFileSystem`. It must implement two async methods: `write_path(path: str, content: bytes) -> str` and `read_path(path: str) -> bytes`.
- A minimal skeleton using PyMongo/GridFS might look like:
```python
from prefect.filesystems import WritableFileSystem
import anyio
import gridfs
from pymongo import MongoClient


class MongoResultStorage(WritableFileSystem):
    def __init__(self, uri: str, db: str, basepath: str = ""):
        super().__init__()
        self.uri = uri
        self.db = db
        self.basepath = basepath

    async def _get_fs(self):
        def _connect():
            client = MongoClient(self.uri)
            database = client[self.db]
            return gridfs.GridFS(database)

        return await anyio.to_thread.run_sync(_connect)

    async def write_path(self, path: str, content: bytes) -> str:
        fs = await self._get_fs()
        full_path = f"{self.basepath}{path}"

        def _write():
            # store the path as the filename so we can query by it;
            # upsert by path key: delete any existing file, then write the new one
            for f in fs.find({"filename": full_path}):
                fs.delete(f._id)
            fs.put(content, filename=full_path)
            return full_path

        return await anyio.to_thread.run_sync(_write)

    async def read_path(self, path: str) -> bytes:
        fs = await self._get_fs()
        full_path = f"{self.basepath}{path}"

        def _read():
            f = fs.find_one({"filename": full_path})
            if not f:
                raise FileNotFoundError(full_path)
            return f.read()

        return await anyio.to_thread.run_sync(_read)
```

- Save your block and use it as `result_storage=MongoResultStorage(...)` on your flow/tasks. You can also persist the block with `.save("mongo-results")`, similar to the R2 example.

How this helps your streaming pattern:
- With `persist_result=True` and remote `result_storage`, each `extract.submit(...)` writes its result to R2 as soon as it completes.
- When you call `load.submit(extract_future)`, Prefect resolves that future by fetching the serialized value from R2 and passing the deserialized value into `load`; your data flows via storage, not process memory, and you get immediate, per-item streaming.

Useful references:
- Results and storage: https://docs-3.prefect.io/v3/advanced/results
- Filesystem blocks (RemoteFileSystem, LocalFileSystem): https://docs-3.prefect.io/v3/api-ref/python/prefect-filesystems
- Serializers: https://docs-3.prefect.io/v3/api-ref/python/prefect-serializers
- Futures: https://docs-3.prefect.io/v3/api-ref/python/prefect-futures

If you share your R2 endpoint format and how you prefer to manage credentials (env vars vs Prefect secrets/blocks), I can tailor the `RemoteFileSystem` settings and a safe setup flow for your environment.
b
@Marvin I am getting some errors when committing results to my Cloudflare R2 file system storage. For example, my Prefect task is crashing with this message:

```
19:24:21.657 | ERROR   | Task run 'tapology_extract_fight_data-2fa' - An error was encountered while committing transaction '053d83811953df61018622d384646784'
```

The Python stack trace that follows includes these lines:

```
File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/selector_events.py", line 651, in sock_connect
    return await fut
           ^^^^^^^^^
asyncio.exceptions.CancelledError

  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

  File "/Users/benjamingroves/playground/boxing-data/combat-data-pipelines/.venv/lib/python3.12/site-packages/aiohttp/client.py", line 745, in _connect_and_send_request
    raise ConnectionTimeoutError(
aiohttp.client_exceptions.ConnectionTimeoutError: Connection timeout to host https://<redacted_my_account_id>.r2.cloudflarestorage.com/<redacted_my_bucket_name>

  File "/Users/benjamingroves/playground/boxing-data/combat-data-pipelines/.venv/lib/python3.12/site-packages/aiobotocore/httpsession.py", line 270, in send
    raise ConnectTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "https://<redacted_my_account_id>.r2.cloudflarestorage.com/p<redacted_my_bucket_name>"
```
m
thought for 686.3 seconds
Thanks for sharing the stack trace; that helps. The key bits are these timeout errors from aiohttp/aiobotocore when Prefect tries to commit your task's result to your R2-backed storage:
- ConnectionTimeoutError / ConnectTimeoutError to an R2 URL
- The URL in the log looks malformed and seems to include the bucket name in the endpoint

In Prefect 3, committing a task result uses your result storage's filesystem (fsspec + s3fs + aiobotocore for S3), so misconfiguration of the S3-compatible endpoint or addressing style will surface as transaction commit errors.

Most common causes with Cloudflare R2:
- Endpoint URL includes a bucket path. The endpoint must be just the base, https://<account_id>.r2.cloudflarestorage.com; do not append the bucket or any path there. The bucket/prefix belongs in the basepath (e.g., s3://my-bucket/prefix/).
- Addressing style. For S3-compatible providers like R2, path-style is the most reliable; virtual-hosted style can fail TLS/hostname checks or generate malformed hosts.
- Missing/incorrect region or signature. For R2, use signature v4; region "auto" is generally okay.
- Too-short default connect/read timeouts when the endpoint is slow or the first connection's DNS/TLS handshake takes longer.

A known-good Prefect 3 RemoteFileSystem config for R2: use RemoteFileSystem for result storage and pass the s3fs settings. Here's a minimal working example you can adapt (replace placeholders with your values):

```python
from prefect.filesystems import RemoteFileSystem

r2_storage = RemoteFileSystem(
    basepath="s3://<your_bucket>/<optional_prefix>/",
    settings={
        # Credentials (s3fs accepts key/secret)
        "key": "<R2_ACCESS_KEY_ID>",
        "secret": "<R2_SECRET_ACCESS_KEY>",

        # Endpoint for R2 — IMPORTANT: no bucket name here
        "client_kwargs": {
            "endpoint_url": "https://<your_account_id>.<http://r2.cloudflarestorage.com|r2.cloudflarestorage.com>",
        },

        # Botocore client configuration
        "config_kwargs": {
            # Use v4 signatures and path-style addressing for R2
            "signature_version": "s3v4",
            "s3": {"addressing_style": "path"},

            # Timeouts and retries to avoid transient timeouts
            "connect_timeout": 30,
            "read_timeout": 60,
            "retries": {"max_attempts": 5, "mode": "standard"},
        },
    },
)
```

Then assign this storage at the flow or task level:

```python
from prefect import task, flow

@task(result_storage=r2_storage, persist_result=True)
def my_task():
    return {"hello": "world"}

@flow(result_storage=r2_storage)
def my_flow():
    return my_task()
```

Quick sanity test outside of Prefect: before running your flow, exercise the same configuration via s3fs/fsspec to confirm networking and credentials:

```python
import fsspec

fs = fsspec.filesystem(
    "s3",
    key="<R2_ACCESS_KEY_ID>",
    secret="<R2_SECRET_ACCESS_KEY>",
    client_kwargs={"endpoint_url": "https://<account_id>.<http://r2.cloudflarestorage.com|r2.cloudflarestorage.com>"},
    config_kwargs={
        "signature_version": "s3v4",
        "s3": {"addressing_style": "path"},
        "connect_timeout": 30,
        "read_timeout": 60,
        "retries": {"max_attempts": 5, "mode": "standard"},
    },
)

with fs.open("s3://<your_bucket>/<optional_prefix>/prefect-test.txt", "wb") as f:
    f.write(b"ok")
```

If this works, Prefect will work with the same settings.

Optional: environment variables. As an alternative to passing config in code, these env vars are honored by botocore/s3fs:
- AWS_S3_ADDRESSING_STYLE=path
- AWS_DEFAULT_REGION=auto (or another region if required)
- AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY

You'll still need to set the endpoint via client_kwargs (s3fs does not read a dedicated env var for endpoint_url).

Workaround while you debug: if you need to keep runs from crashing while you sort out storage, you can disable persistence:

```python
@task(persist_result=False)
def my_task(...):
    ...
```

A couple of asks to pinpoint the issue:
- How did you configure your result storage today? Can you share the code or block settings for your RemoteFileSystem (with secrets redacted)?
- What are your current `basepath` and `endpoint_url`? The log string suggests the bucket may have been included in the endpoint.
- Are you setting addressing style or timeouts anywhere?

If you'd like, paste your current storage config and I'll spot-check it.

References:
- Prefect 3: Results and storage
- RemoteFileSystem API
- Transactions in Prefect 3
- s3fs S3FileSystem parameters
- Cloudflare R2 S3 compatibility
b
@Marvin will Prefect automatically clean up the persisted results that are being saved in the R2 object storage?
m
thought for 20.6 seconds
Short answer: no. Prefect does not automatically delete objects you persist to external storage (R2/S3). Prefect writes results to the location you configure; lifecycle/cleanup of those objects is up to you. Your options:
- Use R2 lifecycle rules to expire objects after N days. This is the simplest and most reliable approach for ongoing cleanup. See Cloudflare R2 lifecycle rules.
- Don't persist results where you don't need them: set `persist_result=False` on tasks/flows to avoid writing objects in the first place.
- Overwrite instead of append: set a deterministic `result_storage_key` for tasks that can safely reuse the same key across runs (be careful with concurrency and traceability); see the sketch after this list.
- Run a periodic "janitor" flow that deletes old objects by prefix/age using s3fs/fsspec.
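For the deterministic-key option, here's a small sketch. The `{parameters[...]}` templating is Prefect's result-storage-key syntax, but treat the key pattern itself as an example to adapt:

```python
from prefect import task

@task(
    persist_result=True,
    # the same fight_url always maps to the same object key, so each run
    # overwrites the previous result instead of appending a new object
    result_storage_key="raw/{parameters[fight_url]}.json",
)
def extract(fight_url: str) -> dict:
    return {"source": fight_url}
```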
Example "janitor" flow (R2 via s3fs) you can schedule nightly:

```python
from datetime import datetime, timedelta, timezone
from prefect import flow, get_run_logger
import fsspec

R2_ENDPOINT = "https://<account_id>.<http://r2.cloudflarestorage.com|r2.cloudflarestorage.com>"
BUCKET_PREFIX = "s3://<bucket>/<prefect-prefix>/"  # same prefix you use for results
ACCESS_KEY = "<R2_ACCESS_KEY_ID>"
SECRET_KEY = "<R2_SECRET_ACCESS_KEY>"

def r2_fs():
    return fsspec.filesystem(
        "s3",
        key=ACCESS_KEY,
        secret=SECRET_KEY,
        client_kwargs={"endpoint_url": R2_ENDPOINT},
        config_kwargs={
            "signature_version": "s3v4",
            "s3": {"addressing_style": "path"},
            "connect_timeout": 30,
            "read_timeout": 60,
            "retries": {"max_attempts": 5, "mode": "standard"},
        },
    )

@flow
def cleanup_results(retain_days: int = 30):
    log = get_run_logger()
    cutoff = datetime.now(timezone.utc) - timedelta(days=retain_days)
    fs = r2_fs()

    to_delete = []
    for path in fs.find(BUCKET_PREFIX):
        info = fs.info(path)
        lm = info.get("LastModified")
        # s3fs typically returns a datetime; handle string just in case
        if isinstance(lm, str):
            try:
                lm = datetime.fromisoformat(lm.replace("Z", "+00:00"))
            except Exception:
                lm = None
        if lm and lm < cutoff:
            to_delete.append(path)

    <http://log.info|log.info>(f"Deleting {len(to_delete)} objects older than {retain_days} days")
    for p in to_delete:
        fs.rm(p)

if __name__ == "__main__":
    cleanup_results()
```

Related docs:
- Prefect 3: Results and storage
- RemoteFileSystem API
- R2 lifecycle rules

If you share how you want to retain results (e.g., keep the last 7 days per flow, or the last N runs), I can tailor a cleanup strategy or keying scheme for you.
b
@Marvin my flow currently looks something like this:
```python
@flow(
    log_prints=True,
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    persist_result=True,
    result_storage=r2_storage,       # critical: results live in R2
    result_serializer=json_ser,
)
def tapology_routine_fights_data_sync():
  fights = list(mongodb.find_many(<some_filter>))
  print(f"Found {len(fights)} fights to process.")
  # Fan-out extracts
  fight_data = [Tapology.fights.tapology_extract_fight_data.submit(fight_url=fight.get('source_url', None)) for fight in fights]
  # As each extract finishes, schedule the corresponding load
  for f in as_completed(fight_data):
      print("Scheduling load for extracted fight data...")
      # print(f)
      DataTasks.store_raw_data.submit(f)
```

The results returned from each `tapology_extract_fight_data()` execution seem to be correctly persisted in the R2 bucket. The `for` loop `for f in as_completed(fight_data)` is being executed when the result from `tapology_extract_fight_data()` is a non-null value, which I can confirm because the `print()` message "Scheduling load for extracted fight data..." is visible in the logs. However, the following execution of `DataTasks.store_raw_data()` does not seem to be running. When the `DataTasks.store_raw_data()` function runs, I expect to see additional `print()` messages in the logs, which I never see. So it's like the `for` loop is running, but the function within the loop is not being called. Is that expected?
m
thought for 191.2 seconds
Great question. A few important clarifications:
- `as_completed` yields PrefectFuture objects, not the resolved values. It yields each future as soon as it finishes (regardless of whether the result is None or not). So your print in the loop indicates that the extract future finished, not necessarily that it produced a non-null value.
- Passing a PrefectFuture into another task's `.submit(...)` is valid and does create a dependency; Prefect will resolve the future to its value before running the downstream task.

Why you don't see prints from DataTasks.store_raw_data:
- `log_prints=True` on the flow does not automatically enable print capture inside tasks. Tasks need their own `log_prints=True` or should use a logger.
- So your downstream task may well be running, but its prints aren't being captured in Prefect logs.

What to change:
- Ensure the task logs prints:

```python
from prefect import task, get_run_logger

@task(log_prints=True)
def store_raw_data(rec):
    print("Storing record...")
    # or use a logger:
    # log = get_run_logger()
    # log.info("Storing record...")
```

- Keep passing the future; Prefect will resolve it:

```python
store_futs = []
for fut in as_completed(fight_data):
    # Optional: skip Nones explicitly if you want to avoid queuing the store
    val = fut.result()  # safe here because fut is completed
    if val is None:
        continue
    sf = DataTasks.store_raw_data.submit(val)  # or pass fut directly
    store_futs.append(sf)

# Surface any errors from store tasks so you’ll see them in the flow logs
for sf in as_completed(store_futs):
    try:
        sf.result()  # raises if the store task failed
    except Exception as exc:
        print(f"store_raw_data failed: {exc}")
```

Notes:
- If you don't check `sf.result()`, failures in the store task may be easy to miss in the flow log stream. Collecting and resolving the store futures at the end will surface exceptions.
- If you prefer not to block by calling `result()` in the main loop, pass the future directly and do the None-filtering inside the store task.

References:
- Futures API (as_completed, result)
- Run work concurrently
- Task runners and futures

If you can share how DataTasks.store_raw_data is declared (task decorator, parameters), I can double-check the call site for any other gotchas.
b
@Marvin is it preferable to run the dependent task in a for loop with `as_completed()`, as I have done:

```python
for f in as_completed(fight_data):
      print("Scheduling load for extracted fight data...")
      DataTasks.store_raw_data.submit(f)
```

Or is it preferable to submit dependent tasks like:

```python
DataTasks.store_raw_data.submit(f) for f in fight_data
```
m
thought for 34.2 seconds
Short answer: both patterns are valid, but choose based on what you need. One critical note first: the second snippet as written is a generator expression and won't run anything unless you iterate it. You need brackets or a loop:
- Good: `[DataTasks.store_raw_data.submit(f) for f in fight_data]`
- Bad (no-op): `DataTasks.store_raw_data.submit(f) for f in fight_data`

When to use each pattern:

- Submit all dependents at once (simple, scalable):

```python
# wire dependencies for all extracts; Prefect will start each store as soon as its extract finishes
store_futs = [DataTasks.store_raw_data.submit(f) for f in fight_data]

# optional: surface any errors from store tasks
for sf in as_completed(store_futs):
    sf.result()
```

Pros: simple; fully parallel pipeline; lets the engine schedule everything. Cons: if you have thousands of items, you create thousands of downstream tasks up front (higher memory/graph overhead).

- Submit dependents reactively with as_completed (conditional logic/backpressure):

```python
store_futs = []
for f in as_completed(fight_data):
    v = f.result()           # since f is done, this returns immediately
    if v is None:
        continue             # skip empty extracts
    store_futs.append(DataTasks.store_raw_data.submit(v))

# optional: surface errors
for sf in as_completed(store_futs):
    sf.result()
```

Pros: lets you filter/branch per result; avoids creating downstream tasks for failed/None extracts; can throttle/batch; reduces the number of queued tasks. Cons: a bit more code.

Notes:
- Passing a PrefectFuture into `.submit` is fine; Prefect will resolve it to the value before running the downstream task. Use the resolved value (as above) only if you want to conditionally skip or transform.
- If you don't need per-item logic and your cardinality isn't huge, the "submit all + optionally wait later" pattern is usually simplest.
- If you don't need the results in the flow, you can skip the final waits; just be aware that failures in store tasks won't raise in the flow unless you resolve those futures.
- Alternative wiring without passing data (just a dependency): `DataTasks.store_raw_data.submit(rec=None, wait_for=[f])`.

Docs:
- Futures API (as_completed, result)
- Run work concurrently

If you share your expected scale (number of fights per run), I can recommend which approach (and any batching) will fit best.
b
@Marvin so now my code looks like this:
```python
@flow(
    log_prints=True,
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    persist_result=True,
    result_storage=r2_storage,       # critical: results live in R2
    result_serializer=json_ser,
)
def tapology_routine_fights_data_sync():
  fights = list(mongodb.find_many(<some_filter>))
  print(f"Found {len(fights)} fights to process.")
  # Fan-out extracts
  fight_data = [Tapology.fights.tapology_extract_fight_data.submit(fight_url=fight.get('source_url', None)) for fight in fights]
  # As each extract finishes, schedule the corresponding load
  store_futs = []
  for f in as_completed(fight_data):
      v = f.result()         # since f is done, this returns immediately
      if v is None:
        continue             # skip empty extracts
      print("Scheduling load for extracted fight data...")
      print(v)
      store_futs.append(DataTasks.store_raw_data.submit(v))
  
  # optional: surface errors
  for sf in as_completed(store_futs):
    sf.result()
```

As each iteration of `for f in as_completed(fight_data)` runs, it correctly prints the `print(v)` result when the value is not None; however, the following call to `DataTasks.store_raw_data.submit(v)` is still not being executed. I can confirm that the tasks do have logging enabled; the code looks something like this:

```python
@task(
    log_prints=True,
    retries=3,
    retry_delay_seconds=5,
    persist_result=True,
    result_serializer=json_ser,
)
def store_raw_data(data: metadataModels.RawDataStorage):
    """
    Store raw data in MongoDB and archive it in Cloudflare R2.

    :param data: Raw data to store.
    """

    # Save the data to MongoDB
    print("Storing raw data in MongoDB...")
```

Why is the dependent task seemingly not being executed?
m
thought for 141.2 seconds
Short answer: your downstream task should run. Given that your prints fire right before `DataTasks.store_raw_data.submit(v)`, the most common reason you won't see the task's prints is that the task never actually starts because its input cannot be scheduled/serialized.

What's likely happening:
- You're passing `v` (which looks like a custom model, `metadataModels.RawDataStorage`) as a task argument. In Prefect 3, task inputs must be JSON-serializable. If not, Prefect falls back to storing the inputs in a "task scheduling storage" location before the task starts. If that storage write fails (e.g., due to an R2 timeout/misconfiguration) or the object cannot be serialized, the task will not start, so you won't see its prints.
- This aligns with the R2 timeouts you saw earlier: even though your extract results persisted to R2, the parameter handoff for the store task may still be trying to write to storage and timing out.

How to confirm quickly:
- Check the Prefect UI for your `store_raw_data` task runs. Do they exist? What state are they in? If they're Failed or Crashed, expand the logs; you'll likely see a "failed to store parameters" or serialization error.
- Add a try/except around submit and print the future/state:

```python
try:
    sf = DataTasks.store_raw_data.submit(v)
    print("Submitted store task:", sf)
    store_futs.append(sf)
except Exception as e:
    print("Submit failed:", repr(e))
```

- After the run, inspect each store future's final state to surface errors:

```python
for sf in as_completed(store_futs):
    try:
        sf.result()
        print("Store completed:", sf)
    except Exception as exc:
        print("Store failed:", repr(exc))
```

Fixes you can apply:
- Make the argument JSON-serializable before submitting:
  - If it's Pydantic: `v = v.model_dump(mode="json")`
  - If it's a dataclass: convert it to a dict
  - If it's arbitrary: serialize it to a plain dict you control
- Or avoid passing the heavy payload at all:
  - Pass a small ID or URL and have `store_raw_data` fetch from MongoDB/R2 inside the task (see the sketch at the end of this message).
  - This is generally more robust and keeps task parameter payloads small.
- If you want to pass complex objects, ensure the parameter scheduling storage is reliable:
  - Since you've seen R2 timeouts, avoid using R2 for parameter handoff. Keep parameters JSONable, or store data yourself and pass only light references between tasks.

Minor improvements:
- You can pass the future directly and do the None check inside the store task to avoid materializing large objects in the flow:

```python
for f in as_completed(fight_data):
    store_futs.append(DataTasks.store_raw_data.submit(f))
```

Inside the task:

```python
@task(log_prints=True, persist_result=True, result_serializer=json_ser)
def store_raw_data(data):
    if data is None:
        print("Skip: no data")
        return
    print("Storing raw data in MongoDB...")
```

- If you keep the flow-side None filter, still ensure the object you pass is JSON-serializable (or convert it); otherwise scheduling will fail before the task starts.

If you can share:
- what type `v` is (Pydantic model? dict?), and
- whether the `store_raw_data` task run shows up in the Prefect UI, and its state/logs,

…I can pinpoint whether it's serialization vs. storage handoff and give you a tailored change. References: Concurrent work and Results and storage.
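If you go the light-reference route, here's a minimal sketch under your setup's assumptions (the `raw_staging` collection name is hypothetical; `mongodb` is the same client handle your flow already uses):

```python
import uuid

from prefect import task

@task(persist_result=True)
def extract_to_staging(fight_url: str) -> str | None:
    data = {"fight_url": fight_url}  # stand-in for the scraped payload
    if not data:
        return None
    key = str(uuid.uuid4())
    mongodb.raw_staging.insert_one({"_id": key, **data})  # heavy payload stays in Mongo
    return key  # only a small, JSON-serializable reference crosses task boundaries

@task(log_prints=True)
def store_raw_data_by_key(key: str | None):
    if key is None:
        print("Skip: nothing extracted")
        return
    doc = mongodb.raw_staging.find_one({"_id": key})  # fetch the payload inside the task
    print(f"Storing {key} to R2 and MongoDB...")
```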
b
@Marvin after doing some trial and error, I believe that although each task is running concurrently (in my test environment I'm running with a concurrency of 3, `task_runner=ThreadPoolTaskRunner(max_workers=3)`), the execution of the dependent task is happening sequentially. I think that first all of the iterations of `fight_data = [Tapology.fights.tapology_extract_fight_data.submit(fight_url=fight.get('source_url', None)) for fight in fights]` are finishing, and only after all of those tasks complete am I seeing the output of the `DataTasks.store_raw_data()` tasks. The reason I was not seeing the output from these tasks before was that there are hundreds of fights that get processed, and I was not waiting until they all completed. When I limited the run to only 10 fights, I could clearly see that only after all 10 fights completed the extraction task were they then executed through the data store tasks.
m
thought for 39.2 seconds
You've got it: what you're seeing is queueing behavior from the task runner.

What's happening:
- You submit hundreds of extract tasks up front. With ThreadPoolTaskRunner(max_workers=3), the runner's queue is filled with extracts.
- As extracts finish, you submit store tasks via as_completed, but those new tasks go to the back of the same queue, behind all remaining extracts.
- Result: you don't see store tasks run until the extract queue drains (or mostly drains). as_completed triggers submission, not priority or preemption.

Ways to interleave extract and store:
- Submit in batches (windowed). Don't flood the queue initially; this lets store tasks for batch N start before extracts for batch N+1 exist. Example:

```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed

BATCH_SIZE = 50  # tune to your scale

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3), log_prints=True)
def tapology_routine_fights_data_sync():
    fights = list(mongodb.find_many(<some_filter>))
    print(f"Found {len(fights)} fights to process.")

    store_futs = []
    for i in range(0, len(fights), BATCH_SIZE):
        batch = fights[i:i+BATCH_SIZE]
        extracts = [
            Tapology.fights.tapology_extract_fight_data.submit(
                fight_url=f.get("source_url")
            )
            for f in batch
        ]

        for fut in as_completed(extracts):
            v = fut.result()
            if v is None:
                continue
            store_futs.append(DataTasks.store_raw_data.submit(v))

    # surface errors
    for sf in as_completed(store_futs):
        sf.result()
```

- Increase max_workers so there are spare threads to start store tasks earlier. This helps but doesn't change FIFO ordering; batching is still advisable.
- Fuse extract+store into one task if you want strict "store right after extract" behavior per item (simplest pipeline, fewer scheduling/queue interactions).
- Advanced: build a bounded pipeline (seed N extracts, and for each completion submit a store and the next extract) to keep steady-state concurrency; a sketch follows below. This is more code; batching is usually enough.

Notes:
- as_completed only controls when you submit the dependent task, not when it is scheduled relative to already-queued tasks.
- Passing futures (or values) to .submit is fine; Prefect will respect dependencies either way.

Docs:
- Task runners and scheduling
- Run work concurrently (as_completed)
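For completeness, a sketch of the bounded-pipeline idea under the same assumptions as the batch example above (`Tapology`/`DataTasks` come from your code; `WINDOW` is a hypothetical knob). It keeps a fixed number of extracts in flight and tops up the window as each one finishes:

```python
from collections import deque

from prefect import flow
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

WINDOW = 6  # how many extracts to keep in flight at once

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3), log_prints=True)
def bounded_pipeline(fights: list[dict]):
    pending = deque(fights)
    in_flight, store_futs = [], []

    def submit_next():
        fight = pending.popleft()
        in_flight.append(
            Tapology.fights.tapology_extract_fight_data.submit(
                fight_url=fight.get("source_url")
            )
        )

    for _ in range(min(WINDOW, len(pending))):
        submit_next()

    while in_flight:
        done = next(as_completed(in_flight))  # block until one extract finishes
        in_flight.remove(done)
        v = done.result()
        if v is not None:
            store_futs.append(DataTasks.store_raw_data.submit(v))
        if pending:
            submit_next()  # keep the window full

    for sf in as_completed(store_futs):
        sf.result()
```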