# ask-marvin
@Marvin I have a subflow that needs to execute only if the tasks in the parent flow succeed. How do I achieve this?
thought for 268.5 seconds
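You can gate a subflow with normal Python control flow. In Prefect 3.x, submitting a task with `.submit()` inside a flow returns a `PrefectFuture`, and calling `.result()` on that future raises if the task failed. (Calling a task directly, e.g. `step_a()`, runs it right away and returns its value or raises.) Only call your subflow after you've resolved the parent tasks successfully. Example (sequential or parallel tasks):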
```python
from prefect import flow, task, get_run_logger

@task
def step_a():
    return "A"

@task
def step_b(x):
    return f"{x}-B"

@flow
def child_subflow(payload):
    logger = get_run_logger()
    logger.info(f"Running child with: {payload}")

@flow
def parent_flow():
    logger = get_run_logger()

    # .submit() returns a PrefectFuture immediately; independent tasks
    # run concurrently on the flow's task runner
    a_fut = step_a.submit()        # PrefectFuture
    b_fut = step_b.submit(a_fut)   # depends on a_fut

    # Resolve results BEFORE calling the subflow; .result() raises if a task failed
    try:
        a = a_fut.result()
        b = b_fut.result()
    except Exception as exc:
        # Returning here leaves parent_flow Completed; re-raise instead
        # if the parent run should be marked Failed
        logger.warning(f"Skipping child_subflow because a parent task failed: {exc}")
        return

    # Only reached if parent tasks succeeded
    child_subflow({"a": a, "b": b})
```
If you’re submitting many tasks in parallel:
```python
@flow
def parent_flow_many():
    futs = [task_i.submit(i) for i in range(5)]  # task_i: any @task taking one argument
    try:
        results = [f.result() for f in futs]  # any failure raises; subflow won't run
    except Exception:
        return
    child_subflow(results)
```
Alternative (deployment-to-deployment): If your “subflow” is its own deployment, you can trigger it only when the parent flow run completes successfully, using an Automation that listens for the `prefect.flow-run.Completed` event and runs the child deployment. This moves the gating out of code and into orchestration. Note that in Prefect 3 a flow run that catches a task failure and returns normally (as `parent_flow` does above) still ends Completed, so let the exception propagate if the Automation must not fire.

References:
- Prefect 3 GA
- Prefect source tree (see `prefect.futures` and `prefect.states`)

Quick check: Do you want to gate on “all parent tasks succeeded” or on a specific subset of tasks? If it's a subset, you can just resolve those specific futures in the try/except before calling the subflow.
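The gate itself is plain Python, so it can be reasoned about without Prefect. Below is a minimal sketch that uses the standard library's `concurrent.futures` as a stand-in for Prefect futures (`Future.result()` re-raises a failed job's exception, much as `PrefectFuture.result()` does for a failed task). `run_if_all_succeed`, `ok`, and `boom` are hypothetical names for illustration only.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional, Sequence


def run_if_all_succeed(
    futures: Sequence[Future],
    downstream: Callable[[list], Any],
) -> Optional[Any]:
    """Resolve every upstream future; call downstream only if none raised."""
    try:
        # Future.result() re-raises the exception of a failed job
        results = [f.result() for f in futures]
    except Exception as exc:
        print(f"Skipping downstream step because an upstream step failed: {exc}")
        return None
    return downstream(results)


def ok(x: int) -> int:
    return x * 10


def boom(x: int) -> int:
    raise ValueError(f"source {x} failed")


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        good = [pool.submit(ok, i) for i in range(3)]
        print(run_if_all_succeed(good, sum))   # all succeeded: sum of [0, 10, 20]

        mixed = [pool.submit(ok, 1), pool.submit(boom, 2)]
        print(run_if_all_succeed(mixed, sum))  # one failed: downstream skipped
```

The same shape carries over to a flow: collect the futures returned by `.submit()`, resolve them inside one `try`, and only call the subflow after every `.result()` has returned.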
@Marvin I want all parent tasks to succeed. Basically, the parent flow is a data extraction and load step, with one task per source. If all extractions and loads succeed, then execute the transformation subflow. If you have a better architecture, suggest it; otherwise, how do I achieve this?
thought for 87.9 seconds
Great, that's a classic pattern. Here are two good ways to do it in Prefect 3.x.

Option A: Gate the subflow in code (simple and explicit)
- Run all extract/load tasks.
- If and only if all succeed, call the transformation subflow.
- Use a task runner to parallelize across sources.

Example:
```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

@task(retries=3, retry_delay_seconds=30)
def extract(source_id: str) -> dict:
    # ... fetch data for this source ...
    return {"source": source_id, "rows": 123}

@task(retries=3, retry_delay_seconds=30)
def load(payload: dict) -> None:
    # ... load to destination (DW, S3, etc.) ...
    return None

@flow
def transform_subflow():
    logger = get_run_logger()
    logger.info("Running transformations...")
    # ... your transformations ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=6))
def el_then_transform(sources: list[str]):
    logger = get_run_logger()

    # 1) Extract in parallel: .submit() returns futures immediately
    extract_futures = [extract.with_options(name=f"extract-{s}").submit(s) for s in sources]

    # 2) Load in parallel (each load waits on its own extract future)
    load_futures = [load.with_options(name=f"load-{s}").submit(f) for s, f in zip(sources, extract_futures)]

    # 3) Gate: only proceed if ALL loads succeeded
    try:
        for f in load_futures:
            f.result()  # raises if that load failed or couldn't run because its extract failed
    except Exception as exc:
        logger.error(f"At least one EL step failed; skipping transform. Reason: {exc}")
        return

    # 4) All good: run transformations
    transform_subflow()

if __name__ == "__main__":
    el_then_transform(["src_a", "src_b", "src_c"])
```
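The loop above stops at the first failing load, so the log names only one source. If you'd rather report every failed source before skipping the transform, here is a sketch of that variant, again with `concurrent.futures` standing in for Prefect futures (`failed_sources` and the toy `load` are hypothetical). With Prefect 3 futures the analogous check would, to my understanding, be `fut.wait()` followed by `fut.state.is_failed()`.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def failed_sources(futures_by_source: dict) -> dict:
    """Return {source: error repr} for every future that raised."""
    failures = {}
    for source, fut in futures_by_source.items():
        exc = fut.exception()  # waits for the job; None if it succeeded
        if exc is not None:
            failures[source] = repr(exc)
    return failures


def load(source: str) -> str:
    # Toy loader: pretend one source has a network problem
    if source == "src_b":
        raise RuntimeError("connection reset")
    return f"{source} loaded"


if __name__ == "__main__":
    sources = ["src_a", "src_b", "src_c"]
    with ThreadPoolExecutor(max_workers=3) as pool:
        futs = {s: pool.submit(load, s) for s in sources}
        failures = failed_sources(futs)
    if failures:
        print(f"Skipping transform; failed sources: {sorted(failures)}")
    else:
        print("All EL steps succeeded; running transform")
```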
Notes:
- `.result()` raises on failure, so any failing extract/load prevents the transform from running.
- Tune `max_workers` to your EC2's RAM/CPU and whether the EL is I/O- or CPU-bound (I/O-bound -> threads are good; CPU-bound -> consider `ProcessPoolTaskRunner`).
- Add retries/timeouts per task as needed.

Option B: Split into two deployments and use an Automation
- Make your EL flow a deployment and your Transform flow a separate deployment.
- Create a Prefect Automation that listens for the event “Parent EL flow run Completed” and triggers the Transform deployment. If a source task fails and the exception propagates out of the EL flow (for example, a `.result()` call that isn't wrapped in try/except), the EL flow run ends Failed rather than Completed, so the transform won't run. If the EL flow catches the error and returns normally, as in Option A, the run ends Completed and the Automation would still fire.
- This decouples concerns, lets you retry EL independently, and can simplify schedules.

In short:
- Use Option A if you want everything in one flow and a subflow gate is enough.
- Use Option B if you want clean separation and orchestration guarantees at the deployment level.

References:
- Prefect 3 GA overview: Prefect 3 GA
- Source modules mentioned above (futures, states): Prefect source tree

Want me to tailor `max_workers` and retries to your EC2 sizing and number of sources? How many sources and what's the rough data volume per source?
@Marvin Sometimes I want to run subflows separately via the API. In that case, what is the best architecture?
thought for 87.7 seconds
If you sometimes need to run the “subflow” (your transformations) independently via the API, the best architecture is:
- Make EL and Transform separate flows, each with its own deployment.
- In the parent EL flow, after gating on all EL tasks' success, trigger the Transform deployment with `run_deployment(..., as_subflow=True)`. This keeps the UI hierarchy when called from EL, but also lets you run Transform on its own at any time via API/CLI/UI.
- Alternatively, use a Prefect Automation instead of the in-code trigger: when EL flow runs complete successfully, it automatically runs the Transform deployment. Pick one or the other; enabling both would start Transform twice for every successful EL run.

Pattern A: Parent flow triggers Transform deployment as a subflow (recommended for your use case)
```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.deployments import run_deployment

@task(retries=3, retry_delay_seconds=30)
def extract(source_id: str) -> dict:
    # ... extract ...
    return {"source": source_id}

@task(retries=3, retry_delay_seconds=30)
def load(payload: dict) -> None:
    # ... load ...
    return None

@flow(task_runner=ThreadPoolTaskRunner(max_workers=6))
def el_parent(sources: list[str], transform_params: dict | None = None):
    logger = get_run_logger()

    # Extract in parallel (.submit() returns futures immediately)
    extract_futs = [extract.with_options(name=f"extract-{s}").submit(s) for s in sources]
    # Load in parallel (each load waits on its own extract future)
    load_futs = [load.with_options(name=f"load-{s}").submit(f) for s, f in zip(sources, extract_futs)]

    # Gate: only proceed if ALL loads (and their upstream extracts) succeeded
    try:
        for f in load_futs:
            f.result()  # raises if any failed
    except Exception as exc:
        logger.error(f"EL failed; skipping Transform. Reason: {exc}")
        return

    # Trigger the Transform deployment as a subflow so it appears nested in the UI.
    # With the default timeout=None this waits for the Transform run to finish;
    # timeout=0 returns as soon as the run has been created.
    run_deployment(
        name="transform-flow/transform-deployment",
        parameters=transform_params or {},
        as_subflow=True,  # the default; keeps the hierarchy when called from EL
    )
```
Run Transform independently whenever you want:
- Python API:
```python
from prefect.deployments import run_deployment

run_deployment(
    name="transform-flow/transform-deployment",
    parameters={"run_date": "2025-08-31"},  # whatever your params are
)
```
- CLI:
```shell
prefect deployment run transform-flow/transform-deployment -p run_date=2025-08-31 --watch
```
- UI: Click “Run” on the Transform deployment.

Pattern B: Use an Automation for event-driven chaining
- Create an Automation that listens for your EL flow's “Flow run Completed” event and runs the Transform deployment. If an EL task fails and the failure propagates out of the flow, the run ends Failed rather than Completed, so Transform won't trigger.
- This keeps the gate out of code and is great when you also want to run Transform independently.

Operational tips for your single-EC2 setup
- Separate work pools/queues for EL and Transform (e.g., `el-pool`, `transform-pool`). This keeps control over concurrency and resource usage even on one host.
- Parameterize Transform (e.g., date range, source list, run_id) so on-demand API runs are simple and reproducible.
- Use retries/timeouts on EL tasks so transient issues don't block Transform unnecessarily.

References:
- Prefect 3 GA overview: https://www.prefect.io/blog/prefect-3-generally-available-september-3
- Prefect source (for `run_deployment`, task futures): https://github.com/PrefectHQ/prefect/tree/main/src/prefect

Would you like me to sketch the deployment definitions as well (either via `prefect deploy` or programmatically), and suggest work-pool settings for your EC2 (8GB RAM) and number of sources?
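For the “via API” part of the question: besides `run_deployment` and the CLI, a deployment can be triggered over Prefect's REST API by POSTing to `/deployments/{id}/create_flow_run` under your `PREFECT_API_URL`. The sketch below only builds that request with the standard library and does not send it. The helper name, the placeholder deployment id, and the local server URL are assumptions; `prefect deployment inspect transform-flow/transform-deployment` is one way to look up the real id.

```python
import json
import urllib.request
from typing import Optional


def build_create_flow_run_request(
    api_url: str,
    deployment_id: str,
    parameters: dict,
    api_key: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST that creates a flow run from a deployment."""
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    headers = {"Content-Type": "application/json"}
    if api_key:  # Prefect Cloud expects a bearer API key; a local server usually doesn't
        headers["Authorization"] = f"Bearer {api_key}"
    body = json.dumps({"parameters": parameters}).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


if __name__ == "__main__":
    req = build_create_flow_run_request(
        api_url="http://127.0.0.1:4200/api",                   # assumed local Prefect server
        deployment_id="00000000-0000-0000-0000-000000000000",  # placeholder id
        parameters={"run_date": "2025-08-31"},
    )
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req) would actually create the flow run
```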
@Marvin can i use with_option with submit?
thought for 49.8 seconds
Yes, you can chain them. It's `with_options` (plural), and you can call `.submit(...)` on the result. Examples:
```python
# Apply per-call options, then submit
e = extract.with_options(name=f"extract-{src}", retries=3).submit(src)

# Pass the upstream future as an argument (creates the dependency automatically)
l = load.with_options(name=f"load-{src}", retries=3).submit(e)

# If the downstream task doesn't take the extract result as an argument
# (i.e., a load with no payload parameter), declare the dependency with wait_for:
l = load.with_options(name=f"load-{src}").submit(wait_for=[e])
```
Notes:
- `with_options(...)` returns a new Task object (a copy of the original) with those options applied; the original task is unchanged.
- You can also skip `submit` and call the task directly: `extract.with_options(...)(src)`. The difference is that a direct call runs the task right away and returns its result (raising if it fails), while `.submit()` returns a `PrefectFuture` immediately so tasks can run concurrently. Both forms accept `wait_for=` and `return_state=`.
- Passing a future as an argument creates the dependency; use `wait_for=[...]` when the downstream task doesn't need the upstream's result value.

Refs:
- Task options and futures live in the source here: Prefect source tree (see `prefect.tasks`, `prefect.futures`).
@Marvin I tried this as you suggested but both tasks got fetch_paginated_results_transactions as the task name
```python
transactions_api = {"url": "https://apiz.ebay.com/sell/finances/v1/transaction?",
                    "upload_key": 'transactions',
                    "api_limit": 200}

payout_api = {"url": "https://apiz.ebay.com/sell/finances/v1/payout?",
              "upload_key": 'payouts',
              "api_limit": 50}

api_dict_list = [transactions_api, payout_api]

for api in api_dict_list:
    upload_key = transactions_api.get('upload_key')

    fetch_paginated_results.with_options(
        name=f"fetch_paginated_results_{upload_key}"
        ).submit(api, refresh_token, seller_id, start_date_formatted, today_formatted)
```
thought for 39.2 seconds
You're super close; the issue is just this line inside your loop:
```python
upload_key = transactions_api.get('upload_key')
```
That always reads from `transactions_api`, so both iterations use `"transactions"`. Use the current loop variable `api` instead:
```python
transactions_api = {"url": "https://apiz.ebay.com/sell/finances/v1/transaction?",
                    "upload_key": "transactions",
                    "api_limit": 200}

payout_api = {"url": "https://apiz.ebay.com/sell/finances/v1/payout?",
              "upload_key": "payouts",
              "api_limit": 50}

api_dict_list = [transactions_api, payout_api]

for api in api_dict_list:
    upload_key = api.get("upload_key")
    fetch_paginated_results.with_options(
        name=f"fetch_paginated_results_{upload_key}"
    ).submit(api, refresh_token, seller_id, start_date_formatted, today_formatted)
```
Tip: Prefer dynamic task run names and a stable task name. Keep the task's definition name stable and vary the run names for clarity in the UI:
```python
for api in api_dict_list:
    upload_key = api["upload_key"]
    fetch_paginated_results.with_options(
        name="fetch_paginated_results",  # optional: the task name already defaults to the function name
        task_run_name=f"fetch_paginated_results_{upload_key}"
    ).submit(api, refresh_token, seller_id, start_date_formatted, today_formatted)
```
References (Task options / futures in source):
- Prefect source tree (see `prefect.tasks`, `prefect.futures`)
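A small guard can catch this class of bug before any task runs: derive each run name from the loop item and fail fast if two names collide. A minimal sketch; `task_run_names` is a hypothetical helper, and the config dicts are trimmed copies of the ones above.

```python
from typing import Any, Dict, List


def task_run_names(apis: List[Dict[str, Any]], prefix: str = "fetch_paginated_results") -> List[str]:
    """Build one run name per API config and fail fast on duplicates."""
    names = [f"{prefix}_{api['upload_key']}" for api in apis]
    if len(set(names)) != len(names):
        raise ValueError(f"duplicate task run names: {names}")
    return names


api_dict_list = [
    {"upload_key": "transactions", "api_limit": 200},
    {"upload_key": "payouts", "api_limit": 50},
]
print(task_run_names(api_dict_list))
```

In the flow you could then zip the names with the configs, e.g. `for api, run_name in zip(api_dict_list, task_run_names(api_dict_list)):`, and pass `task_run_name=run_name` to `with_options`.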
@Marvin can you extend this example to handle api rate limits using prefect
thought for 163.4 seconds
Yes. Prefect 3.x gives you first-class rate limits and concurrency controls. The usual pattern is:
- Create named rate limits in Prefect (UI or API).
- Call `rate_limit(...)` before each outbound API call. It blocks until a slot is available; unlike `concurrency(...)`, it is a plain function, not a context manager.
- Add task retries with exponential backoff that only retry on 429/5xx.

Example extending your code:
```python
from typing import Any, Dict, List
import httpx
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import rate_limit, concurrency  # concurrency: optional per-seller guard
from prefect.tasks import exponential_backoff

# retry_condition_fn is called with (task, task_run, state); re-raise the
# failed attempt's exception and retry only on throttling/transient errors
def retry_on_throttle(task, task_run, state) -> bool:
    try:
        state.result()  # re-raises the exception that failed this attempt
    except httpx.HTTPStatusError as exc:
        code = exc.response.status_code
        return code == 429 or 500 <= code < 600
    except Exception:
        return False
    return False

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),  # 2, 4, 8, 16, 32s
    retry_condition_fn=retry_on_throttle,
    log_prints=True,
)
def fetch_paginated_results(
    api: Dict[str, Any],
    refresh_token: str,
    seller_id: str,
    start_date: str,
    end_date: str,
):
    logger = get_run_logger()
    upload_key = api["upload_key"]  # e.g., "transactions" or "payouts"
    base_url = api["url"]
    page_size = api["api_limit"]

    # Name your rate limits in Prefect to match these strings:
    # - a global vendor limit and a per-endpoint limit
    global_limit = "ebay-global-rpm"
    endpoint_limit = f"ebay-{upload_key}-rpm"

    # Optional: prevent overlap per seller by wrapping the loop below in
    # with concurrency(f"seller-{seller_id}", occupy=1):
    offset = 0
    with httpx.Client(timeout=30.0) as client:
        while True:
            # NOTE: the date-filter field may differ per endpoint; check eBay's docs
            params = {"limit": page_size, "offset": offset, "filter": f"transactionDate:[{start_date}..{end_date}]"}
            # Assumes this token works as a bearer token; with eBay OAuth a refresh
            # token is normally exchanged for an access token first
            headers = {"Authorization": f"Bearer {refresh_token}"}

            # Block until a slot is free on BOTH limits; strict=True raises if a named limit doesn't exist
            rate_limit([global_limit, endpoint_limit], strict=True)
            resp = client.get(base_url, params=params, headers=headers)
            resp.raise_for_status()  # HTTPStatusError -> retry_on_throttle decides whether to retry

            data = resp.json()
            items = data.get(upload_key, [])
            logger.info(f"{upload_key}: fetched {len(items)} at offset {offset}")
            # TODO: persist items (idempotently: a retry restarts from offset 0), then check for more pages
            has_next = bool(items) and len(items) == page_size
            if not has_next:
                break
            offset += page_size

@flow(task_runner=ThreadPoolTaskRunner(max_workers=6))
def el_parent(sources: List[Dict[str, Any]], refresh_token: str, seller_id: str, start_date: str, end_date: str):
    futs = []
    for api in sources:
        upload_key = api["upload_key"]
        futs.append(
            fetch_paginated_results.with_options(
                name="fetch_paginated_results",
                task_run_name=f"fetch_paginated_results_{upload_key}",
            ).submit(api, refresh_token, seller_id, start_date, end_date)
        )

    # Gate: only run transform if all EL tasks succeed
    # (an uncaught .result() error also marks this flow run Failed)
    for f in futs:
        f.result()
    # run transform here (subflow or deployment)
```
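The two policy choices baked into the task decorator above, the retry delay schedule and which HTTP status codes count as transient, can be checked in isolation. This is a stdlib sketch: `backoff_schedule` and `is_retryable_status` are hypothetical helpers that reproduce the 2, 4, 8, 16, 32 s sequence from the code comment and the 429/5xx rule in `retry_on_throttle`; they are not Prefect's implementation.

```python
from typing import List


def backoff_schedule(backoff_factor: float, retries: int) -> List[float]:
    """Delay before each retry, doubling every attempt: factor * 2**attempt."""
    return [backoff_factor * 2**attempt for attempt in range(retries)]


def is_retryable_status(status_code: int) -> bool:
    """Retry on throttling (429) and server errors (5xx); not on other 4xx."""
    return status_code == 429 or 500 <= status_code < 600


print(backoff_schedule(2, 5))  # [2, 4, 8, 16, 32]
print({code: is_retryable_status(code) for code in (400, 401, 429, 500, 503)})
```

In the worst case the five delays add up to 62 seconds of waiting per task, on top of any time spent queued on the rate limits. If many tasks get throttled at the same moment, Prefect's `retry_jitter_factor` task option can spread their retries out.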
How to define the limits
- A rate limit in Prefect is a global concurrency limit with `slot_decay_per_second` set: each `rate_limit()` call occupies a slot, and occupied slots are released gradually at that rate, so the decay rate sets the sustained request rate and `limit` sets the burst size.
- Create the named limits before running the flow (no Redis needed). Example names:
  - `ebay-global-rpm`: the vendor-wide requests-per-minute you want
  - `ebay-transactions-rpm`: the per-endpoint RPM for the transactions API
  - `ebay-payouts-rpm`: the per-endpoint RPM for the payouts API
- You can create/manage these in the UI under Concurrency & Rate Limits, or via the CLI/REST API. See:
  - Global concurrency and rate limits guide
  - Rate limits concepts
  - REST API: create concurrency/rate limit

Notes
- Passing multiple names in `rate_limit([global_limit, endpoint_limit])` enforces both a global and a per-endpoint limit simultaneously.
- Keep your ThreadPoolTaskRunner large enough to parallelize; the rate-limit guard will queue the actual requests to stay under the quota.
- Add `concurrency(f"seller-{seller_id}")` if your vendor enforces per-seller overlap restrictions.
- The `retry_condition_fn` makes retries selective, so you won't retry on permanent client errors (e.g., 400/401).

To tune this precisely, what are your actual eBay quotas (requests per minute) globally and per endpoint? If you share them, I'll map them to concrete limit names/values.
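Once the quotas are known, mapping them is arithmetic. Because each `rate_limit()` call occupies one slot and slots are freed at `slot_decay_per_second`, a sustained quota of R requests per minute corresponds to a decay of R/60 per second, while `limit` caps how many requests can burst at once. The sketch below is a simplified local model of that behaviour for sanity-checking numbers, not Prefect's implementation; `rpm_to_decay`, `DecayingSlots`, and the 60 RPM / burst-of-5 figures are assumptions for illustration. To my understanding the matching limit could then be created with something like `prefect gcl create ebay-global-rpm --limit 5 --slot-decay-per-second 1`, but check `prefect gcl create --help` for the exact flags in your version.

```python
def rpm_to_decay(rpm: float) -> float:
    """Slots released per second so that sustained throughput equals `rpm`."""
    return rpm / 60.0


class DecayingSlots:
    """Simplified local model of a slot-decay rate limit (not Prefect's code):
    at most `limit` slots can be occupied, and occupied slots are released
    continuously at `decay_per_second`."""

    def __init__(self, limit: int, decay_per_second: float) -> None:
        self.limit = limit
        self.decay = decay_per_second
        self.occupied = 0.0
        self.last = 0.0

    def try_acquire(self, now: float) -> bool:
        # Release whatever has decayed since the previous call
        elapsed = max(0.0, now - self.last)
        self.occupied = max(0.0, self.occupied - elapsed * self.decay)
        self.last = now
        if self.occupied + 1 <= self.limit:
            self.occupied += 1
            return True
        return False  # a real client would wait and try again


# Assumed quota: 60 requests/minute with a burst of 5
slots = DecayingSlots(limit=5, decay_per_second=rpm_to_decay(60))
print([slots.try_acquire(now=0.0) for _ in range(6)])  # [True, True, True, True, True, False]
print(slots.try_acquire(now=1.0))  # True: one slot has decayed after 1 s
```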