Madushan Pathirana
10/18/2025, 6:20 AM
Marvin
10/18/2025, 6:21 AM
Marvin
10/18/2025, 6:27 AM
- Resolve them explicitly with resolve_futures_to_results
- Or pass the futures directly to a downstream task; Prefect will resolve them before that task runs
Below is a complete example that:
- Submits your fetch tasks concurrently and keeps them in a dict keyed by upload_key
- Resolves them to DataFrames
- Writes one Excel file (either one sheet per DF or a single combined sheet)
- Uploads to S3 using a Prefect AwsCredentials block
from typing import Dict, List
import pandas as pd
from prefect import flow, task
from prefect.futures import resolve_futures_to_results
@task
def fetch_paginated_results(api_dict: dict, refresh_token: str) -> pd.DataFrame:
# TODO: your existing implementation
...
@task
def write_excel_and_upload(
dfs_by_key: Dict[str, pd.DataFrame],
bucket: str,
key: str,
aws_block_name: str,
one_sheet_per_df: bool = True
) -> str:
import io
    from prefect_aws.credentials import AwsCredentials
with io.BytesIO() as buffer:
with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
if one_sheet_per_df:
# One sheet per DataFrame; sheet names must be <= 31 chars
for sheet_name, df in dfs_by_key.items():
safe_name = str(sheet_name)[:31]
df.to_excel(writer, sheet_name=safe_name, index=False)
else:
# Single sheet: combine them (adjust concat as needed)
combined = pd.concat(dfs_by_key.values(), ignore_index=True)
combined.to_excel(writer, sheet_name="data", index=False)
buffer.seek(0)
        aws = AwsCredentials.load(aws_block_name)
        # Upload the workbook bytes to S3 with boto3 via the credentials block, then return the key
        s3_client = aws.get_boto3_session().client("s3")
        s3_client.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())
        return key
@flow
def my_flow(
api_dict_list: List[dict],
refresh_token: str,
s3_bucket: str,
s3_key: str,
aws_block_name: str = "my-aws-creds",
one_sheet_per_df: bool = True,
):
# 1) Submit tasks and keep futures in a dict keyed by upload_key
futures_by_key = {}
for api_dict in api_dict_list:
upload_key = api_dict.get("upload_key")
fut = fetch_paginated_results.with_options(
name="fetch_paginated_results",
task_run_name=f"{upload_key}",
).submit(api_dict, refresh_token)
futures_by_key[upload_key] = fut
# 2) Resolve futures to concrete DataFrames
dfs_by_key = resolve_futures_to_results(futures_by_key)
    # 3) Write Excel + upload to S3; wait on the future so a failure here fails the flow
    upload_future = write_excel_and_upload.submit(
        dfs_by_key,
        bucket=s3_bucket,
        key=s3_key,
        aws_block_name=aws_block_name,
        one_sheet_per_df=one_sheet_per_df,
    )
    return upload_future.result()
Notes and tips:
- Because you’re running everything on a single EC2 worker, keeping results in memory like this is ideal. You don’t need result persistence just to use them later in the same flow.
- If you prefer, you can skip explicit resolution and pass futures_by_key directly to `write_excel_and_upload.submit(...)`; Prefect will resolve the futures for that downstream task automatically.
- For large DataFrames, avoid enabling result persistence for these tasks unless you’ve configured remote result storage (e.g., S3) and an appropriate serializer. By default in Prefect 3.x, task results are not persisted, which is good for large in-memory DataFrames.
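If you ever do need persistence for these fetch tasks, here is a minimal sketch of that configuration; it assumes you have already created an S3Bucket block (the name "results-bucket" is a placeholder) and that pickling your DataFrames is acceptable:
from prefect import task
import pandas as pd
from prefect_aws.s3 import S3Bucket

@task(
    persist_result=True,                              # opt in for this task only
    result_storage=S3Bucket.load("results-bucket"),   # placeholder block name; keeps results off local disk
    result_serializer="pickle",                       # pickle can serialize DataFrames; JSON cannot
)
def fetch_paginated_results_persisted(api_dict: dict, refresh_token: str) -> pd.DataFrame:
    ...  # same implementation as fetch_paginated_results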
A couple of quick questions so I can tailor this further:
Marvin
10/18/2025, 6:27 AM
Madushan Pathirana
10/18/2025, 6:35 AM
Marvin
10/18/2025, 6:35 AM
Marvin
10/18/2025, 6:36 AM
Passing the string 'orders' won’t cause issues by itself. The likely problem is that df is a PrefectFuture (from .submit(...)), and you’re passing it into orders_s3_upload. Whether that’s okay depends on what orders_s3_upload is:
- If orders_s3_upload is a Prefect task (@task): call it with .submit(...) and pass the future; Prefect will handle dependency resolution.
- If orders_s3_upload is a regular Python function: you must resolve the future first (e.g., df.result()), or it will fail when it tries to use a PrefectFuture as a DataFrame.
Here are both correct patterns.
Option A: orders_s3_upload is a Prefect task
futures_by_key = {}
for api_dict in api_dict_list:
upload_key = api_dict.get("upload_key")
df_fut = ebay_extraction.fetch_paginated_results.with_options(
name="fetch_paginated_results",
task_run_name=f"{upload_key}",
).submit(api_dict, refresh_token)
futures_by_key[upload_key] = df_fut
if upload_key == "orders":
orders_lineitems_expanded_upload_key = "order_with_lineItems"
expanded_fut = ebay_extraction.orders_s3_upload.with_options(
task_run_name=orders_lineitems_expanded_upload_key
).submit(df_fut, orders_lineitems_expanded_upload_key)
futures_by_key[orders_lineitems_expanded_upload_key] = expanded_fut
# Later, when you want the actual DataFrames:
from prefect.futures import resolve_futures_to_results
dfs_by_key = resolve_futures_to_results(futures_by_key)
Option B: orders_s3_upload is a plain function
futures_by_key = {}
for api_dict in api_dict_list:
upload_key = api_dict.get("upload_key")
df_fut = ebay_extraction.fetch_paginated_results.with_options(
name="fetch_paginated_results",
task_run_name=f"{upload_key}",
).submit(api_dict, refresh_token)
futures_by_key[upload_key] = df_fut
if upload_key == "orders":
orders_lineitems_expanded_upload_key = "order_with_lineItems"
orders_df = df_fut.result() # resolve the future
expanded_df = ebay_extraction.orders_s3_upload(
orders_df, orders_lineitems_expanded_upload_key
)
# If you want to include this in the later Excel step:
futures_by_key[orders_lineitems_expanded_upload_key] = expanded_df # note: this is now a concrete DF
A couple of extra tips:
- If your goal is a single Excel with all tables, avoid uploading inside `orders_s3_upload`; have it return a DataFrame and do one final “write Excel + upload to S3” step after collecting everything (see the sketch after these tips).
- You can skip explicit resolution and pass futures directly into a downstream Prefect task (e.g., a “write_excel_and_upload” task); Prefect will resolve them automatically. Plain functions cannot accept futures.
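For the first tip, a rough sketch of that refactor (the explode on a "lineItems" column is only a placeholder for your real expansion logic):
import pandas as pd
from prefect import task

@task
def orders_s3_upload(orders_df: pd.DataFrame, upload_key: str) -> pd.DataFrame:
    # Placeholder expansion: one row per line item; swap in your real logic.
    # upload_key is kept only to match the existing call site.
    expanded = orders_df.explode("lineItems", ignore_index=True)
    # Return the DataFrame instead of uploading here; a single downstream
    # "write Excel + upload" task can then handle all uploads in one place
    return expanded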
Questions:
- Is orders_s3_upload a Prefect task or a regular function?
- Do you want the expanded “order_with_lineItems” table included in the same Excel file as the others?
Madushan Pathirana
10/18/2025, 6:44 AM
Marvin
10/18/2025, 6:44 AM
Marvin
10/18/2025, 6:46 AM
Passing df into another task via .submit(...) is the right pattern. Prefect will resolve the df future to a concrete DataFrame before orders_s3_upload runs, and it will set the dependency automatically. The string 'orders' won’t cause issues.
Since orders_s3_upload crashed, the next step is to check its task logs/traceback to see why. Here are quick CLI commands to find and view the logs:
- List recent CRASHED task runs and look for your task_run_name (you set it to order_with_lineItems):
prefect task-run ls --state-type CRASHED --limit 20
- Once you have the task run ID:
prefect task-run logs <TASK_RUN_ID>
Common causes to check:
- Argument mismatch: ensure `orders_s3_upload`’s signature matches how you call it (first arg is the DataFrame, second is the upload key). For example:
@task(log_prints=True)
def orders_s3_upload(df: pd.DataFrame, upload_key: str) -> str:
...
- Missing dependencies on the worker: if the task writes Excel or uploads to S3, make sure the worker environment has the required packages installed (e.g., pandas, xlsxwriter/openpyxl, boto3 or prefect-aws if you use it).
- Credentials/permissions: if you upload to S3, confirm the worker has valid AWS credentials (either environment variables or a Prefect AwsCredentials block you load in the task) and permission to write to the target bucket/key (see the sketch after this list).
- File system paths: if you write to disk before upload, ensure directories exist and the process has permissions.
- Returning very large objects with persist_result=True: avoid persisting huge DataFrames as task results unless you’ve configured remote result storage and an appropriate serializer.
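For the credentials item, a quick sanity check you can run as a task on the worker (assumes an AwsCredentials block named "my-aws-creds" and a placeholder bucket name):
from prefect import task
from prefect_aws.credentials import AwsCredentials

@task(log_prints=True)
def check_s3_access(bucket: str = "my-target-bucket") -> None:
    aws = AwsCredentials.load("my-aws-creds")      # or rely on env vars / an instance profile
    s3 = aws.get_boto3_session().client("s3")
    s3.head_bucket(Bucket=bucket)                  # raises if credentials or permissions are wrong
    print(f"Worker can reach s3://{bucket}")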
To make debugging easier, you can increase logging and add retries:
order_with_lineItems_expanded_df = ebay_extraction.orders_s3_upload.with_options(
task_run_name=orders_lineitems_expanded_upload_key,
log_prints=True,
retries=2,
retry_delay_seconds=10
).submit(df, orders_lineitems_expanded_upload_key)
If you can paste the last 30–50 lines of the crashed task’s logs (or the traceback), I can pinpoint the cause and suggest a fix quickly.
Madushan Pathirana
10/18/2025, 8:28 AM
Marvin
10/18/2025, 8:28 AM
Marvin
10/18/2025, 8:31 AM
.submit(...) returns a PrefectFuture. If your flow never awaits/uses the result of a failed future, the flow function can finish successfully, so the flow run is marked Completed.
- A task failure only propagates to the flow if you:
- Resolve/await the future (e.g., future.result() or resolve_futures_to_results(...))
- Or pass the future into a downstream task that runs (Prefect will resolve it for that task; if it fails, that task will fail and the exception can propagate if you await it)
How to “fail the flow if any task failed”
- Easiest: resolve all futures at the end. If any failed/crashed, this will raise and mark the flow Failed.
from prefect.futures import resolve_futures_to_results
# collect futures
futures = []
for api_dict in api_dict_list:
fut = ebay_extraction.fetch_paginated_results.submit(api_dict, refresh_token)
futures.append(fut)
# force failure if any task failed/crashed
_ = resolve_futures_to_results(futures)
- Or check states explicitly:
from prefect.futures import resolve_futures_to_states
states = resolve_futures_to_states(futures)
bad = [s for s in states if s.is_failed() or s.is_crashed()]
if bad:
raise RuntimeError(f"{len(bad)} tasks failed/crashed")
Why some parallel tasks fail but succeed on rerun
- Usually transient issues: API rate limits, intermittent network errors, service hiccups, resource contention on your single EC2 host (CPU/memory), or timing issues with upstream systems.
- On rerun, timing or load conditions change and the task succeeds.
Hardening tips
- Add retries with jitter to smooth transient failures:
my_task = my_task.with_options(retries=3, retry_delay_seconds=10, retry_jitter_factor=0.3)
- Limit concurrency/rate for fragile external APIs:
- Keep worker-level concurrency reasonable (e.g., fewer threads/processes).
- Optionally use Prefect’s concurrency contexts to cap parallelism across tasks accessing the same API (see the sketch after this list).
- Choose a task runner that matches your workload:
- I/O-bound (API calls, S3): ThreadPoolTaskRunner
- CPU-bound (heavy pandas): ProcessPoolTaskRunner
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def my_flow(...):
...
- Ensure packages/credentials are consistent on the worker (pandas/openpyxl/xlsxwriter, boto3/prefect-aws, AWS creds).
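For the concurrency-context tip above, a minimal sketch using Prefect's global concurrency context (the limit name "ebay-api" is a placeholder; create the limit first in the UI or CLI with the number of slots you want):
import httpx
from prefect import task
from prefect.concurrency.sync import concurrency

@task(retries=3, retry_delay_seconds=10, retry_jitter_factor=0.3)
def fetch_page(url: str, token: str) -> dict:
    # Hold one slot of the "ebay-api" limit for the duration of the request,
    # so only N of these calls run at once across all concurrent tasks
    with concurrency("ebay-api", occupy=1):
        resp = httpx.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
        resp.raise_for_status()
        return resp.json()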
If you share the last 30–50 lines of the crashed orders_s3_upload task logs, I can help pinpoint whether this was transient (retryable) or deterministic (code/inputs).
Madushan Pathirana
10/21/2025, 5:34 AM
Marvin
10/21/2025, 5:34 AM
Marvin
10/21/2025, 5:35 AM
Once you call .result() and then call another task, Prefect no longer sees any dependency, so the final task can start as soon as it’s scheduled.
Two problems in your snippet:
- You’re calling .result() inside the loop. That blocks the flow (reduces concurrency) and strips dependency information.
- You call the final task with a dict of concrete DataFrames, so Prefect can’t “see” it depends on the fetch/expand tasks.
Fix: keep futures in the dict, then pass that dict to the final task. Prefect will resolve them for you and won’t start the final task until all those futures complete.
Example rewrite:
from prefect.futures import resolve_futures_to_results # optional if you prefer explicit resolution
dataframe_futures = {}
for api_dict in api_dict_list:
upload_key = api_dict.get("upload_key")
df_fut = ebay_extraction.fetch_paginated_results.with_options(
name="fetch_paginated_results",
task_run_name=f"{upload_key}",
).submit(api_dict, refresh_token)
dataframe_futures[upload_key] = df_fut
if upload_key == "orders":
orders_lineitems_expanded_upload_key = "order_with_lineItems"
expanded_fut = ebay_extraction.orders_s3_upload.with_options(
task_run_name=orders_lineitems_expanded_upload_key
).submit(df_fut, orders_lineitems_expanded_upload_key)
dataframe_futures[orders_lineitems_expanded_upload_key] = expanded_fut
# Option A (recommended): pass futures directly; Prefect will wait and resolve automatically
ebay_extraction.upload_combine_excel.with_options(
task_run_name="combine_excel"
).submit(dataframe_futures, store_id)
# Option B: explicitly force resolution first, then call final task
# dfs = resolve_futures_to_results(dataframe_futures)
# ebay_extraction.upload_combine_excel.with_options(task_run_name="combine_excel").submit(dfs, store_id)
Why the final task started “too early” in your version:
- Because you did df.result() and expanded_df.result() inside the loop, the final task had no visible dependency on those upstream tasks. Prefect schedules tasks based on data dependencies, not Python control flow order. Passing futures preserves those dependencies.
Bonus tips:
- Avoid .result() inside the loop — it serializes work. Keep futures and resolve once.
- If some tasks are flaky and succeed on rerun, add retries and consider limiting concurrency or adding rate limits to smooth transient API/infrastructure issues:
my_task = my_task.with_options(retries=3, retry_delay_seconds=10, retry_jitter_factor=0.3)
If you share the upload_combine_excel signature, I can suggest the cleanest way to accept a dict of futures and work with resolved DataFrames inside that task.
Madushan Pathirana
10/21/2025, 5:46 AM
for api_dict in api_dict_list:
upload_key = api_dict.get('upload_key')
df = ebay_extraction.fetch_paginated_results.with_options(
name="fetch_paginated_results",
task_run_name=f"{upload_key}"
).submit(api_dict,refresh_token)
dataframe_dict[upload_key] = df
if upload_key == 'orders':
# orders_df=df.result()
orders_lineitems_expanded_upload_key = 'order_with_lineItems'
order_with_lineItems_expanded_df = ebay_extraction.orders_s3_upload.with_options(
task_run_name=orders_lineitems_expanded_upload_key
).submit(df,orders_lineitems_expanded_upload_key)
dataframe_dict[orders_lineitems_expanded_upload_key] = order_with_lineItems_expanded_df
seller_standards_profile_api = 'https://api.ebay.com/sell/analytics/v1/seller_standards_profile'
seller_standards_profile_upload_key = 'seller_standards_profile'
seller_standards_profile_api_metric='standardsProfiles'
performance_df = ebay_extraction.get_perfomance( refresh_token,seller_standards_profile_api,seller_standards_profile_api_metric,seller_standards_profile_upload_key )
dataframe_dict[seller_standards_profile_upload_key] = performance_df
ebay_extraction.upload_combine_excel(dataframe_dict,store_id)
Marvin
10/21/2025, 5:46 AM
Marvin
10/21/2025, 5:47 AM
1) Keep everything as futures and add an explicit wait_for on the final task
dataframe_futures = {}
for api_dict in api_dict_list:
upload_key = api_dict.get("upload_key")
df_fut = ebay_extraction.fetch_paginated_results.with_options(
name="fetch_paginated_results",
task_run_name=f"{upload_key}",
).submit(api_dict, refresh_token)
dataframe_futures[upload_key] = df_fut
if upload_key == "orders":
orders_lineitems_expanded_upload_key = "order_with_lineItems"
expanded_fut = ebay_extraction.orders_s3_upload.with_options(
task_run_name=orders_lineitems_expanded_upload_key
).submit(df_fut, orders_lineitems_expanded_upload_key)
dataframe_futures[orders_lineitems_expanded_upload_key] = expanded_fut
# Make performance a task too (so it’s a future like the rest)
performance_fut = ebay_extraction.get_perfomance.with_options(
task_run_name="seller_standards_profile"
).submit(
refresh_token,
"<https://api.ebay.com/sell/analytics/v1/seller_standards_profile>",
"standardsProfiles",
"seller_standards_profile",
)
dataframe_futures["seller_standards_profile"] = performance_fut
# Explicitly tell Prefect to wait for all futures before starting the final task
all_futs = list(dataframe_futures.values())
ebay_extraction.upload_combine_excel.with_options(
    task_run_name="combine_excel"
).submit(dataframe_futures, store_id, wait_for=all_futs)  # wait_for ensures it won’t start early
2) Alternatively, resolve futures once, then call the final task
- This removes any ambiguity; the final task receives only concrete DataFrames
from prefect.futures import resolve_futures_to_results
# ... same loop as above building dataframe_futures ...
dfs = resolve_futures_to_results(dataframe_futures)
ebay_extraction.upload_combine_excel.with_options(
task_run_name="combine_excel"
).submit(dfs, store_id)
Important details
- Don’t call the final task like a normal function: upload_combine_excel(...). Use .submit(...). A direct call executes it immediately in the flow process, which is why you’re seeing it “start early.”
- Make get_perfomance a Prefect task and .submit(...) it so your dict is uniformly futures. Mixed plain objects + futures is okay, but having everything as futures plus wait_for makes dependencies unambiguous.
- If you want a belt-and-suspenders check, you can also do:
- from prefect.futures import wait
- wait(all_futs) before submitting the final task.
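That check looks like this with the names from the snippet above:
from prefect.futures import wait

all_futs = list(dataframe_futures.values())
wait(all_futs)  # block until every upstream future reaches a final state
ebay_extraction.upload_combine_excel.with_options(
    task_run_name="combine_excel"
).submit(dataframe_futures, store_id)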
If it still appears to start early, can you share:
- Whether you’re seeing it move to Scheduled or Running before upstream tasks complete
- The task_run_name of the final task so we can inspect its dependencies:
- prefect task-run ls --task-run-name combine_excel --limit 5
- prefect task-run inspect <TASK_RUN_ID> to verify it lists the upstream task runs