Sam Joel
06/25/2024, 6:50 AM
Alexander Azzam
06/25/2024, 9:49 AM
Alexander Azzam
06/25/2024, 9:49 AM
Sam Joel
06/25/2024, 9:50 AM
Sam Joel
06/25/2024, 9:50 AM
Alexander Azzam
06/25/2024, 9:50 AM
Sam Joel
06/25/2024, 9:50 AM
Alexander Azzam
06/25/2024, 9:50 AM
Alexander Azzam
06/25/2024, 9:51 AM
Alexander Azzam
06/25/2024, 9:51 AM
Alexander Azzam
06/25/2024, 9:52 AM
Sam Joel
06/25/2024, 9:55 AM
Alexander Azzam
06/25/2024, 9:56 AM
Sam Joel
# 06/25/2024, 9:56 AM
def create_flow_run_by_deployment(
    deployment_id: str,
    name: str,
    parameters: dict,
    state: Optional[Dict] = DEFAULT_STATE,
    work_queue_name: Optional[str] = DEFAULT_WORK_QUEUE_NAME,
    empirical_policy: Optional[Dict] = DEFAULT_EMPIRICAL_POLICY,
    timeout: Optional[float] = 30.0,
):
    """Create a flow run from a Prefect deployment via the REST API.

    Sends ``POST <api root>/deployments/<deployment_id>/create_flow_run``.
    The API root is derived from the ``PREFECT_API_URL`` environment variable.

    Args:
        deployment_id: ID of the deployment to run.
        name: Name for the new flow run.
        parameters: Flow parameters, sent as-is in the JSON body.
        state: ``state`` field of the request body.
        work_queue_name: ``work_queue_name`` field of the request body.
        empirical_policy: ``empirical_policy`` field of the request body.
            The defaults of these three come from module-level constants
            defined outside this snippet.
        timeout: Seconds to wait for the server before giving up
            (``None`` waits forever).

    Returns:
        The decoded JSON body of the successful response.

    Raises:
        RuntimeError: If ``PREFECT_API_URL`` is not set.
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    api_url = os.getenv("PREFECT_API_URL")
    if not api_url:
        raise RuntimeError("PREFECT_API_URL is not set")

    # Build the URL with plain string ops. os.path.join is OS-dependent
    # (backslashes on Windows) and is not meant for URLs.
    base = api_url.rstrip("/")
    # PREFECT_API_URL normally already points at the API root: ".../api", or
    # ".../api/accounts/<id>/workspaces/<id>" on Prefect Cloud. Only append
    # "/api" when it looks like a bare server address, which is the case the
    # old "api/..." suffix handled.
    if not (base.endswith("/api") or "/api/" in base):
        base = f"{base}/api"
    url = f"{base}/deployments/{deployment_id}/create_flow_run"

    headers = {}
    api_key = os.getenv("PREFECT_API_KEY")
    if api_key:
        # Needed for Prefect Cloud; omitted when no key is configured.
        headers["Authorization"] = f"Bearer {api_key}"

    payload = {
        "name": name,
        "parameters": parameters,
        "state": state,
        "work_queue_name": work_queue_name,
        "empirical_policy": empirical_policy,
    }
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    # Surface HTTP errors instead of returning an error body as if it were a flow run.
    response.raise_for_status()
    return response.json()
Sam Joel
06/25/2024, 9:57 AM
Alexander Azzam
06/25/2024, 9:57 AM
Alexander Azzam
06/25/2024, 9:57 AM
Alexander Azzam
06/25/2024, 10:00 AM
Oscar Björhn
# 06/25/2024, 10:03 AM
async def _get_deployment_id(client: PrefectClient, flow_name: str, env: Env) -> UUID:
    """Return the ID of the deployment named ``"<flow_name> (<env.value>)"``.

    The Prefect API returns deployments one page at a time. When no ``limit``
    is passed, the server applies its default page size (presumably
    ``PREFECT_API_DEFAULT_LIMIT``; confirm for your server version). Pages are
    therefore requested with an increasing ``offset`` until the deployment is
    found or an empty page marks the end. Scanning only the first page would
    report deployments beyond it as missing.

    Raises:
        LookupError: If no deployment with that name exists.
    """
    target_name = f"{flow_name} ({env.value})"
    offset = 0
    while True:
        page = await client.read_deployments(offset=offset)
        if not page:
            break
        for deployment in page:
            if deployment.name == target_name:
                return deployment.id
        offset += len(page)
    raise LookupError(f"No deployment found for {flow_name} ({env.value})!")
@task(task_run_name="Flow {flow_name}")
async def run_deployment(
    flow_name: str,
    env: Env,
    run_name: str,
    parameters: Dict[str, Any] | None = None,
    fire_and_forget: bool = False,
    blocking: bool = True,
    *,
    max_api_retries: int = 5,
    # Was originally 3. Disabled by setting it to 0 on 2024-05-12, because
    # re-creating crashed runs can't be used with ACI.
    max_infra_retries: int = 0,
    retry_delay_seconds: float = 60 * 5,
    poll_interval_seconds: float = 30,
) -> State | None:
    """Create a flow run from a deployment and poll it until it reaches a terminal state.

    If ``fire_and_forget`` is true, no polling is performed. The function
    returns ``None`` right after the flow run is created.

    Args:
        flow_name: Flow name. The deployment looked up is "<flow_name> (<env.value>)".
        env: Environment whose ``value`` is part of the deployment name.
        run_name: Name given to the created flow run, and to any re-created run.
        parameters: Parameters passed to the flow run.
        fire_and_forget: Return immediately after creating the flow run.
        blocking: If true, a non-successful child run makes this function
            return ``Failed()``. If false, it returns ``None`` instead.
        max_api_retries: Number of ``ReadTimeout`` errors tolerated while
            reading the current flow run before the error is re-raised.
            The counter resets when a crashed run is re-created.
        max_infra_retries: Number of times a CRASHED child run is re-created
            before giving up.
        retry_delay_seconds: Wait before retrying after a read timeout or crash.
        poll_interval_seconds: Wait between polls of a non-terminal flow run.

    Returns:
        ``Failed()`` if the child run ended non-successfully and ``blocking``
        is true; otherwise ``None``.

    Raises:
        ReadTimeout: After more than ``max_api_retries`` read timeouts.
        LookupError: If the deployment cannot be found.
    """
    logger = get_run_logger()
    # "5" for the default 300 s, so the log messages read as before.
    retry_delay_minutes = f"{retry_delay_seconds / 60:g}"

    async with get_client() as client:
        deployment_id = await _get_deployment_id(client=client, flow_name=flow_name, env=env)
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment_id, name=run_name, parameters=parameters
        )
        flow_run_id = flow_run.id
        logger.info(f"Created flow run for flow {flow_name} with name: {flow_run.name} and id: {flow_run.id}")

        api_retries = 0
        infra_retries = 0
        # If fire_and_forget is true, don't poll, just return.
        while not fire_and_forget:
            try:
                flow_run = await client.read_flow_run(flow_run_id=flow_run_id)
            except ReadTimeout:
                if api_retries >= max_api_retries:
                    raise
                api_retries += 1
                logger.warning(
                    f"Timed out while reading flow run {flow_name}, retrying in {retry_delay_minutes} mins. "
                    f"This is retry attempt {api_retries} / {max_api_retries}"
                )
                await asyncio.sleep(retry_delay_seconds)
                continue

            state_type = flow_run.state_type
            if state_type not in TERMINAL_STATES:
                logger.info(f"Waiting for child flow run {flow_name} / {flow_run.name} with state {state_type}")
                await asyncio.sleep(poll_interval_seconds)
                continue

            if state_type == StateType.COMPLETED:
                logger.info(f"Child flow run {flow_name} / {flow_run.name} completed successfully.")
                return None

            if state_type == StateType.CRASHED:
                if infra_retries >= max_infra_retries:
                    logger.error(f"Child flow run {flow_name} / {flow_run.name} crashed due to infrastructure issues.")
                    return Failed() if blocking else None
                infra_retries += 1
                logger.warning(
                    f"Child flow run {flow_name} / {flow_run.name} crashed due to infrastructure failure, "
                    f"retrying in {retry_delay_minutes} mins. "
                    f"This is retry attempt {infra_retries} / {max_infra_retries}"
                )
                # Wait a little bit, in case there's a temporary outage in Azure.
                await asyncio.sleep(retry_delay_seconds)
                # Create a new flow run, the previous one crashed.
                flow_run = await client.create_flow_run_from_deployment(
                    deployment_id=deployment_id, name=run_name, parameters=parameters
                )
                flow_run_id = flow_run.id
                logger.info(
                    f"The retried flow run for flow {flow_name} with name: {flow_run.name} has a new id: {flow_run.id}"
                )
                # Reset the API retry counter since we've started a whole new flow run.
                api_retries = 0
                continue

            if state_type in (StateType.CANCELLED, StateType.FAILED):
                logger.error(f"Child flow run {flow_name} / {flow_run.name} exited non-successfully.")
            else:
                logger.critical(f"Encountered an unknown terminal state in child flow run {flow_name}: {state_type}")
            return Failed() if blocking else None
    return None
Oscar Björhn
06/25/2024, 10:04 AM
Alexander Azzam
06/25/2024, 10:06 AM
Oscar Björhn
06/25/2024, 10:10 AM
Alexander Azzam
06/25/2024, 10:14 AM