Ishan Anilbhai Koradiya
08/25/2025, 7:32 AM
Marvin
08/25/2025, 7:32 AM
Marvin
08/25/2025, 7:36 AM
run_deployment or wait_for_flow_run)?
- Or the worker polling your work pool for new flow runs?
Either way, you can slow the polling down:
If you’re waiting on another flow run from code
- Both run_deployment and wait_for_flow_run let you control the poll interval (how often they check the API for state changes) and the timeout.
- Set a larger poll_interval and increase timeout (or set it to None to wait indefinitely).
Examples:
# Inside an async flow, waiting for a deployment as a subflow
from prefect.deployments import run_deployment

flow_run = await run_deployment(
    "my-project/my-deployment",
    poll_interval=60,  # check every 60s
    timeout=None,      # wait indefinitely
)

# Or, trigger then wait explicitly
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

flow_run = await run_deployment(
    "my-project/my-deployment",
    timeout=0,  # return immediately instead of waiting for completion
    as_subflow=False,
)
await wait_for_flow_run(
    flow_run.id,
    poll_interval=60,  # check every 60s
    timeout=None,      # wait indefinitely
)
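To make the two knobs concrete, here is a minimal sketch of the kind of loop that a poll interval and a timeout control. This is not Prefect's implementation; wait_until_done, is_done, sleep, and clock are hypothetical names chosen for illustration, and the sleep/clock arguments exist only so the loop can be exercised without real waiting.

```python
import time
from typing import Callable, Optional


def wait_until_done(
    is_done: Callable[[], bool],
    poll_interval: float = 5.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poll is_done() until it returns True and return the number of checks made.

    Illustrative only (hypothetical helper, not a Prefect API).
    Raises TimeoutError once `timeout` seconds have elapsed; timeout=None waits forever.
    """
    start = clock()
    checks = 0
    while True:
        checks += 1  # each iteration stands in for one API call
        if is_done():
            return checks
        if timeout is not None and clock() - start >= timeout:
            raise TimeoutError(f"not done after {timeout} seconds")
        sleep(poll_interval)  # a larger interval means fewer checks
```

A larger poll_interval reduces how many checks are made for the same wait, but completion is noticed up to one interval late. The timeout only bounds how long the caller keeps checking.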
If you mean worker polling for new work
- You can slow the worker’s polling loop by setting the environment variable PREFECT_WORKER_QUERY_SECONDS (defaults to 10 seconds). For example:
export PREFECT_WORKER_QUERY_SECONDS=30
prefect worker start -p my-pool
Note: Increasing this will make workers pick up new runs more slowly. Health checks consider a worker unhealthy if it hasn’t polled in a while; that threshold scales with the polling interval.
Helpful docs:
- Worker health checks and related settings
- Settings reference
If you can share a snippet of how you’re launching/waiting on runs (or confirm it’s the worker you want to slow down), I can suggest the exact knob to turn.
Ishan Anilbhai Koradiya
08/25/2025, 8:10 AM
Marvin
08/25/2025, 8:10 AM
Marvin
08/25/2025, 8:13 AM
- poll_interval_s (seconds): how long to sleep between status checks of the Airbyte job
- timeout (in minutes in the collection): how long to wait for the sync to finish before failing
If you’re calling the task-level function prefect_airbyte.connections.trigger_sync, you can pass both directly:
from prefect import flow
from prefect_airbyte.connections import trigger_sync

@flow
def sync_many(connections: list[str]):
    for cid in connections:
        # Poll every 60s and allow up to 2 hours for the sync
        trigger_sync(
            connection_id=cid,
            poll_interval_s=60,
            timeout=120,  # minutes
            status_updates=True,
        )
If you’re using the flow-level helper prefect_airbyte.flows.run_connection_sync, construct the AirbyteConnection with your desired settings (or call trigger_sync directly so you can pass them explicitly). For example:
from prefect import flow
from prefect_airbyte.connections import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync

@flow
def sync_many(connections: list[str]):
    for cid in connections:
        conn = AirbyteConnection(
            connection_id=cid,
            # these fields are commonly supported; if not, use trigger_sync above
            poll_interval_s=60,
            timeout=120,  # minutes
        )
        run_connection_sync(conn)
A couple of tips:
- If your flow runs are “timing out,” it’s usually the collection’s sync timeout that’s expiring, so increase timeout accordingly.
- If you need even fewer API calls, raise poll_interval_s (e.g., 60–120 seconds), at the cost of slower status updates.
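As a rough way to size that trade-off, the sketch below estimates how many status checks one sync generates at a few intervals. The 30-minute sync duration and the status_checks helper are hypothetical, picked only for illustration; plug in your own numbers.

```python
import math


def status_checks(sync_seconds: float, poll_interval_s: float) -> int:
    """Approximate number of status checks made while waiting on one sync."""
    return math.ceil(sync_seconds / poll_interval_s)


SYNC_SECONDS = 30 * 60  # hypothetical 30-minute sync

for interval in (15, 60, 120):
    checks = status_checks(SYNC_SECONDS, interval)
    # Completion is noticed at most one interval after it actually happens.
    print(f"{interval:>3}s interval: ~{checks} checks, up to {interval}s delay")
```

Multiply the per-sync figure by the number of connections in the loop to get a feel for the total API load.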
Quick check: are you currently calling prefect_airbyte.flows.run_connection_sync or prefect_airbyte.connections.trigger_sync? If you share a small snippet, I can tailor the exact parameters to your usage.