Madushan Pathirana
08/31/2025, 12:53 PM
Marvin
08/31/2025, 12:57 PM
`.result()` will raise if the task failed. Only call your subflow after you've resolved the parent tasks successfully.
Example (sequential or parallel tasks):
```python
from prefect import flow, task, get_run_logger

@task
def step_a():
    return "A"

@task
def step_b(x):
    return f"{x}-B"

@flow
def child_subflow(payload):
    logger = get_run_logger()
    logger.info(f"Running child with: {payload}")

@flow
def parent_flow():
    logger = get_run_logger()
    # .submit() returns a PrefectFuture; calling step_a() directly would run it
    # immediately and return its value instead
    a_fut = step_a.submit()
    b_fut = step_b.submit(a_fut)  # passing the future makes step_b depend on step_a
    # Resolve results BEFORE calling the subflow; .result() raises if the task failed
    try:
        a = a_fut.result()
        b = b_fut.result()
    except Exception as exc:
        logger.warning(f"Skipping child_subflow because a parent task failed: {exc}")
        return
    # Only reached if the parent tasks succeeded
    child_subflow({"a": a, "b": b})
```
If you're submitting many tasks in parallel (this goes inside the flow body; `task_i` is a placeholder for any `@task`):
```python
futs = [task_i.submit(i) for i in range(5)]
try:
    results = [f.result() for f in futs]  # any failure raises; the subflow won't run
except Exception:
    return
child_subflow(results)
```
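The gating idea does not depend on Prefect. The standard library's `concurrent.futures.Future.result()` also re-raises the exception from a failed call. Below is a minimal sketch of the same pattern; `fetch` and `child` are hypothetical stand-ins for the parent tasks and `child_subflow`:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional


def fetch(i: int) -> int:
    """Hypothetical parent task; input 3 fails to simulate a broken source."""
    if i == 3:
        raise RuntimeError(f"source {i} failed")
    return i * 10


def child(results: List[int]) -> str:
    """Hypothetical stand-in for child_subflow."""
    return f"child ran with {results}"


def parent(inputs: List[int]) -> Optional[str]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        futs = [pool.submit(fetch, i) for i in inputs]
        try:
            # .result() re-raises the worker's exception, so one failure aborts the gate
            results = [f.result() for f in futs]
        except Exception as exc:
            print(f"Skipping child: {exc}")
            return None
    # Only reached when every parent call succeeded
    return child(results)


print(parent([0, 1, 2]))
print(parent([0, 1, 2, 3]))
```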
Alternative (deployment-to-deployment): If your “subflow” is its own deployment, you can trigger it only when the parent flow run completes successfully. Use an Automation that listens for the `prefect.flow-run.Completed` event and runs the child deployment. This moves the gating out of code and into orchestration.
References:
- Prefect 3 GA
- Prefect source tree (see `prefect.futures` and `prefect.states`)
Quick check: Do you want to gate on “all parent tasks succeeded” or on a specific subset of tasks? If it’s a subset, you can just resolve those specific futures in the try/except before calling the subflow.
Madushan Pathirana
08/31/2025, 1:26 PM
Marvin
08/31/2025, 1:27 PM
```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

@task(retries=3, retry_delay_seconds=30)
def extract(source_id: str) -> dict:
    # ... fetch data for this source ...
    return {"source": source_id, "rows": 123}

@task(retries=3, retry_delay_seconds=30)
def load(payload: dict) -> None:
    # ... load to destination (DW, S3, etc.) ...
    return None

@flow
def transform_subflow():
    logger = get_run_logger()
    logger.info("Running transformations...")
    # ... your transformations ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=6))
def el_then_transform(sources: list[str]):
    logger = get_run_logger()
    # 1) Extract in parallel: .submit() hands each call to the task runner and returns a future
    extract_futures = [extract.with_options(name=f"extract-{s}").submit(s) for s in sources]
    # 2) Load in parallel; each load receives its extract future, so it waits for it
    load_futures = [
        load.with_options(name=f"load-{s}").submit(f) for s, f in zip(sources, extract_futures)
    ]
    # 3) Gate: only proceed if ALL extracts and loads succeeded
    try:
        for f in extract_futures + load_futures:
            f.result()  # raises if that task run failed
    except Exception as exc:
        logger.error(f"At least one EL step failed; skipping transform. Reason: {exc}")
        return
    # 4) All good: run transformations
    transform_subflow()

if __name__ == "__main__":
    el_then_transform(["src_a", "src_b", "src_c"])
```
Notes:
- `.result()` raises on failure, so any failing extract/load prevents the transform from running.
- Tune `max_workers` to your EC2's RAM/CPU and to whether the EL is I/O-bound or CPU-bound. For I/O-bound work, threads are good. For CPU-bound work, consider a process-based runner such as `ProcessPoolTaskRunner`, if your Prefect version includes it.
- Add retries/timeouts per task as needed.
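Here is a rough way to see what `max_workers` bounds for I/O-bound work. It uses a plain `ThreadPoolExecutor` rather than Prefect's `ThreadPoolTaskRunner`. `fake_io_call` is a hypothetical step where `time.sleep` stands in for network waits, and a counter records how many calls were in flight at once:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

in_flight = 0
peak = 0
lock = threading.Lock()


def fake_io_call(source: str) -> str:
    """Hypothetical I/O-bound step; sleep stands in for network or disk waits."""
    global in_flight, peak
    with lock:
        in_flight += 1
        peak = max(peak, in_flight)
    time.sleep(0.05)  # sleeping threads release the GIL, so waits overlap
    with lock:
        in_flight -= 1
    return source


def run(sources: List[str], max_workers: int) -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(fake_io_call, sources))
    return time.perf_counter() - start


elapsed = run([f"src_{i}" for i in range(12)], max_workers=6)
print(f"peak in flight: {peak}, elapsed: {elapsed:.2f}s")
```

With `max_workers=6`, the twelve 50 ms calls finish in roughly two rounds instead of twelve. In CPython, CPU-bound Python code holds the GIL and would not overlap this way, which is why a process-based runner is suggested for that case.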
Option B: Split into two deployments and use an Automation
- Make your EL flow a deployment and your Transform flow a separate deployment.
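- Create a Prefect Automation that listens for the event “Parent EL flow run Completed” and triggers the Transform deployment. If any source task fails, the EL flow run will not be “Completed” (it will be “Failed”), so the transform won’t run. This assumes the EL flow lets task failures propagate, for example by calling `.result()` on each future without catching the exception. A flow that catches the error and returns normally can still finish as Completed.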
- This decouples concerns, lets you retry EL independently, and can simplify schedules.
In short:
- Use Option A if you want everything in one flow and a subflow gate is enough.
- Use Option B if you want clean separation and orchestration guarantees at the deployment level.
References:
- Prefect 3 GA overview: Prefect 3 GA
- Source modules mentioned above (futures, states): Prefect source tree
Want me to tailor `max_workers` and retries to your EC2 sizing and number of sources? How many sources and what’s the rough data volume per source?
Madushan Pathirana
08/31/2025, 1:37 PM
Madushan Pathirana
08/31/2025, 1:37 PM
Marvin
08/31/2025, 1:39 PM
`run_deployment(..., as_subflow=True)`. This keeps the UI hierarchy when called from EL, but also lets you run Transform on its own at any time via API/CLI/UI.
- Optionally add a Prefect Automation: when EL flow runs complete successfully, automatically run the Transform deployment. This gives you both code-level gating and event-driven orchestration.
Pattern A: Parent flow triggers Transform deployment as a subflow (recommended for your use case)
```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.deployments import run_deployment

@task(retries=3, retry_delay_seconds=30)
def extract(source_id: str) -> dict:
    # ... extract ...
    return {"source": source_id}

@task(retries=3, retry_delay_seconds=30)
def load(payload: dict) -> None:
    # ... load ...
    return None

@flow(task_runner=ThreadPoolTaskRunner(max_workers=6))
def el_parent(sources: list[str], transform_params: dict | None = None):
    logger = get_run_logger()
    # Extract in parallel (.submit() returns futures)
    extract_futs = [extract.with_options(name=f"extract-{s}").submit(s) for s in sources]
    # Load in parallel; each load receives its extract future, so it waits for it
    load_futs = [
        load.with_options(name=f"load-{s}").submit(f) for s, f in zip(sources, extract_futs)
    ]
    # Gate: only proceed if ALL extracts and loads succeeded
    try:
        for f in extract_futs + load_futs:
            f.result()  # raises if that task run failed
    except Exception as exc:
        logger.error(f"EL failed; skipping Transform. Reason: {exc}")
        return
    # Trigger the Transform deployment as a subflow so it appears nested in the UI
    run_deployment(
        name="transform-flow/transform-deployment",
        parameters=transform_params or {},
        as_subflow=True,  # keeps hierarchy when called from EL
    )
```
Run Transform independently whenever you want
- Python API:
```python
from prefect.deployments import run_deployment

run_deployment(
    name="transform-flow/transform-deployment",
    parameters={"run_date": "2025-08-31"},  # whatever your params are
)
```
- CLI:
```shell
prefect deployment run transform-flow/transform-deployment -p run_date=2025-08-31 --watch
```
- UI: Click “Run” on the Transform deployment.
Pattern B: Use an Automation for event-driven chaining
- Create an Automation that listens for your EL flow’s “Flow run Completed” event and runs the Transform deployment. If any EL task fails (and the EL flow lets that failure propagate), the flow run state won’t be Completed, so Transform won’t trigger.
- This keeps the gate out of code and is great when you also want to run Transform independently.
Operational tips for your single-EC2 setup
- Separate work pools/queues for EL and Transform (e.g., `el-pool`, `transform-pool`). This keeps control over concurrency and resource usage even on one host.
- Parameterize Transform (e.g., date range, source list, run_id) so on-demand API runs are simple and reproducible.
- Use retries/timeouts on EL tasks so transient issues don’t block Transform unnecessarily.
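One way to make on-demand Transform runs reproducible is to derive every parameter from explicit inputs, including a deterministic run identifier. The helper and its parameter names (`run_date`, `lookback_days`, `start_date`, `end_date`, `run_id`) are hypothetical, so match them to your Transform flow's real signature:

```python
import hashlib
import json
from datetime import date, timedelta
from typing import List


def build_transform_params(run_date: str, sources: List[str], lookback_days: int = 1) -> dict:
    """Hypothetical helper: derive a date window and a stable run_id from explicit inputs."""
    end = date.fromisoformat(run_date)
    start = end - timedelta(days=lookback_days)
    params = {
        "start_date": start.isoformat(),
        "end_date": end.isoformat(),
        "sources": sorted(sources),  # sorted so input order doesn't change the result
    }
    # Identical inputs always hash to the same run_id, so reruns are easy to correlate
    digest = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()
    params["run_id"] = digest[:12]
    return params


print(build_transform_params("2025-08-31", ["transactions", "payouts"]))
```

The resulting dict could be passed as `parameters=` to `run_deployment`, as in the Python API example above.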
References:
- Prefect 3 GA overview: https://www.prefect.io/blog/prefect-3-generally-available-september-3
- Prefect source (for `run_deployment`, task futures): https://github.com/PrefectHQ/prefect/tree/main/src/prefect
Would you like me to sketch the deployment definitions as well (either via `prefect deploy` or programmatically), and suggest work-pool settings for your EC2 (8GB RAM) and number of sources?
Madushan Pathirana
08/31/2025, 3:16 PM
Marvin
08/31/2025, 3:17 PM
`with_options` (plural), and you can call `.submit(...)` on the result.
Examples:
```python
# Apply per-call options, then submit
e = extract.with_options(name=f"extract-{src}", retries=3).submit(src)
# Pass the upstream future as an argument (creates the dependency automatically)
l = load.with_options(name=f"load-{src}", retries=3).submit(e)
# If load doesn't need the extract result value, pass its own arguments
# (load still requires a payload) and declare only the ordering dependency:
l = load.with_options(name=f"load-{src}").submit({"source": src}, wait_for=[e])
```
Notes:
- `with_options(...)` returns a new Task object with those options applied; the original task is unchanged.
- You can also skip `submit` and call the task directly: `extract.with_options(...)(src)`. That runs the task immediately and returns its result, not a `PrefectFuture`. Use `.submit(...)` when you want a future and concurrent execution on the flow's task runner. Both forms accept `wait_for=` and `return_state=`.
- Passing a future as an argument creates the dependency; use `wait_for=[...]` when the downstream task doesn’t need the upstream’s result value.
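The difference between a data dependency (passing the upstream future as an argument) and an ordering-only dependency (`wait_for=[...]`) can be mimicked with `concurrent.futures`. This is an analogy only, not how Prefect resolves futures internally, and it does not check whether `extract` succeeded before running `notify`. The names `extract`, `load`, `notify`, and `run_after` here are hypothetical:

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, List


def extract(src: str) -> dict:
    return {"source": src, "rows": 123}


def load(payload: dict) -> str:
    # Needs the upstream value: a data dependency
    return f"loaded {payload['rows']} rows from {payload['source']}"


def notify(src: str) -> str:
    # Only needs to run after extract; never reads its value
    return f"{src} done"


def run_after(deps: List[Future], fn: Callable[..., Any], *args: Any) -> Any:
    """Block until the upstream futures finish (the wait_for idea), then call fn."""
    wait(deps)
    return fn(*args)


with ThreadPoolExecutor(max_workers=2) as pool:
    extract_fut = pool.submit(extract, "src_a")
    # Data dependency: the value of extract_fut flows into load
    load_fut = pool.submit(lambda: load(extract_fut.result()))
    # Ordering-only dependency: wait for extract_fut, but give notify its own argument
    notify_fut = pool.submit(run_after, [extract_fut], notify, "src_a")

print(load_fut.result())
print(notify_fut.result())
```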
Refs:
- Task options and futures live in the source here: Prefect source tree (see `prefect.tasks`, `prefect.futures`).
Madushan Pathirana
08/31/2025, 3:26 PM
```python
transactions_api = {"url": "https://apiz.ebay.com/sell/finances/v1/transaction?",
                    "upload_key": 'transactions',
                    "api_limit": 200}
payout_api = {"url": "https://apiz.ebay.com/sell/finances/v1/payout?",
              "upload_key": 'payouts',
              "api_limit": 50}
api_dict_list = [transactions_api, payout_api]
for api in api_dict_list:
    upload_key = transactions_api.get('upload_key')
    fetch_paginated_results.with_options(
        name=f"fetch_paginated_results_{upload_key}"
    ).submit(api, refresh_token, seller_id, start_date_formatted, today_formatted)
```
Marvin
08/31/2025, 3:26 PM
```python
upload_key = transactions_api.get('upload_key')
```
That always reads from `transactions_api`, so both iterations use `"transactions"`. Use the current loop variable `api` instead:
```python
transactions_api = {"url": "https://apiz.ebay.com/sell/finances/v1/transaction?",
                    "upload_key": "transactions",
                    "api_limit": 200}
payout_api = {"url": "https://apiz.ebay.com/sell/finances/v1/payout?",
              "upload_key": "payouts",
              "api_limit": 50}
api_dict_list = [transactions_api, payout_api]
for api in api_dict_list:
    upload_key = api.get("upload_key")
    fetch_paginated_results.with_options(
        name=f"fetch_paginated_results_{upload_key}"
    ).submit(api, refresh_token, seller_id, start_date_formatted, today_formatted)
```
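A quick way to confirm the bug and the fix is to compute the run names both ways with plain dictionaries. The dicts are copied from the snippet above; only the names are computed, and no task is called:

```python
transactions_api = {"url": "https://apiz.ebay.com/sell/finances/v1/transaction?",
                    "upload_key": "transactions", "api_limit": 200}
payout_api = {"url": "https://apiz.ebay.com/sell/finances/v1/payout?",
              "upload_key": "payouts", "api_limit": 50}
api_dict_list = [transactions_api, payout_api]

# Buggy: reads a fixed dict, so every iteration gets the same key
buggy = [f"fetch_paginated_results_{transactions_api.get('upload_key')}" for api in api_dict_list]

# Fixed: reads the loop variable
fixed = [f"fetch_paginated_results_{api.get('upload_key')}" for api in api_dict_list]

print(buggy)
print(fixed)
```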
Tip: Prefer dynamic task run names and a stable task name
- Keep the task’s definition name stable and vary the run names for clarity in the UI:
```python
for api in api_dict_list:
    upload_key = api["upload_key"]
    fetch_paginated_results.with_options(
        name="fetch_paginated_results",
        task_run_name=f"fetch_paginated_results_{upload_key}",
    ).submit(api, refresh_token, seller_id, start_date_formatted, today_formatted)
```
References (Task options / futures in source):
- Prefect source tree (see `prefect.tasks`, `prefect.futures`)
Madushan Pathirana
08/31/2025, 3:30 PM
Marvin
08/31/2025, 3:33 PM
`rate_limit(...)` call, which blocks until slots are free (it is a plain function, not a context manager)
- Add task retries with exponential backoff that only retry on 429/5xx
Example extending your code
```python
from typing import Any, Dict, Iterable

import httpx
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import rate_limit, concurrency
from prefect.tasks import exponential_backoff


# Retry only on throttling/transient errors. Prefect calls retry_condition_fn
# with (task, task_run, state); state.result() re-raises the failure to inspect it.
def retry_on_throttle(task, task_run, state) -> bool:
    try:
        state.result()
    except httpx.HTTPStatusError as exc:
        code = exc.response.status_code
        return code == 429 or 500 <= code < 600
    except Exception:
        return False
    return False


@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),  # 2, 4, 8, 16, 32s
    retry_condition_fn=retry_on_throttle,
    log_prints=True,
)
def fetch_paginated_results(
    api: Dict[str, Any],
    refresh_token: str,
    seller_id: str,
    start_date: str,
    end_date: str,
):
    logger = get_run_logger()
    upload_key = api["upload_key"]  # e.g., "transactions" or "payouts"
    base_url = api["url"]
    page_size = api["api_limit"]

    # Name your rate limits in Prefect to match these strings:
    # - a global vendor limit and a per-endpoint limit
    global_limit = "ebay-global-rpm"
    endpoint_limit = f"ebay-{upload_key}-rpm"

    # Optional: prevent overlap per seller if needed
    # with concurrency(f"seller-{seller_id}", occupy=1):

    offset = 0
    with httpx.Client(timeout=30.0) as client:
        while True:
            # This is the transactions filter; the payouts endpoint filters on payoutDate
            params = {"limit": page_size, "offset": offset, "filter": f"transactionDate:[{start_date}..{end_date}]"}
            # eBay expects an OAuth access token here; exchange the refresh token first (not shown)
            headers = {"Authorization": f"Bearer {refresh_token}"}

            # Block until both rate limits have a free slot, then send the request
            rate_limit([global_limit, endpoint_limit], strict=True)
            resp = client.get(base_url, params=params, headers=headers)
            # Raising fails this attempt; retry_on_throttle decides whether to retry
            resp.raise_for_status()

            data = resp.json()
            items = data.get(upload_key, [])
            logger.info(f"{upload_key}: fetched {len(items)} at offset {offset}")

            # TODO: persist items, then decide if more pages exist
            has_next = bool(items) and len(items) == page_size
            if not has_next:
                break
            offset += page_size


@flow(task_runner=ThreadPoolTaskRunner(max_workers=6))
def el_parent(sources: Iterable[Dict[str, Any]], refresh_token: str, seller_id: str, start_date: str, end_date: str):
    futs = []
    for api in sources:
        upload_key = api["upload_key"]
        futs.append(
            fetch_paginated_results.with_options(
                name="fetch_paginated_results",
                task_run_name=f"fetch_paginated_results_{upload_key}",
            ).submit(api, refresh_token, seller_id, start_date, end_date)
        )
    # Gate: only run transform if all EL tasks succeed
    for f in futs:
        f.result()
    # run transform here (subflow or deployment)
```
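You can sanity-check the status-code rule inside `retry_on_throttle` and the delay schedule without Prefect or the network. `is_retryable_status` and `backoff_delays` are hypothetical helpers. `backoff_delays` assumes the doubling schedule described by the `2, 4, 8, 16, 32s` comment, i.e. factor × 2^attempt:

```python
from typing import List


def is_retryable_status(code: int) -> bool:
    """Retry on throttling (429) and server errors (5xx); never on other 4xx."""
    return code == 429 or 500 <= code < 600


def backoff_delays(factor: float, retries: int) -> List[float]:
    """Doubling schedule: factor * 2**attempt for each retry attempt."""
    return [factor * 2 ** attempt for attempt in range(retries)]


for code in (200, 400, 401, 404, 429, 500, 503):
    print(code, "retry" if is_retryable_status(code) else "no retry")

print(backoff_delays(2, 5))
```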
How to define the limits
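- Create the named limits before running the flow (no Redis needed). `rate_limit` requires each limit to have a slot decay rate (`slot_decay_per_second`) set; that is what makes a concurrency limit behave as a rate limit: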
- Example names:
- ebay-global-rpm: the vendor-wide requests-per-minute you want
- ebay-transactions-rpm: the per-endpoint RPM for the transactions API
- ebay-payouts-rpm: the per-endpoint RPM for the payouts API
- You can create/manage these in the UI under Concurrency & Rate Limits, or via REST:
Marvin
08/31/2025, 3:33 PM
`rate_limit([global_limit, endpoint_limit])` to enforce both a global and a per-endpoint limit simultaneously.
- Keep `max_workers` on your `ThreadPoolTaskRunner` high enough to parallelize; the `rate_limit` call holds back the actual requests so they stay under the quota.
- Add `concurrency(f"seller-{seller_id}")` if your vendor enforces per-seller overlap restrictions.
- The `retry_condition_fn` makes retries selective, so you won’t retry on permanent client errors (e.g., 400/401).
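To reason about how a limit with slot decay turns into a requests-per-minute budget, here is a simplified leaky-bucket model. Each request occupies one slot, occupied slots drain at `slot_decay_per_second`, and a request waits until a full slot is free. This is a planning approximation, not Prefect's server-side implementation. The mapping `slot_decay_per_second = rpm / 60` follows from this model, and the 120 requests/minute figure is hypothetical:

```python
from typing import List


def rpm_to_decay(rpm: float) -> float:
    """In this model the sustained rate equals the decay rate, so RPM / 60 slots per second."""
    return rpm / 60.0


def schedule_requests(n_requests: int, limit: int, decay_per_second: float) -> List[float]:
    """Earliest start time (seconds) of each request when all are queued at t=0."""
    occupied = 0.0  # slots in use at the current time t
    t = 0.0
    starts = []
    for _ in range(n_requests):
        if occupied + 1 > limit:
            # Wait just long enough for the excess occupancy to decay away
            wait = (occupied + 1 - limit) / decay_per_second
            t += wait
            occupied -= wait * decay_per_second
        occupied += 1
        starts.append(t)
    return starts


decay = rpm_to_decay(120)  # hypothetical 120 requests/minute quota
print(decay)
print(schedule_requests(6, limit=2, decay_per_second=decay))
```

After the initial burst of `limit` requests, starts are spaced `1 / decay` seconds apart. In this model the sustained rate therefore tracks the target RPM no matter how many worker threads are queued behind the limit.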
To tune this precisely, what are your actual eBay quotas (requests per minute) globally and per endpoint? If you share them, I’ll map them to concrete limit names/values.