# ask-marvin
Thanks for sharing the snippet. A few things are likely causing the errors you're seeing.

Key points for Prefect 3.x:
- `run_deployment(...)` returns a `FlowRun` object. To get the return value of the child flow, fetch the state and use `prefect.states.get_state_result(state)`. There is no `.result()` method on `State` in Prefect 3.
- Don't pass non-serializable objects (like your processor instances) as deployment parameters. Deployments may run on remote workers, so parameters must be JSON-serializable. Recreate those objects inside the child flow, or pass references (e.g., block names/IDs or simple configs).
- The `name` for `run_deployment` must be `"FlowName/DeploymentName"` (or a deployment UUID), not just the flow name.
- If you don't need separate flow runs on workers, calling the subflow directly and using `asyncio.gather` is simpler and avoids serialization.

Docs for reference:
- Task runners and concurrency
- Deployment API reference
- The function you're calling: `prefect.deployments.run_deployment` (its signature confirms it returns a `FlowRun`)
- Result helper: `prefect.states.get_state_result`

Two working patterns you can choose from:

A) Run subflows as separate deployments (concurrent, remote) and collect results
- Wrap `run_deployment` in a task so your parent flow can launch multiple child runs concurrently.
- Only pass JSON-serializable parameters to the deployment.
```
from typing import List, Optional
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.states import get_state_result
from prefect.logging import get_run_logger

@task
def launch_county_run(county: str) -> dict:
    # IMPORTANT: Use "FlowName/DeploymentName"
    fr = run_deployment(
        name="Scraper Orchestrator/prod",
        parameters={"county": county},  # only pass JSON-serializable values
        timeout=None,                   # wait until the child flow run finishes
        poll_interval=5,
        as_subflow=True
    )
    # fr.state is a State; use get_state_result to retrieve the return value
    return get_state_result(fr.state, raise_on_failure=True)

@flow(name="Zillow Scraper")
def main(
    days: Optional[int] = 1,
    min_lot_size: Optional[int] = 4000,
    max_lot_size: Optional[int] = 21780,
    counties: Optional[List[str]] = None,
    models: Optional[List[str]] = None,
    email_recipients: Optional[str] = ""
):
    logger = get_run_logger()
    counties = counties or []
    models = models or []

    # Any setup work that doesn't need to be passed to the deployment should happen here
    # e.g., dbt deps, etc.

    logger.info("Starting concurrent county processing...")
    futures = [launch_county_run.submit(c) for c in counties]
    results = [f.result() for f in futures]
    logger.info(f"Completed {len(results)} county runs")
    return results
```
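One prerequisite worth calling out: a deployment named `"Scraper Orchestrator/prod"` has to exist for the `run_deployment` call above to resolve. A minimal sketch of one way to create it with `Flow.serve` (the deployment name `prod` and the single `county` parameter are illustrative, not your actual code; `flow.deploy(...)` against a work pool is the remote-worker variant):

```
from prefect import flow

@flow(name="Scraper Orchestrator")
def scraper_orchestrator(county: str) -> dict:
    # Rebuild processors from config/blocks here instead of
    # receiving them as deployment parameters (see Notes below)
    return {"county": county, "ok": True}

if __name__ == "__main__":
    # Registers a deployment named "Scraper Orchestrator/prod" and starts
    # a long-running process that picks up and executes its flow runs
    scraper_orchestrator.serve(name="prod")
```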
Notes:
- Recreate `zillow_scraper_processor`, `data_processor`, and `dbt_processor` inside the "Scraper Orchestrator" flow (deployment code) using config or blocks; don't pass the instances through `parameters`. (A sketch of this is shown after pattern B below.)
- Fix your type hints: your `scraper_orchestrator` returns a dict but is annotated `-> None`.
- Avoid mutable defaults like `[]` in function signatures; use `None` and coalesce.

B) Run subflows in-process with asyncio (no deployment)
- If you don't need workers or separate flow runs, just call your subflow directly and run the calls concurrently.

```
import asyncio
from typing import List, Optional

from prefect import flow
from prefect.logging import get_run_logger

@flow(name="Scraper Orchestrator")
async def scraper_orchestrator(county: str, days: int, min_lot_size: int, max_lot_size: int) -> dict:
    # Build processors here as needed
    # ...
    return {"county": county, "ok": True}

@flow(name="Zillow Scraper")
async def main(
    days: Optional[int] = 1,
    min_lot_size: Optional[int] = 4000,
    max_lot_size: Optional[int] = 21780,
    counties: Optional[List[str]] = None,
    models: Optional[List[str]] = None,
    email_recipients: Optional[str] = ""
):
    logger = get_run_logger()
    counties = counties or []
    models = models or []

    logger.info("Starting concurrent county processing...")
    coros = [
        scraper_orchestrator(county, days, min_lot_size, max_lot_size)
        for county in counties
    ]
    results = await asyncio.gather(*coros)
    logger.info(f"Completed {len(results)} county runs")
    return results
```
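And here's a minimal sketch of the first note above: rebuilding a processor inside the child flow from a JSON block, so only plain data crosses the deployment boundary. The block name `"scraper-config"` and the processor class are illustrative stand-ins for your real ones:

```
from dataclasses import dataclass

from prefect import flow
from prefect.blocks.system import JSON

@dataclass
class ZillowScraperProcessor:
    # Stand-in for your real processor; rebuilt from plain config,
    # never passed through deployment parameters
    min_lot_size: int = 4000
    max_lot_size: int = 21780

    def run(self, county: str, days: int) -> list:
        return []  # real scraping logic lives here

@flow(name="Scraper Orchestrator")
def scraper_orchestrator(county: str, days: int = 1) -> dict:
    # Load JSON-serializable config from a block instead of receiving live objects
    config = JSON.load("scraper-config").value  # hypothetical block name

    # Recreate the processor inside the child flow
    scraper = ZillowScraperProcessor(**config)
    listings = scraper.run(county=county, days=days)
    return {"county": county, "listings": len(listings)}
```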
What's likely breaking in your current code:
- `deployment_execution.state.result(raise_on_failure=True)`: in Prefect 3, use `get_state_result(deployment_execution.state, raise_on_failure=True)` instead.
- Passing `zillow_scraper_processor`, `data_processor`, and `dbt_processor` into `run_deployment`: these are almost certainly not JSON-serializable.
- `name="Scraper Orchestrator"` is probably not a valid deployment name; use `"Scraper Orchestrator/<your-deployment-name>"`.
- Type hint mismatches and mutable default args can cause confusion.

If you can share the exact error message you're seeing, I can pinpoint it precisely. Also, do you intend these child runs to execute on workers (deployments), or is in-process concurrency sufficient for your use case?