@Marvin I have self hosted prefect, in my flo...
# ask-marvin
@Marvin I have self-hosted Prefect. In my flow I am creating multiple pandas DataFrames, and I want to know if I can store them within the flow so I can use them later to write all of them into the same Excel sheet and transfer it to S3.
Copy code
for api_dict in api_dict_list:
    upload_key = api_dict.get('upload_key')
    df = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}"
    ).submit(api_dict, refresh_token)
This is part of my flow. All the DataFrames created in this loop need to be used later.
thought for 408.4 seconds
Yes: in Prefect 3.x you can keep the task futures in a list/dict inside your flow, then resolve them when you're ready to write the Excel file and upload to S3. You don't need to "store" them in Prefect; keeping them in Python variables is enough to use them later in the same flow run. Two patterns work well:
- Collect futures, then resolve them to concrete DataFrames with `resolve_futures_to_results`
- Or pass the futures directly to a downstream task; Prefect will resolve them before that task runs

Below is a complete example that:
- Submits your fetch tasks concurrently and keeps them in a dict keyed by `upload_key`
- Resolves them to DataFrames
- Writes one Excel file (either one sheet per DF or a single combined sheet)
- Uploads to S3 using a Prefect AwsCredentials block
Copy code
from typing import Dict, List
import pandas as pd
from prefect import flow, task
from prefect.futures import resolve_futures_to_results

@task
def fetch_paginated_results(api_dict: dict, refresh_token: str) -> pd.DataFrame:
    # TODO: your existing implementation
    ...

@task
def write_excel_and_upload(
    dfs_by_key: Dict[str, pd.DataFrame],
    bucket: str,
    key: str,
    aws_block_name: str,
    one_sheet_per_df: bool = True
) -> str:
    import io
    from prefect_aws.credentials import AwsCredentials

    with io.BytesIO() as buffer:
        with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
            if one_sheet_per_df:
                # One sheet per DataFrame; sheet names must be <= 31 chars
                for sheet_name, df in dfs_by_key.items():
                    safe_name = str(sheet_name)[:31]
                    df.to_excel(writer, sheet_name=safe_name, index=False)
            else:
                # Single sheet: combine them (adjust concat as needed)
                combined = pd.concat(dfs_by_key.values(), ignore_index=True)
                combined.to_excel(writer, sheet_name="data", index=False)

        # Upload the in-memory workbook with a boto3 client built from the credentials block
        aws = AwsCredentials.load(aws_block_name)
        s3_client = aws.get_boto3_session().client("s3")
        s3_client.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())
        return key

@flow
def my_flow(
    api_dict_list: List[dict],
    refresh_token: str,
    s3_bucket: str,
    s3_key: str,
    aws_block_name: str = "my-aws-creds",
    one_sheet_per_df: bool = True,
):
    # 1) Submit tasks and keep futures in a dict keyed by upload_key
    futures_by_key = {}
    for api_dict in api_dict_list:
        upload_key = api_dict.get("upload_key")
        fut = fetch_paginated_results.with_options(
            name="fetch_paginated_results",
            task_run_name=f"{upload_key}",
        ).submit(api_dict, refresh_token)
        futures_by_key[upload_key] = fut

    # 2) Resolve futures to concrete DataFrames
    dfs_by_key = resolve_futures_to_results(futures_by_key)

    # 3) Write Excel + upload to S3
    write_excel_and_upload.submit(
        dfs_by_key,
        bucket=s3_bucket,
        key=s3_key,
        aws_block_name=aws_block_name,
        one_sheet_per_df=one_sheet_per_df,
    )
Notes and tips:
- Because you're running everything on a single EC2 worker, keeping results in memory like this is ideal. You don't need result persistence just to use them later in the same flow.
- If you prefer, you can skip explicit resolution and pass `futures_by_key` directly to `write_excel_and_upload.submit(...)`; Prefect will resolve the futures for that downstream task automatically (see the short sketch below).
- For large DataFrames, avoid enabling result persistence for these tasks unless you've configured remote result storage (e.g., S3) and an appropriate serializer. By default in Prefect 3.x, task results are not persisted, which is good for large in-memory DataFrames.

A couple of quick questions so I can tailor this further:
- Do you want one sheet per DataFrame, or a single sheet combining them?
- Do you already have an AwsCredentials block set up in Prefect (access key/secret), and what are the S3 bucket/key you plan to use?
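If it helps, here is a minimal sketch of that "pass futures directly" variant, reusing the names from the example above; it just skips the explicit resolution step:
Copy code
# Inside my_flow: keep the futures and hand the dict straight to the downstream task.
# Prefect resolves every future in the dict before write_excel_and_upload runs.
futures_by_key = {
    api_dict.get("upload_key"): fetch_paginated_results.submit(api_dict, refresh_token)
    for api_dict in api_dict_list
}

write_excel_and_upload.submit(
    futures_by_key,  # futures resolved to DataFrames automatically
    bucket=s3_bucket,
    key=s3_key,
    aws_block_name=aws_block_name,
)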
@Marvin
Copy code
for api_dict in api_dict_list:
    upload_key = api_dict.get('upload_key')
    df = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}"
    ).submit(api_dict, refresh_token)
    if upload_key == 'orders':
        orders_lineitems_expanded_upload_key = 'order_with_lineItems'
        order_with_lineItems_expanded_df = ebay_extraction.orders_s3_upload(df, orders_lineitems_expanded_upload_key)
Will 'orders' create any issues?
thought for 30.3 seconds
Short answer: the string value `'orders'` won't cause issues by itself. The likely problem is that `df` is a PrefectFuture (from `.submit(...)`) and you're passing it into `orders_s3_upload`. Whether that's okay depends on what `orders_s3_upload` is:
- If `orders_s3_upload` is a Prefect task (`@task`): call it with `.submit(...)` and pass the future; Prefect will handle dependency resolution.
- If `orders_s3_upload` is a regular Python function: you must resolve the future first (e.g., `df.result()`), or it will fail when it tries to use a PrefectFuture as a DataFrame.

Here are both correct patterns.

Option A: `orders_s3_upload` is a Prefect task
Copy code
futures_by_key = {}

for api_dict in api_dict_list:
    upload_key = api_dict.get("upload_key")

    df_fut = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}",
    ).submit(api_dict, refresh_token)

    futures_by_key[upload_key] = df_fut

    if upload_key == "orders":
        orders_lineitems_expanded_upload_key = "order_with_lineItems"
        expanded_fut = ebay_extraction.orders_s3_upload.with_options(
            task_run_name=orders_lineitems_expanded_upload_key
        ).submit(df_fut, orders_lineitems_expanded_upload_key)
        futures_by_key[orders_lineitems_expanded_upload_key] = expanded_fut

# Later, when you want the actual DataFrames:
from prefect.futures import resolve_futures_to_results
dfs_by_key = resolve_futures_to_results(futures_by_key)
Option B: `orders_s3_upload` is a plain function
Copy code
futures_by_key = {}

for api_dict in api_dict_list:
    upload_key = api_dict.get("upload_key")

    df_fut = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}",
    ).submit(api_dict, refresh_token)

    futures_by_key[upload_key] = df_fut

    if upload_key == "orders":
        orders_lineitems_expanded_upload_key = "order_with_lineItems"
        orders_df = df_fut.result()  # resolve the future
        expanded_df = ebay_extraction.orders_s3_upload(
            orders_df, orders_lineitems_expanded_upload_key
        )
        # If you want to include this in the later Excel step:
        futures_by_key[orders_lineitems_expanded_upload_key] = expanded_df  # note: this is now a concrete DF
A couple of extra tips:
- If your goal is a single Excel file with all tables, avoid uploading inside `orders_s3_upload`; have it return a DataFrame and do one final "write Excel + upload to S3" step after collecting everything (a sketch of that refactor follows below).
- You can skip explicit resolution and pass futures directly into a downstream Prefect task (e.g., a "write_excel_and_upload" task); Prefect will resolve them automatically. Plain functions cannot accept futures.

Questions:
- Is `orders_s3_upload` a Prefect task or a regular function?
- Do you want the expanded "order_with_lineItems" table included in the same Excel file as the others?
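For the first tip, a hedged sketch of what the refactor could look like (the real expansion logic inside `orders_s3_upload` isn't shown in this thread, so the body below is a placeholder and the task name `expand_orders` is purely illustrative):
Copy code
import pandas as pd
from prefect import task

@task
def expand_orders(orders_df: pd.DataFrame) -> pd.DataFrame:
    # Placeholder: however you currently expand orders into line items.
    # Return the DataFrame instead of uploading it here, so the single
    # "write Excel + upload to S3" step at the end can include it.
    expanded = orders_df  # replace with your real expansion logic
    return expanded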
@Marvin orders_s3_upload is a Prefect task. fetch_paginated_results for orders completed, but orders_s3_upload crashed.
Copy code
for api_dict in api_dict_list:
    upload_key = api_dict.get('upload_key')
    df = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}"
    ).submit(api_dict, refresh_token)
    if upload_key == 'orders':
        orders_lineitems_expanded_upload_key = 'order_with_lineItems'
        order_with_lineItems_expanded_df = ebay_extraction.orders_s3_upload.with_options(
            task_run_name=orders_lineitems_expanded_upload_key
        ).submit(df, orders_lineitems_expanded_upload_key)
thought for 90.8 seconds
Good news: passing the future `df` into another task via `.submit(...)` is the right pattern. Prefect will resolve the `df` future to a concrete DataFrame before `orders_s3_upload` runs, and it will set the dependency automatically. The string `'orders'` won't cause issues.

Since `orders_s3_upload` crashed, the next step is to check its task logs/traceback to see why. Here are quick CLI commands (verified) to find and view the logs:
- List recent CRASHED task runs and look for your `task_run_name` (you set it to `order_with_lineItems`):
Copy code
prefect task-run ls --state-type CRASHED --limit 20
- Once you have the task run ID:
Copy code
prefect task-run logs <TASK_RUN_ID>
Common causes to check:
- Argument mismatch: ensure `orders_s3_upload`'s signature matches how you call it (first arg is the DataFrame, second is the upload key). For example:
Copy code
import pandas as pd
from prefect import task

@task(log_prints=True)
def orders_s3_upload(df: pd.DataFrame, upload_key: str) -> str:
    ...
- Missing dependencies on the worker: if the task writes Excel or uploads to S3, make sure the worker environment has the required packages installed (e.g., pandas, xlsxwriter/openpyxl, boto3 or prefect-aws if you use it).
- Credentials/permissions: if you upload to S3, confirm the worker has valid AWS credentials (either environment variables or a Prefect AwsCredentials block you load in the task) and permission to write to the target bucket/key.
- File system paths: if you write to disk before upload, ensure directories exist and the process has permissions.
- Returning very large objects with `persist_result=True`: avoid persisting huge DataFrames as task results unless you've configured remote result storage and an appropriate serializer.

To make debugging easier, you can increase logging and add retries:
Copy code
order_with_lineItems_expanded_df = ebay_extraction.orders_s3_upload.with_options(
    task_run_name=orders_lineitems_expanded_upload_key,
    log_prints=True,
    retries=2,
    retry_delay_seconds=10
).submit(df, orders_lineitems_expanded_upload_key)
If you can paste the last 30–50 lines of the crashed task’s logs (or the traceback), I can pinpoint the cause and suggest a fix quickly.
@Marvin Why doesn't the flow crash even when one task crashes? And in the same flow, some parallel tasks fail, but when I rerun them they are successful.
thought for 124.2 seconds
Great questions: both behaviors are expected in Prefect 3.x.

Why the flow doesn't crash when a task crashes
- `.submit(...)` returns a PrefectFuture. If your flow never awaits/uses the result of a failed future, the flow function can finish successfully, so the flow run is marked Completed.
- A task failure only propagates to the flow if you:
  - Resolve/await the future (e.g., `future.result()` or `resolve_futures_to_results(...)`)
  - Or pass the future into a downstream task that runs (Prefect will resolve it for that task; if it fails, that task will fail and the exception can propagate if you await it)

How to "fail the flow if any task failed"
- Easiest: resolve all futures at the end. If any failed/crashed, this will raise and mark the flow Failed.
Copy code
from prefect.futures import resolve_futures_to_results

# collect futures
futures = []
for api_dict in api_dict_list:
    fut = ebay_extraction.fetch_paginated_results.submit(api_dict, refresh_token)
    futures.append(fut)

# force failure if any task failed/crashed
_ = resolve_futures_to_results(futures)
- Or check states explicitly:
Copy code
from prefect.futures import resolve_futures_to_states

states = resolve_futures_to_states(futures)
bad = [s for s in states if s.is_failed() or s.is_crashed()]
if bad:
    raise RuntimeError(f"{len(bad)} tasks failed/crashed")
Why some parallel tasks fail but succeed on rerun
- Usually transient issues: API rate limits, intermittent network errors, service hiccups, resource contention on your single EC2 host (CPU/memory), or timing issues with upstream systems.
- On rerun, timing or load conditions change and the task succeeds.

Hardening tips
- Add retries with jitter to smooth transient failures:
Copy code
my_task = my_task.with_options(retries=3, retry_delay_seconds=10, retry_jitter_factor=0.3)
- Limit concurrency/rate for fragile external APIs:
  - Keep worker-level concurrency reasonable (e.g., fewer threads/processes).
  - Optionally use Prefect's concurrency contexts to cap parallelism across tasks accessing the same API (see the sketch at the end of this reply).
- Choose a task runner that matches your workload:
  - I/O-bound (API calls, S3): ThreadPoolTaskRunner
  - CPU-bound (heavy pandas): ProcessPoolTaskRunner
Copy code
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def my_flow(...):
    ...
- Ensure packages/credentials are consistent on the worker (pandas/openpyxl/xlsxwriter, boto3/prefect-aws, AWS creds).

If you share the last 30–50 lines of the crashed `orders_s3_upload` task logs, I can help pinpoint whether this was transient (retryable) or deterministic (code/inputs).
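As a hedged sketch of the concurrency-context idea mentioned above (it assumes a global concurrency limit named "ebay-api" has already been created in the Prefect UI or CLI; the task name `call_ebay_api` and its body are placeholders):
Copy code
from prefect import task
from prefect.concurrency.sync import concurrency

@task(retries=3, retry_delay_seconds=10, retry_jitter_factor=0.3)
def call_ebay_api(api_dict: dict, refresh_token: str):
    # Only a limited number of these blocks can run at once across the flow run,
    # which keeps a burst of parallel tasks from hammering the API.
    with concurrency("ebay-api", occupy=1):
        ...  # your actual API call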
@Marvin
Copy code
for api_dict in api_dict_list:
    upload_key = api_dict.get('upload_key')
    df = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}"
    ).submit(api_dict, refresh_token)
    dataframe_dict[upload_key] = df.result()
    if upload_key == 'orders':
        orders_df = df.result()
        orders_lineitems_expanded_upload_key = 'order_with_lineItems'
        order_with_lineItems_expanded_df = ebay_extraction.orders_s3_upload.with_options(
            task_run_name=orders_lineitems_expanded_upload_key
        ).submit(orders_df, orders_lineitems_expanded_upload_key)
        dataframe_dict[orders_lineitems_expanded_upload_key] = order_with_lineItems_expanded_df.result()

seller_standards_profile_api = 'https://api.ebay.com/sell/analytics/v1/seller_standards_profile'
seller_standards_profile_upload_key = 'seller_standards_profile'
seller_standards_profile_api_metric = 'standardsProfiles'
performance_df = ebay_extraction.get_perfomance(
    refresh_token, seller_standards_profile_api, seller_standards_profile_api_metric, seller_standards_profile_upload_key
)
dataframe_dict[seller_standards_profile_upload_key] = performance_df

ebay_extraction.upload_combine_excel(dataframe_dict, store_id)
ebay_extraction.upload_combine_excel is a Prefect task; it starts before `dataframe_dict[upload_key] = df.result()`, i.e., before all the DataFrames created in the loop are resolved.
thought for 56.4 seconds
What you're seeing is due to dependency tracking. In Prefect, a downstream task will only wait for upstream work if you pass it the upstream futures (or otherwise create an explicit dependency). If you resolve futures with `.result()` and then call another task, Prefect no longer sees any dependency, so the final task can start as soon as it's scheduled.

Two problems in your snippet:
- You're calling `.result()` inside the loop. That blocks the flow (reduces concurrency) and strips dependency information.
- You call the final task with a dict of concrete DataFrames, so Prefect can't "see" that it depends on the fetch/expand tasks.

Fix: keep futures in the dict, then pass that dict to the final task. Prefect will resolve them for you and won't start the final task until all those futures complete. Example rewrite:
Copy code
from prefect.futures import resolve_futures_to_results  # optional if you prefer explicit resolution

dataframe_futures = {}

for api_dict in api_dict_list:
    upload_key = api_dict.get("upload_key")

    df_fut = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}",
    ).submit(api_dict, refresh_token)

    dataframe_futures[upload_key] = df_fut

    if upload_key == "orders":
        orders_lineitems_expanded_upload_key = "order_with_lineItems"
        expanded_fut = ebay_extraction.orders_s3_upload.with_options(
            task_run_name=orders_lineitems_expanded_upload_key
        ).submit(df_fut, orders_lineitems_expanded_upload_key)
        dataframe_futures[orders_lineitems_expanded_upload_key] = expanded_fut

# Option A (recommended): pass futures directly; Prefect will wait and resolve automatically
ebay_extraction.upload_combine_excel.with_options(
    task_run_name="combine_excel"
).submit(dataframe_futures, store_id)

# Option B: explicitly force resolution first, then call final task
# dfs = resolve_futures_to_results(dataframe_futures)
# ebay_extraction.upload_combine_excel.with_options(task_run_name="combine_excel").submit(dfs, store_id)
Why the final task started "too early" in your version:
- Because you called `df.result()` and `order_with_lineItems_expanded_df.result()` inside the loop, the final task had no visible dependency on those upstream tasks. Prefect schedules tasks based on data dependencies, not Python control flow order. Passing futures preserves those dependencies.

Bonus tips:
- Avoid `.result()` inside the loop; it serializes work. Keep futures and resolve once.
- If some tasks are flaky and succeed on rerun, add retries and consider limiting concurrency or adding rate limits to smooth transient API/infrastructure issues:
Copy code
my_task = my_task.with_options(retries=3, retry_delay_seconds=10, retry_jitter_factor=0.3)
If you share the `upload_combine_excel` signature, I can suggest the cleanest way to accept a dict of futures and work with resolved DataFrames inside that task.
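For reference, a hedged sketch of what such a task could look like (the real `upload_combine_excel` signature isn't shown in this thread, so the parameters and the S3 handling below are assumptions):
Copy code
import io
from typing import Dict

import pandas as pd
from prefect import task

@task
def upload_combine_excel(dataframe_dict: Dict[str, pd.DataFrame], store_id: str) -> str:
    # By the time this runs, Prefect has resolved any futures passed in,
    # so every value in dataframe_dict is a concrete DataFrame.
    buffer = io.BytesIO()
    with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
        for name, df in dataframe_dict.items():
            df.to_excel(writer, sheet_name=str(name)[:31], index=False)
    # ... upload buffer.getvalue() to S3 under a key derived from store_id ...
    return f"{store_id}.xlsx"  # placeholder return value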
@Marvin upload_combine_excel still starts early
Copy code
for api_dict in api_dict_list:
    upload_key = api_dict.get('upload_key')

    df = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}"
    ).submit(api_dict, refresh_token)

    dataframe_dict[upload_key] = df

    if upload_key == 'orders':
        # orders_df = df.result()
        orders_lineitems_expanded_upload_key = 'order_with_lineItems'
        order_with_lineItems_expanded_df = ebay_extraction.orders_s3_upload.with_options(
            task_run_name=orders_lineitems_expanded_upload_key
        ).submit(df, orders_lineitems_expanded_upload_key)

        dataframe_dict[orders_lineitems_expanded_upload_key] = order_with_lineItems_expanded_df

seller_standards_profile_api = 'https://api.ebay.com/sell/analytics/v1/seller_standards_profile'
seller_standards_profile_upload_key = 'seller_standards_profile'
seller_standards_profile_api_metric = 'standardsProfiles'

performance_df = ebay_extraction.get_perfomance(refresh_token, seller_standards_profile_api, seller_standards_profile_api_metric, seller_standards_profile_upload_key)
dataframe_dict[seller_standards_profile_upload_key] = performance_df

ebay_extraction.upload_combine_excel(dataframe_dict, store_id)
thought for 55.4 seconds
Got it: the behavior you're seeing is because Prefect doesn't "see" a dependency unless you either pass upstream futures to the task or explicitly tell it to wait. Two concrete fixes:

1) Submit the final task and explicitly wait for all futures
- Call the final task with `.submit(...)`
- Pass `wait_for` to force a dependency on every future so it cannot start early
Copy code
dataframe_futures = {}

for api_dict in api_dict_list:
    upload_key = api_dict.get("upload_key")

    df_fut = ebay_extraction.fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"{upload_key}",
    ).submit(api_dict, refresh_token)

    dataframe_futures[upload_key] = df_fut

    if upload_key == "orders":
        orders_lineitems_expanded_upload_key = "order_with_lineItems"
        expanded_fut = ebay_extraction.orders_s3_upload.with_options(
            task_run_name=orders_lineitems_expanded_upload_key
        ).submit(df_fut, orders_lineitems_expanded_upload_key)
        dataframe_futures[orders_lineitems_expanded_upload_key] = expanded_fut

# Make performance a task too (so it’s a future like the rest)
performance_fut = ebay_extraction.get_perfomance.with_options(
    task_run_name="seller_standards_profile"
).submit(
    refresh_token,
    "<https://api.ebay.com/sell/analytics/v1/seller_standards_profile>",
    "standardsProfiles",
    "seller_standards_profile",
)
dataframe_futures["seller_standards_profile"] = performance_fut

# Explicitly tell Prefect to wait for all futures before starting the final task.
# Note: wait_for is an argument to .submit(...), not .with_options(...).
all_futs = list(dataframe_futures.values())
ebay_extraction.upload_combine_excel.with_options(
    task_run_name="combine_excel",
).submit(dataframe_futures, store_id, wait_for=all_futs)  # ensures it won't start early
2) Alternatively, resolve futures once, then call the final task
- This removes any ambiguity; the final task receives only concrete DataFrames
Copy code
from prefect.futures import resolve_futures_to_results

# ... same loop as above building dataframe_futures ...

dfs = resolve_futures_to_results(dataframe_futures)
ebay_extraction.upload_combine_excel.with_options(
    task_run_name="combine_excel"
).submit(dfs, store_id)
Important details
- Don't call the final task like a normal function: `upload_combine_excel(...)`. Use `.submit(...)`. A direct call executes it immediately in the flow process, which is why you're seeing it "start early."
- Make `get_perfomance` a Prefect task and `.submit(...)` it so your dict is uniformly futures. Mixed plain objects + futures is okay, but having everything as futures plus `wait_for` makes dependencies unambiguous.
- If you want a belt-and-suspenders check, you can also do `from prefect.futures import wait` and call `wait(all_futs)` before submitting the final task (a short sketch follows below).

If it still appears to start early, can you share:
- Whether you're seeing it move to Scheduled or Running before upstream tasks complete
- The `task_run_name` of the final task so we can inspect its dependencies:
  - `prefect task-run ls --task-run-name combine_excel --limit 5`
  - `prefect task-run inspect <TASK_RUN_ID>` to verify it lists the upstream task runs
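For completeness, a minimal sketch of that belt-and-suspenders check, reusing `dataframe_futures` and `all_futs` from the example above (`prefect.futures.wait` simply blocks until the given futures are done):
Copy code
from prefect.futures import wait

all_futs = list(dataframe_futures.values())
wait(all_futs)  # block until every upstream future has finished

ebay_extraction.upload_combine_excel.with_options(
    task_run_name="combine_excel"
).submit(dataframe_futures, store_id, wait_for=all_futs)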