Ben
10/23/2025, 4:20 PM
I'm currently running my tasks sequentially in a for loop, e.g.

for object in objects:
    data = tasks.operation1(data=object)
    tasks.operation2(data=data)

I'm reading the docs which mention using either .submit() or .map() to run tasks concurrently, however I'm not sure how to access the data returned and pass it to a secondary task (which can also be run concurrently if possible).

Marvin
10/23/2025, 4:25 PM
You can fan out with task.submit() or task.map() and then "fan-in" the results by passing the returned futures directly to downstream tasks. Prefect automatically resolves futures passed as inputs, so you don't need to call .result() before wiring tasks together.
Quick rules of thumb:
- task.submit(...) → submit one task run, returns a PrefectFuture
- task.map(...) → submit many task runs over iterables, returns a PrefectFutureList
- You can pass futures (or lists of futures) to other tasks; Prefect will resolve them to their values
- To collect Python values at the end, call .result() on the future or future list
Example 1: End-to-end with map (both stages concurrent)
from prefect import flow, task, unmapped
from prefect.task_runners import ThreadPoolTaskRunner

@task
def operation1(data):
    # ...do work...
    return f"op1({data})"

@task
def operation2(data, factor=1):
    # ...do more work...
    return f"op2({data}) x{factor}"

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow(objects: list[str], factor: int = 2):
    # Fan-out stage 1
    step1 = operation1.map(data=objects)  # PrefectFutureList
    # Fan-out stage 2 using results from stage 1
    # Use unmapped() to broadcast constants to each mapped run
    step2 = operation2.map(data=step1, factor=unmapped(factor))  # PrefectFutureList
    # Collect Python results (if you need them)
    return step2.result()

if __name__ == "__main__":
    print(my_flow(["a", "b", "c"]))
Example 2: submit + list comprehension (also fully concurrent)
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def operation1(x):
    return x * 2

@task
def operation2(y):
    return y + 1

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow(objects: list[int]):
    f1 = [operation1.submit(o) for o in objects]  # list[PrefectFuture]
    f2 = [operation2.submit(f) for f in f1]       # depend on stage 1 futures
    return [f.result() for f in f2]               # collect values

print(my_flow([1, 2, 3]))
Streaming a second stage as soon as first-stage items finish
from prefect import flow, task
from prefect.futures import as_completed

@task
def operation1(x): ...

@task
def operation2(y): ...

@flow
def my_flow(objects):
    f1 = [operation1.submit(o) for o in objects]
    f2 = []
    for fut in as_completed(f1):           # yields futures as they complete
        f2.append(operation2.submit(fut))  # schedule dependent work immediately
    return [f.result() for f in f2]
A few tips:
- Task runner choice:
- IO-bound tasks: ThreadPoolTaskRunner (default)
- CPU-bound tasks: consider ProcessPoolTaskRunner
- Swap easily: @flow(task_runner=ProcessPoolTaskRunner(max_workers=4)); a fuller sketch follows these tips
- You can pass a list of futures to a single downstream task (fan-in). Prefect will resolve the list to values:
@task
def aggregate(items):
    return sum(items)

@flow
def my_flow(objects):
    f1 = operation1.map(objects)   # PrefectFutureList
    total = aggregate.submit(f1)   # f1 auto-resolves to a list of values
    return total.result()
- To broadcast constants in map, use unmapped() (shown above).
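For the CPU-bound case, here is a minimal sketch of the same fan-out/fan-in shape on a process pool. It assumes a Prefect 3.x release that ships ProcessPoolTaskRunner; the transform itself is just an illustration.

from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_transform(n: int) -> int:
    # CPU-bound work; a process pool sidesteps the GIL for this
    return sum(i * i for i in range(n * 100_000))

@task
def total(values: list[int]) -> int:
    # fan-in: receives the resolved list of values
    return sum(values)

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def cpu_flow(items: list[int]) -> int:
    futures = heavy_transform.map(items)   # fan-out across worker processes
    return total.submit(futures).result()  # fan-in and collect

if __name__ == "__main__":
    # the __main__ guard matters for process pools (spawn start method)
    print(cpu_flow([1, 2, 3, 4]))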
Docs for deeper dives:
- Run work concurrently (how-to)
- Task runners and when to use each
- PrefectFuture and PrefectFutureList API
- Blog: Beyond loops — task mapping at scale
Would your tasks be more IO-bound or CPU-bound? I can suggest the best runner configuration and a pattern for error handling if you share a bit about the operations.

Ben
10/24/2025, 7:05 AM

Marvin
10/24/2025, 7:07 AM
Pattern A: submit + pass futures (fully concurrent; loads start as each extract completes)
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def extract(item):
    # pull data from source
    return f"data({item})"

@task
def load(data):
    # write to storage
    return f"loaded({data})"

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))
def el_flow(items):
    # fan-out extract
    extract_fs = [extract.submit(i) for i in items]
    # downstream loads will start as each extract completes
    load_fs = [load.submit(f) for f in extract_fs]
    # collect final results if needed
    return [f.result() for f in load_fs]

# el_flow(["a", "b", "c"])
Pattern B: Stream with as_completed (if you want to react per completion)
- Same concurrency as A, but you submit each load only when its extract finishes (handy if you need per-item routing, metrics, etc.).
from prefect import flow, task
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

@task
def extract(item): ...

@task
def load(data): ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))
def el_flow(items):
    extract_fs = [extract.submit(i) for i in items]
    load_fs = []
    for f in as_completed(extract_fs):
        load_fs.append(load.submit(f))
    return [f.result() for f in load_fs]
Notes and tips:
- IO-bound work (APIs, DB, object storage) → ThreadPoolTaskRunner is ideal; use max_workers to tune concurrency.
- CPU-bound transforms → consider ProcessPoolTaskRunner instead.
- If an extract can fail but you still want to run a load that handles failures (e.g., dead letter), use `allow_failure`:
from prefect import allow_failure
# ...
for f in as_completed(extract_fs):
    load_fs.append(load.submit(allow_failure(f)))
- Large payloads: if results are big or you’re using processes/distributed runners, consider enabling persistence on tasks or configuring result_storage for the flow so data passes via storage instead of memory.
Docs for reference:
- Run work concurrently: https://docs-3.prefect.io/v3/how-to-guides/workflows/run-work-concurrently
- Task runners (Thread vs Process, etc.): https://docs-3.prefect.io/v3/concepts/task-runners
- Futures API: https://docs-3.prefect.io/v3/api-ref/python/prefect-futures
Quick sanity check: are your extract/load steps mostly IO-bound, and roughly how many items do you expect per run? I can suggest runner settings and any guardrails (timeouts, retries, persistence) tailored to that scale.

Ben
10/24/2025, 7:21 AM
How do I go about configuring result_storage for the flow so data passes via storage instead of memory?

Marvin
10/24/2025, 7:26 AM
Option 1: Attach a remote result storage block to your flow (Cloudflare R2 via its S3-compatible API)
- Install the S3 filesystem dependency: pip install prefect s3fs
- One-time: create and save a RemoteFileSystem block pointing to your R2 bucket (use environment variables for secrets)
from prefect.filesystems import RemoteFileSystem

# R2 is S3-compatible; pass s3fs settings including endpoint_url
r2 = RemoteFileSystem(
    basepath="s3://my-r2-bucket/prefect-results/",
    settings={
        # s3fs credentials - prefer setting via env: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
        # "key": "...",
        # "secret": "...",
        "client_kwargs": {
            "endpoint_url": "https://<account-id>.r2.cloudflarestorage.com"
        },
        # Optional: region_name if your SDK requires it
        # "client_kwargs": {"endpoint_url": "...", "region_name": "auto"},
    },
)
r2.save("r2-results", overwrite=True)
print("Saved RemoteFileSystem block 'r2-results'")
- Use the saved block in your flow. Setting persist_result/result_storage ensures:
- Each task’s result is written to R2
- When you pass a future to load(...), Prefect loads the value from R2 automatically
from prefect import flow, task
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

# Load the storage block you saved previously
r2_storage = RemoteFileSystem.load("r2-results")

# Choose a serializer for your payloads
json_ser = JSONSerializer()

@task(
    persist_result=True,
    result_serializer=json_ser,
)
def extract(item):
    # make API calls / scraping
    # return structured data (dict/list) that JSON can handle
    return {"item": item, "payload": f"data-for-{item}"}

@task(
    persist_result=True,
    result_serializer=json_ser,
)
def load(doc):
    # Persist to MongoDB and/or Cloudflare R2 as your destination.
    # In this example, the 'doc' value is fetched from R2 (result storage),
    # deserialized by Prefect, then passed in here as a Python object.
    # ... write to MongoDB and/or additional storage ...
    return {"status": "ok", "id": doc["item"]}

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=32),
    persist_result=True,
    result_storage=r2_storage,  # critical: results live in R2
    result_serializer=json_ser,
)
def etl(items: list[str]):
    # Fan-out extracts
    extract_fs = [extract.submit(i) for i in items]
    # As each extract finishes, schedule the corresponding load
    load_fs = []
    for f in as_completed(extract_fs):
        load_fs.append(load.submit(f))
    # Optionally collect final results at the end
    return [f.result() for f in load_fs]

# etl(["a", "b", "c"])
Notes
- If your scraped payloads are large or not JSON-serializable, switch to Pickle:
from prefect.serializers import PickleSerializer
pickle_ser = PickleSerializer()
@task(persist_result=True, result_serializer=pickle_ser)
- You can override result storage per task by passing result_storage=r2_storage on the task decorator; if omitted, tasks fall back to the storage configured on the flow.
- For very large values, consider cache_result_in_memory=False on tasks to avoid in-process caching (a small sketch of both options follows).
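A minimal sketch combining those last two notes; the "r2-results" block name matches the block saved earlier, and the task body is purely illustrative.

from prefect import task
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import PickleSerializer

r2_storage = RemoteFileSystem.load("r2-results")  # block saved earlier

@task(
    persist_result=True,
    result_storage=r2_storage,             # per-task override of the flow's storage
    result_serializer=PickleSerializer(),  # handles non-JSON payloads
    cache_result_in_memory=False,          # don't keep the large value in process memory
)
def extract_large_payload(url: str) -> bytes:
    # illustrative: fetch a large blob you don't want held in memory
    return b"..." * 1_000_000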
Option 2: Set global defaults via a Prefect profile (env vars)
- Useful when you always want persistence and a default storage/serializer
Set environment variables in your Prefect profile:
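As a sketch only: the setting names below are my assumption about Prefect 3's results settings, so confirm them with prefect config view --show-defaults before relying on them.

# assumed setting names; verify with: prefect config view --show-defaults
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
prefect config set PREFECT_RESULTS_DEFAULT_SERIALIZER=json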
Then use the flow-level result_storage to point to your R2 block (as in Option 1). You can omit persist_result=True on tasks/flows if you use the global default.
What about MongoDB as a result store?
- Prefect doesn’t ship a MongoDB result storage block out-of-the-box.
- If you want to store task results in MongoDB instead of R2, you can implement a custom WritableFileSystem by subclassing `prefect.filesystems.WritableFileSystem`. It must implement two async methods: `write_path(path: str, content: bytes) -> str` and `read_path(path: str) -> bytes`.
- A minimal skeleton using PyMongo/GridFS might look like:
from prefect.filesystems import WritableFileSystem
import anyio
import gridfs
from pymongo import MongoClient

class MongoResultStorage(WritableFileSystem):
    # Blocks are Pydantic models, so configuration is declared as fields
    uri: str
    db: str
    basepath: str = ""

    async def _get_fs(self):
        def _connect():
            client = MongoClient(self.uri)
            database = client[self.db]
            return gridfs.GridFS(database)

        return await anyio.to_thread.run_sync(_connect)

    async def write_path(self, path: str, content: bytes) -> str:
        fs = await self._get_fs()
        full_path = f"{self.basepath}{path}"

        def _write():
            # upsert by path key: delete any existing file, then write the new one
            # (the path is stored as the filename so we can query by it)
            for f in fs.find({"filename": full_path}):
                fs.delete(f._id)
            fs.put(content, filename=full_path)
            return full_path

        return await anyio.to_thread.run_sync(_write)

    async def read_path(self, path: str) -> bytes:
        fs = await self._get_fs()
        full_path = f"{self.basepath}{path}"

        def _read():
            f = fs.find_one({"filename": full_path})
            if not f:
                raise FileNotFoundError(full_path)
            return f.read()

        return await anyio.to_thread.run_sync(_read)
- Save your block and use it as result_storage=MongoResultStorage(...) on your flow/tasks. You can also persist the block with .save("mongo-results") similar to the R2 example.
How this helps your streaming pattern
- With persist_result=True and remote result_storage, each extract.submit(...) writes results to R2 as soon as it completes.
- When you call load.submit(extract_future), Prefect resolves that future by fetching the serialized value from R2 and passing the deserialized value into load — so your data flows via storage, not process memory, and you get immediate, per-item streaming.
Useful references
- Results and storage: https://docs-3.prefect.io/v3/advanced/results
- Filesystem blocks (RemoteFileSystem, LocalFileSystem): https://docs-3.prefect.io/v3/api-ref/python/prefect-filesystems
- Serializers: https://docs-3.prefect.io/v3/api-ref/python/prefect-serializers
- Futures: https://docs-3.prefect.io/v3/api-ref/python/prefect-futures
If you share your R2 endpoint format and how you prefer to manage credentials (env vars vs Prefect secrets/blocks), I can tailor the RemoteFileSystem settings and a safe setup flow for your environment.

Ben
10/25/2025, 5:32 PM
19:24:21.657 | ERROR | Task run 'tapology_extract_fight_data-2fa' - An error was encountered while committing transaction '053d83811953df61018622d384646784'
The Python stack trace that follows includes these lines:

File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/selector_events.py", line 651, in sock_connect
    return await fut
           ^^^^^^^^^
asyncio.exceptions.CancelledError

File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

File "/Users/benjamingroves/playground/boxing-data/combat-data-pipelines/.venv/lib/python3.12/site-packages/aiohttp/client.py", line 745, in _connect_and_send_request
    raise ConnectionTimeoutError(
aiohttp.client_exceptions.ConnectionTimeoutError: Connection timeout to host https://<redacted_my_account_id>.r2.cloudflarestorage.com/<redacted_my_bucket_name>

File "/Users/benjamingroves/playground/boxing-data/combat-data-pipelines/.venv/lib/python3.12/site-packages/aiobotocore/httpsession.py", line 270, in send
    raise ConnectTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "https://<redacted_my_account_id>.r2.cloudflarestorage.com/<redacted_my_bucket_name>"

Marvin
10/25/2025, 5:44 PM
That ConnectTimeoutError means the client never reached the R2 endpoint; a common cause is including the bucket name in the endpoint URL. Here is a RemoteFileSystem configuration for R2 that avoids that and adds sensible timeouts/retries:

from prefect.filesystems import RemoteFileSystem
r2_storage = RemoteFileSystem(
    basepath="s3://<your_bucket>/<optional_prefix>/",
    settings={
        # Credentials (s3fs accepts key/secret)
        "key": "<R2_ACCESS_KEY_ID>",
        "secret": "<R2_SECRET_ACCESS_KEY>",
        # Endpoint for R2 — IMPORTANT: no bucket name here
        "client_kwargs": {
            "endpoint_url": "https://<your_account_id>.r2.cloudflarestorage.com",
        },
        # Botocore client configuration
        "config_kwargs": {
            # Use v4 signatures and path-style addressing for R2
            "signature_version": "s3v4",
            "s3": {"addressing_style": "path"},
            # Timeouts and retries to avoid transient timeouts
            "connect_timeout": 30,
            "read_timeout": 60,
            "retries": {"max_attempts": 5, "mode": "standard"},
        },
    },
)
Then assign this storage at the flow or task level:
from prefect import task, flow

@task(result_storage=r2_storage, persist_result=True)
def my_task():
    return {"hello": "world"}

@flow(result_storage=r2_storage)
def my_flow():
    return my_task()
Quick sanity test outside of Prefect
Before running your flow, test the same configuration via s3fs/fsspec to confirm networking and credentials:
import fsspec

fs = fsspec.filesystem(
    "s3",
    key="<R2_ACCESS_KEY_ID>",
    secret="<R2_SECRET_ACCESS_KEY>",
    client_kwargs={"endpoint_url": "https://<account_id>.r2.cloudflarestorage.com"},
    config_kwargs={
        "signature_version": "s3v4",
        "s3": {"addressing_style": "path"},
        "connect_timeout": 30,
        "read_timeout": 60,
        "retries": {"max_attempts": 5, "mode": "standard"},
    },
)

with fs.open("s3://<your_bucket>/<optional_prefix>/prefect-test.txt", "wb") as f:
    f.write(b"ok")
If this works, Prefect will work with the same settings.
Optional: environment variables
As an alternative to passing config in code, these env vars are honored by botocore/s3fs:
- AWS_S3_ADDRESSING_STYLE=path
- AWS_DEFAULT_REGION=auto (or another region if required)
- AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
You’ll still need to set the endpoint via client_kwargs (s3fs does not read a dedicated env var for endpoint_url).
Workaround while you debug
If you need to keep runs from crashing while you sort out storage, you can disable persistence:
@task(persist_result=False)
def my_task(...):
    ...
A couple of asks to pinpoint the issue:
- Can you share your current basepath and endpoint_url? The log string suggests the bucket may have been included in the endpoint.
- Are you setting addressing style or timeouts anywhere?
If you’d like, paste your current storage config and I’ll spot-check it.
References
- Prefect 3: Results and storage
- RemoteFileSystem API
- Transactions in Prefect 3
- s3fs S3FileSystem parameters
- Cloudflare R2 S3 compatibility

Ben
10/26/2025, 9:43 AM

Marvin
10/26/2025, 9:43 AM
A few ways to keep persisted results in R2 from piling up:
- Don't persist what you don't need: set persist_result=False on tasks/flows to avoid writing objects in the first place.
- Overwrite instead of append: set a deterministic result_storage_key for tasks that can safely reuse the same key across runs (be careful with concurrency and traceability); a minimal sketch follows the janitor example below.
- Run a periodic "janitor" flow that deletes old objects by prefix/age using s3fs/fsspec.
Example "janitor" flow (R2 via s3fs) you can schedule nightly:

from datetime import datetime, timedelta, timezone
from prefect import flow, get_run_logger
import fsspec

R2_ENDPOINT = "https://<account_id>.r2.cloudflarestorage.com"
BUCKET_PREFIX = "s3://<bucket>/<prefect-prefix>/"  # same prefix you use for results
ACCESS_KEY = "<R2_ACCESS_KEY_ID>"
SECRET_KEY = "<R2_SECRET_ACCESS_KEY>"

def r2_fs():
    return fsspec.filesystem(
        "s3",
        key=ACCESS_KEY,
        secret=SECRET_KEY,
        client_kwargs={"endpoint_url": R2_ENDPOINT},
        config_kwargs={
            "signature_version": "s3v4",
            "s3": {"addressing_style": "path"},
            "connect_timeout": 30,
            "read_timeout": 60,
            "retries": {"max_attempts": 5, "mode": "standard"},
        },
    )

@flow
def cleanup_results(retain_days: int = 30):
    log = get_run_logger()
    cutoff = datetime.now(timezone.utc) - timedelta(days=retain_days)
    fs = r2_fs()
    to_delete = []
    for path in fs.find(BUCKET_PREFIX):
        info = fs.info(path)
        lm = info.get("LastModified")
        # s3fs typically returns a datetime; handle string just in case
        if isinstance(lm, str):
            try:
                lm = datetime.fromisoformat(lm.replace("Z", "+00:00"))
            except Exception:
                lm = None
        if lm and lm < cutoff:
            to_delete.append(path)
    log.info(f"Deleting {len(to_delete)} objects older than {retain_days} days")
    for p in to_delete:
        fs.rm(p)

if __name__ == "__main__":
    cleanup_results()
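For the "overwrite instead of append" option above, a minimal sketch using result_storage_key; the templated key below is an assumption about a reasonable naming scheme, so double-check the available templating fields against the Prefect results docs.

from prefect import task

@task(
    persist_result=True,
    # Reuse the same object key on every run so old results are overwritten
    # instead of accumulating; templated fields are filled in at runtime.
    # Note: a raw URL makes an ugly key, so you'd likely hash or slugify it.
    result_storage_key="tapology/fight-{parameters[fight_url]}.json",
)
def tapology_extract_fight_data(fight_url: str) -> dict:
    # illustrative body
    return {"source_url": fight_url}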
Related docs:
- Prefect 3: Results and storage
- RemoteFileSystem API
- R2 lifecycle rules
If you share how you want to retain results (e.g., keep last 7 days per flow, or last N runs), I can tailor a cleanup strategy or keying scheme for you.

Ben
10/26/2025, 10:06 AM
@flow(
    log_prints=True,
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    persist_result=True,
    result_storage=r2_storage,  # critical: results live in R2
    result_serializer=json_ser,
)
def tapology_routine_fights_data_sync():
    fights = list(mongodb.find_many(<some_filter>))
    print(f"Found {len(fights)} fights to process.")
    # Fan-out extracts
    fight_data = [Tapology.fights.tapology_extract_fight_data.submit(fight_url=fight.get('source_url', None)) for fight in fights]
    # As each extract finishes, schedule the corresponding load
    for f in as_completed(fight_data):
        print("Scheduling load for extracted fight data...")
        # print(f)
        DataTasks.store_raw_data.submit(f)
The results returned from each tapology_extract_fight_data() execution seem to be correctly persisted in the R2 bucket.
The for loop for f in as_completed(fight_data) is being executed when the result from tapology_extract_fight_data() is a non-null value, which I can confirm because the print() message "Scheduling load for extracted fight data..." is visible in the logs. However, the subsequent execution of DataTasks.store_raw_data() does not seem to be happening.
When the DataTasks.store_raw_data() function runs, I expect to see additional print() messages in the logs which I never see.
So it's like the for loop is running, but the function within the loop is not being called. Is that expected?

Marvin
10/26/2025, 10:10 AM
A couple of things to check. First, make sure the task itself surfaces its prints by setting log_prints=True on the task (or by using a Prefect logger):

from prefect import task, get_run_logger
@task(log_prints=True)
def store_raw_data(rec):
    print("Storing record...")
    # or use a logger:
    # log = get_run_logger()
    # log.info("Storing record...")
- Keep passing the future; Prefect will resolve it:
store_futs = []
for fut in as_completed(fight_data):
    # Optional: skip Nones explicitly if you want to avoid queuing the store
    val = fut.result()  # safe here because fut is completed
    if val is None:
        continue
    sf = DataTasks.store_raw_data.submit(val)  # or pass fut directly
    store_futs.append(sf)

# Surface any errors from store tasks so you'll see them in the flow logs
for sf in as_completed(store_futs):
    try:
        sf.result()  # raises if the store task failed
    except Exception as exc:
        print(f"store_raw_data failed: {exc}")
Notes
- If you don’t check sf.result(), failures in the store task may be easy to miss in the flow log stream. Collecting and resolving the store futures at the end will surface exceptions.
- If you prefer not to block by calling result() in the main loop, pass the future directly and do the None-filtering inside the store task.
References
- Futures API (as_completed, result)
- Run work concurrently
- Task runners and futures
If you can share how DataTasks.store_raw_data is declared (task decorator, parameters), I can double-check the call site for any other gotchas.

Ben
10/26/2025, 10:18 AM
Is it necessary to submit the dependent tasks inside a loop over as_completed() as I have done:

for f in as_completed(fight_data):
    print("Scheduling load for extracted fight data...")
    DataTasks.store_raw_data.submit(f)

Or is it preferable to submit dependent tasks like:

DataTasks.store_raw_data.submit(f) for f in fight_data
Marvin
10/26/2025, 10:19 AM
Either approach works, with one important caveat: the second form as written is a bare generator expression, so nothing is ever submitted. Wrap it in a list (or a loop) so the submit calls actually run:
- Good: [DataTasks.store_raw_data.submit(f) for f in fight_data]
- Bad (no-op): DataTasks.store_raw_data.submit(f) for f in fight_data
When to use each pattern
- Submit all dependents at once (simple, scalable):
# wire dependencies for all extracts; Prefect will start each store as soon as its extract finishes
store_futs = [DataTasks.store_raw_data.submit(f) for f in fight_data]

# optional: surface any errors from store tasks
for sf in as_completed(store_futs):
    sf.result()
Pros: simple; fully parallel pipeline; lets the engine schedule everything. Cons: if you have thousands of items, you create thousands of downstream tasks up front (higher memory/graph overhead).
- Submit dependents reactively with as_completed (conditional logic/backpressure):
store_futs = []
for f in as_completed(fight_data):
    v = f.result()  # since f is done, this returns immediately
    if v is None:
        continue    # skip empty extracts
    store_futs.append(DataTasks.store_raw_data.submit(v))

# optional: surface errors
for sf in as_completed(store_futs):
    sf.result()
Pros: lets you filter/branch per result; avoids creating downstream tasks for failed/None extracts; can throttle/batch; reduces the number of queued tasks. Cons: a bit more code.
Notes
- Passing a PrefectFuture into .submit is fine; Prefect will resolve it to the value before running the downstream task. Use the resolved value (as above) only if you want to conditionally skip or transform.
- If you don’t need per-item logic and your cardinality isn’t huge, the “submit all + optionally wait later” pattern is usually simplest.
- If you don’t need the results in the flow, you can skip the final waits; just be aware failures in store tasks won’t raise in the flow unless you resolve those futures.
- Alternative wiring without passing data (just dependency): DataTasks.store_raw_data.submit(rec=None, wait_for=[f]).
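A small sketch of that wait_for variant, for the case where the downstream task reads its input elsewhere and only needs the ordering guarantee; the task names here are illustrative, not your actual code.

from prefect import flow, task

@task
def extract(item: str) -> str:
    # writes its output somewhere external (DB, object storage, ...)
    return f"wrote:{item}"

@task
def refresh_view() -> None:
    # takes no data from the extracts; it just must run after them
    print("refreshing downstream view")

@flow(log_prints=True)
def pipeline(items: list[str]):
    extract_futs = [extract.submit(i) for i in items]
    # wait_for creates the dependency without passing any data
    refresh_view.submit(wait_for=extract_futs).result()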
Docs:
- Futures API (as_completed, result)
- Run work concurrently
If you share your expected scale (number of fights per run), I can recommend which approach (and any batching) will fit best.

Ben
10/26/2025, 10:54 AM
@flow(
    log_prints=True,
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    persist_result=True,
    result_storage=r2_storage,  # critical: results live in R2
    result_serializer=json_ser,
)
def tapology_routine_fights_data_sync():
    fights = list(mongodb.find_many(<some_filter>))
    print(f"Found {len(fights)} fights to process.")
    # Fan-out extracts
    fight_data = [Tapology.fights.tapology_extract_fight_data.submit(fight_url=fight.get('source_url', None)) for fight in fights]
    # As each extract finishes, schedule the corresponding load
    store_futs = []
    for f in as_completed(fight_data):
        v = f.result()  # since f is done, this returns immediately
        if v is None:
            continue  # skip empty extracts
        print("Scheduling load for extracted fight data...")
        print(v)
        store_futs.append(DataTasks.store_raw_data.submit(v))
    # optional: surface errors
    for sf in as_completed(store_futs):
        sf.result()
As each iteration of for f in as_completed(fight_data) starts, it correctly prints the print(v) output when the value is not None; however the following call to DataTasks.store_raw_data.submit(v) is still not being executed. I can confirm that the tasks do have logging enabled; the code looks something like this:
@task(
    log_prints=True,
    retries=3,
    retry_delay_seconds=5,
    persist_result=True,
    result_serializer=json_ser,
)
def store_raw_data(data: metadataModels.RawDataStorage):
    """
    Store raw data in MongoDB and archive it in Cloudflare R2.

    :param data: Raw data to store.
    """
    # Save the data to MongoDB
    print("Storing raw data in MongoDB...")
Why is the dependent task seemingly not being executed?

Marvin
10/26/2025, 10:57 AM
If the loop is running and you call DataTasks.store_raw_data.submit(v), the most common reason you won't see the task's prints is that the task never actually starts because its input cannot be scheduled/serialized.
What’s likely happening
- You’re passing v (looks like a custom model metadataModels.RawDataStorage) as a task argument. In Prefect 3, task inputs must be JSON-serializable. If not, Prefect falls back to storing the inputs in a “task scheduling storage” location before the task starts. If that storage write fails (e.g., due to R2 timeout/misconfig) or the object cannot be serialized, the task will not start — so you won’t see its prints.
- This aligns with the R2 timeouts you saw earlier: even though your extract results persisted to R2, the parameter handoff for the store task may still be trying to write to storage and timing out.
How to confirm quickly
- Check the Prefect UI for your store_raw_data task runs — do they exist? What state are they in? If they’re Failed or Crashed, expand logs; you’ll likely see a “failed to store parameters” or serialization error.
- Add a try/except around submit and print the future/state:
try:
    sf = DataTasks.store_raw_data.submit(v)
    print("Submitted store task:", sf)
    store_futs.append(sf)
except Exception as e:
    print("Submit failed:", repr(e))
- After the run, inspect each store future’s final state to surface errors:
for sf in as_completed(store_futs):
    try:
        sf.result()
        print("Store completed:", sf)
    except Exception as exc:
        print("Store failed:", repr(exc))
Fixes you can apply
- Make the argument JSON-serializable before submitting:
- If it’s Pydantic: v = v.model_dump(mode="json")
- If it’s a dataclass: convert to dict
- If it’s arbitrary, serialize to a plain dict you control
- Or, avoid passing the heavy payload at all:
- Pass a small ID or URL and have store_raw_data fetch from MongoDB/R2 inside the task (see the sketch after this list).
- This is generally more robust and keeps task parameter payloads small.
- If you want to pass complex objects, ensure the parameter scheduling storage is reliable:
- Since you’ve seen R2 timeouts, avoid using R2 for parameter handoff. Keep parameters JSONable or store locally and pass only light references between tasks.
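A self-contained sketch of the "pass a small reference, fetch inside the task" idea; FAKE_DB stands in for your MongoDB collection and the task bodies are illustrative, not your actual code.

from prefect import flow, task
from prefect.futures import as_completed

# Stand-in for your MongoDB layer: only a small id string crosses task
# boundaries; the heavy payload stays in the datastore.
FAKE_DB: dict[str, dict] = {}

@task
def extract_fight(fight_url: str) -> str | None:
    payload = {"source_url": fight_url, "html": "<very large blob>"}
    raw_id = f"raw:{fight_url}"
    FAKE_DB[raw_id] = payload  # write the payload to the datastore
    return raw_id              # return only a lightweight reference

@task(log_prints=True)
def store_raw_data(raw_id: str | None) -> None:
    if raw_id is None:
        return
    payload = FAKE_DB[raw_id]  # fetch by reference inside the task
    print(f"archiving {payload['source_url']}")

@flow(log_prints=True)
def sync(fight_urls: list[str]):
    extract_futs = [extract_fight.submit(u) for u in fight_urls]
    store_futs = []
    for f in as_completed(extract_futs):
        store_futs.append(store_raw_data.submit(f))  # only a small string crosses tasks
    for sf in as_completed(store_futs):
        sf.result()

if __name__ == "__main__":
    sync(["https://example.com/fight/1", "https://example.com/fight/2"])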
Minor improvements
- You can pass the future directly and do the None check inside the store task to avoid materializing large objects in the flow:
for f in as_completed(fight_data):
    store_futs.append(DataTasks.store_raw_data.submit(f))

Inside the task:

@task(log_prints=True, persist_result=True, result_serializer=json_ser)
def store_raw_data(data):
    if data is None:
        print("Skip: no data")
        return
    print("Storing raw data in MongoDB...")
- If you keep the flow-side None filter, still ensure the object you pass is JSON-serializable (or convert it), otherwise scheduling will fail before the task starts.
If you can share:
- What type v is (Pydantic model? dict?)
- Whether the store_raw_data task run shows up in the Prefect UI and its state/logs
…I can pinpoint whether it's serialization vs storage handoff and give you a tailored change. References: Concurrent work and Results and storage.

Ben
10/26/2025, 1:57 PM
I think I've worked out what's happening. With task_runner=ThreadPoolTaskRunner(max_workers=3), the execution of the dependent task is happening sequentially.
I think that first all of the iterations of fight_data = [Tapology.fights.tapology_extract_fight_data.submit(fight_url=fight.get('source_url', None)) for fight in fights] are finishing, and only after all of those tasks complete am I seeing the output of the DataTasks.store_raw_data() tasks.
The reason I was not seeing the output from these tasks before was that there are hundreds of fights being processed, and I was not waiting for them all to complete. When I limited the run to only 10 fights, I could clearly see that only after all 10 fights completed the extraction task were they then run through the data store tasks.
Marvin
10/26/2025, 1:58 PM
That's expected with a small thread pool: all of the extract tasks are submitted up front and fill the worker queue, so the store tasks you submit later sit behind them until workers free up. A few ways to interleave the two stages:
- Process in batches so the store tasks get queued (and run) sooner, e.g.:

from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed

BATCH_SIZE = 50  # tune to your scale

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3), log_prints=True)
def tapology_routine_fights_data_sync():
    fights = list(mongodb.find_many(<some_filter>))
    print(f"Found {len(fights)} fights to process.")
    store_futs = []
    for i in range(0, len(fights), BATCH_SIZE):
        batch = fights[i:i + BATCH_SIZE]
        extracts = [
            Tapology.fights.tapology_extract_fight_data.submit(
                fight_url=f.get("source_url")
            )
            for f in batch
        ]
        for fut in as_completed(extracts):
            v = fut.result()
            if v is None:
                continue
            store_futs.append(DataTasks.store_raw_data.submit(v))
    # surface errors
    for sf in as_completed(store_futs):
        sf.result()
- Increase max_workers so there are spare threads to start store tasks earlier. This helps but doesn’t change FIFO ordering; batching is still advisable.
- Fuse extract+store into one task if you want strict “store right after extract” behavior per item (simplest pipeline, fewer scheduling/queue interactions).
- Advanced: build a bounded pipeline (seed N extracts, and for each completion submit a store and the next extract) to keep a steady-state concurrency. This is more code; batching is usually enough.
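If you do want that bounded pipeline, here is a minimal sketch; it assumes the same extract/store shape as your flow (illustrative task bodies), and IN_FLIGHT is the steady-state number of extracts kept queued.

from collections import deque
from prefect import flow, task
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

IN_FLIGHT = 6  # how many extracts to keep queued at once (tune alongside max_workers)

@task
def extract(item: str) -> str | None:
    return f"data({item})"  # illustrative

@task(log_prints=True)
def store(data: str) -> None:
    print(f"storing {data}")  # illustrative

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3), log_prints=True)
def bounded_pipeline(items: list[str]):
    pending = deque(items)
    in_flight = [extract.submit(pending.popleft()) for _ in range(min(IN_FLIGHT, len(pending)))]
    store_futs = []
    while in_flight:
        # take the first extract that finishes...
        done = next(as_completed(in_flight))
        in_flight.remove(done)
        value = done.result()
        if value is not None:
            store_futs.append(store.submit(value))
        # ...and top the pipeline back up with the next item
        if pending:
            in_flight.append(extract.submit(pending.popleft()))
    # surface any store errors
    for sf in as_completed(store_futs):
        sf.result()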
Notes
- as_completed only controls when you submit the dependent task, not when it is scheduled relative to already-queued tasks.
- Passing futures (or values) to .submit is fine; Prefect will respect dependencies either way.
Docs
- Task runners and scheduling
- Run work concurrently (as_completed)