Slackbot
08/20/2025, 11:18 PM
Marvin
08/20/2025, 11:22 PM
- `run_deployment(...)` returns a `FlowRun` object, and `FlowRun` has no `.result()` method. To get the child flow’s return value, read its `.state` and pass it to `prefect.states.get_state_result(state)`.
- Don’t pass non‑serializable objects (like your processor instances) as deployment parameters. Deployments may run on remote workers and parameters must be JSON‑serializable. Recreate those objects inside the child flow or pass references (e.g., block names/IDs or simple configs).
- The `name` for `run_deployment` must be `"FlowName/DeploymentName"` (or a deployment UUID), not just the flow name.
- If you don’t need separate flow runs on workers, calling async subflows directly and running them concurrently with `asyncio.gather` is simpler and avoids serialization.
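The naming and serialization rules above can both be checked before `run_deployment` is ever called. The sketch below is a hypothetical pre-flight helper, not part of Prefect: `is_valid_deployment_name` only checks the `"FlowName/DeploymentName"`-or-UUID shape (not whether the deployment exists), and `non_serializable_keys` uses plain `json.dumps`, which may be stricter than the encoder Prefect applies to parameters, so treat a hit as a prompt to rethink that parameter rather than a final verdict.

```python
import json
import uuid
from typing import Any, List, Mapping


def is_valid_deployment_name(name: str) -> bool:
    """Hypothetical check: accept a deployment UUID or 'FlowName/DeploymentName'."""
    try:
        uuid.UUID(name)
        return True
    except ValueError:
        pass
    flow_name, sep, deployment_name = name.partition("/")
    # Both halves must be present and non-blank
    return bool(sep and flow_name.strip() and deployment_name.strip())


def non_serializable_keys(parameters: Mapping[str, Any]) -> List[str]:
    """Hypothetical check: names of parameters that json.dumps cannot encode."""
    bad = []
    for key, value in parameters.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            bad.append(key)
    return bad


print(is_valid_deployment_name("Scraper Orchestrator"))       # False
print(is_valid_deployment_name("Scraper Orchestrator/prod"))  # True
# object() stands in for a processor instance
print(non_serializable_keys({"county": "example-county", "processor": object()}))  # ['processor']
```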
Docs for reference:
- Task runners and concurrency
- Deployment API reference
- The function you’re calling: `prefect.deployments.run_deployment` (its signature confirms it returns a `FlowRun`)
- Result helper: `prefect.states.get_state_result`
Two working patterns you can choose from:
A) Run subflows as separate deployments (concurrent, remote) and collect results
- Wrap `run_deployment` in a task so your parent flow can launch multiple child runs concurrently.
- Only pass JSON‑serializable parameters to the deployment.
```
from typing import List, Optional

from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.logging import get_run_logger
from prefect.states import get_state_result


@task
def launch_county_run(county: str) -> dict:
    # IMPORTANT: use "FlowName/DeploymentName" (or a deployment UUID)
    fr = run_deployment(
        name="Scraper Orchestrator/prod",
        parameters={"county": county},  # only pass JSON-serializable values
        timeout=None,  # wait until the child flow run finishes
        poll_interval=5,  # seconds between state checks
        as_subflow=True,  # link the child run to the current flow/task run
    )
    # fr.state is a State; get_state_result returns the child's return value.
    # This only works if the child flow persisted its result to storage
    # that this process can read.
    return get_state_result(fr.state, raise_on_failure=True)


@flow(name="Zillow Scraper")
def main(
    days: Optional[int] = 1,
    min_lot_size: Optional[int] = 4000,
    max_lot_size: Optional[int] = 21780,
    counties: Optional[List[str]] = None,
    models: Optional[List[str]] = None,
    email_recipients: Optional[str] = "",
) -> List[dict]:
    logger = get_run_logger()
    counties = counties or []
    models = models or []

    # Any setup work that doesn't need to be passed to the deployment should happen here
    # e.g., dbt deps, etc.
    logger.info("Starting concurrent county processing...")
    futures = [launch_county_run.submit(c) for c in counties]
    results = [f.result() for f in futures]
    logger.info(f"Completed {len(results)} county runs")
    return results
```
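Because `results` is built by calling `.result()` on the futures in the order they were submitted, `results[i]` belongs to `counties[i]` even when child runs finish out of order. The sketch below shows the same submit-then-collect shape using the standard library’s `ThreadPoolExecutor` as a stand-in for Prefect’s task runner; `fake_county_run` and `run_all` are hypothetical placeholders, not Prefect APIs.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def fake_county_run(county: str, delay: float) -> dict:
    """Hypothetical stand-in for launch_county_run: sleeps, then returns a dict."""
    time.sleep(delay)
    return {"county": county, "ok": True}


def run_all(counties: List[str]) -> Dict[str, dict]:
    with ThreadPoolExecutor(max_workers=len(counties) or 1) as pool:
        # Earlier counties get longer delays, so they finish last
        futures = [
            pool.submit(fake_county_run, c, 0.05 * (len(counties) - i))
            for i, c in enumerate(counties)
        ]
        # Collecting in submission order keeps results aligned with counties
        results = [f.result() for f in futures]
    return dict(zip(counties, results))


print(run_all(["county-a", "county-b", "county-c"]))
```

In the Prefect version, the same `dict(zip(counties, results))` line would give a per-county lookup of the child flows’ return values.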
Notes:
- Recreate `zillow_scraper_processor`, `data_processor`, and `dbt_processor` inside the “Scraper Orchestrator” flow (deployment code) from config or blocks; don’t pass the instances through parameters.
- Fix your type hints: your `scraper_orchestrator` returns a `dict` but is annotated `-> None`.
- Avoid mutable defaults like `[]` in function signatures; use `None` and coalesce (`counties = counties or []`).
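The mutable-default pitfall is easy to reproduce in plain Python: a default `[]` is created once, when the function is defined, so every call that relies on it shares and mutates the same list. The function names below are hypothetical.

```python
from typing import List, Optional


def add_county_bad(county: str, counties: List[str] = []) -> List[str]:
    # The default list is built once at definition time and shared by every call
    counties.append(county)
    return counties


def add_county_good(county: str, counties: Optional[List[str]] = None) -> List[str]:
    # Coalesce None to a fresh list on each call, as in `counties = counties or []`
    counties = counties or []
    counties.append(county)
    return counties


print(add_county_bad("county-a"))   # ['county-a']
print(add_county_bad("county-b"))   # ['county-a', 'county-b']  (state leaked)
print(add_county_good("county-a"))  # ['county-a']
print(add_county_good("county-b"))  # ['county-b']
```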
B) Run subflows in‑process with asyncio (no deployment)
- If you don’t need workers or separate flow runs, just call your subflow directly and run them concurrently.
```
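import asyncio
from typing import List, Optional

from prefect import flow
from prefect.logging import get_run_logger


@flow(name="Scraper Orchestrator")
async def scraper_orchestrator(county: str, days: int) -> dict:  # plus any other parameters
    # Build processors here as needed
    # ...
    return {"county": county, "ok": True}


@flow(name="Zillow Scraper")
async def main(
    days: Optional[int] = 1,
    min_lot_size: Optional[int] = 4000,
    counties: Optional[List[str]] = None,  # remaining parameters as in pattern A
) -> List[dict]:
    logger = get_run_logger()
    counties = counties or []
    logger.info("Starting concurrent county processing...")
    # Calling an async flow returns a coroutine; gather runs the subflows concurrently
    results = await asyncio.gather(*(scraper_orchestrator(c, days) for c in counties))
    logger.info(f"Completed {len(results)} county runs")
    return results
```

Marvin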
08/20/2025, 11:22 PM
- `deployment_execution.state.result(raise_on_failure=True)`: in Prefect 3, use `get_state_result(deployment_execution.state, raise_on_failure=True)` instead.
- Passing `zillow_scraper_processor`, `data_processor`, and `dbt_processor` into `run_deployment`: these are almost certainly not JSON‑serializable.
- `name="Scraper Orchestrator"` is probably not a valid deployment name; use `"Scraper Orchestrator/<your-deployment-name>"`.
- Type hint mismatches and mutable default args can cause confusion.
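One way to avoid passing the processors is to send a small, JSON-safe config across the deployment boundary and rebuild the object inside the child flow. The sketch below uses only the standard library; `ScraperConfig`, `ZillowScraperProcessor`, `to_parameters`, and `build_processor` are hypothetical names, and the default lot sizes are copied from the `main` flow signature. A Prefect block referenced by name could play the same role, for example when the settings include credentials.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ScraperConfig:
    """Hypothetical JSON-safe settings for rebuilding a scraper processor."""
    county: str
    min_lot_size: int = 4000
    max_lot_size: int = 21780


class ZillowScraperProcessor:
    """Hypothetical stand-in for the real processor class."""

    def __init__(self, config: ScraperConfig) -> None:
        self.config = config


def to_parameters(config: ScraperConfig) -> dict:
    # Parent side: only plain str/int values cross the deployment boundary
    return {"scraper_config": asdict(config)}


def build_processor(parameters: dict) -> ZillowScraperProcessor:
    # Child side: rebuild the processor from the plain dict
    return ZillowScraperProcessor(ScraperConfig(**parameters["scraper_config"]))


params = to_parameters(ScraperConfig(county="example-county"))
payload = json.dumps(params)  # succeeds because the config is plain data
processor = build_processor(json.loads(payload))
print(processor.config)
```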
If you can share the exact error message you’re seeing, I can pinpoint it precisely. Also, do you intend these child runs to execute on workers (deployments) or is in‑process concurrency sufficient for your use case?