Jan Malek
10/16/2023, 1:49 PM
Nate
10/16/2023, 1:59 PM
Jan Malek
10/16/2023, 2:00 PM
@flow_if_api_reachable(name="RAW|QuoteDB|run_job", on_failure=[notify_failure_slack])
@sync_compatible
async def run_job(
    config_name: Optional[str],
    table_type: Optional[str] = None,
    use_dask: bool = True,
    db_username_secret_block_key: Optional[str] = None,
    db_password_secret_block_key: Optional[str] = None,
)
Jan Malek
10/16/2023, 2:01 PM
2.13.4
Nate
10/16/2023, 2:04 PM
flow_if_api_reachable? i don't see anything weird yet
Jan Malek
10/16/2023, 2:10 PM
def flow_if_api_reachable(*flow_args, **flow_kwargs):
    """Applies a @prefect.flow() decorator with specified args if and only if
    the Prefect API specified by the envvars/config is reachable. Otherwise,
    the underlying function is unmodified.

    args/kwargs are passed to the flow decorator as-is, except for:
    flow_run_name, which defaults to a standardized generator (but is overridable)
    timeout_seconds, which defaults to 24 hrs to avoid runaway tasks (also overridable)
    """
    fixed_kwargs = flow_kwargs.copy()
    flowrun_name_gen = flow_kwargs.get("flow_run_name") or generate_flow_name
    fixed_kwargs["flow_run_name"] = flowrun_name_gen
    # Default timeout to 24 hrs if not set; an explicit None behaves the same as base flow()
    timeout_seconds = flow_kwargs.get("timeout_seconds", NotImplemented)  # None is valid and shouldn't be erased
    fixed_kwargs["timeout_seconds"] = (3600 * 24) if timeout_seconds is NotImplemented else timeout_seconds
    flow_decorator = flow(*flow_args, **fixed_kwargs)
    decowrapper = decorate_if(prefect_api_url_reachable, flow_decorator)
    return decowrapper
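The `decorate_if` helper used above is not shown in the thread. As a rough sketch of how such a conditional decorator could work, assuming its first argument is a zero-argument callable returning a bool (an assumption; the real helper may take a plain boolean or differ in other ways), with `shout` as a toy stand-in for the `flow(...)` decorator:

```python
from typing import Any, Callable


def decorate_if(
    condition: Callable[[], bool],
    decorator: Callable[[Callable[..., Any]], Any],
) -> Callable[[Callable[..., Any]], Any]:
    """Return a decorator that applies `decorator` only when `condition()` is true.

    Hypothetical reconstruction; the helper in the thread may behave differently.
    """

    def wrapper(func: Callable[..., Any]) -> Any:
        # The condition is evaluated once, when the function is decorated
        # (typically at import time), not on every call.
        if condition():
            return decorator(func)
        return func  # leave the function untouched

    return wrapper


def shout(func: Callable[..., Any]) -> Callable[..., str]:
    """Toy decorator standing in for prefect.flow(...): upper-cases the result."""

    def inner(*args: Any, **kwargs: Any) -> str:
        return str(func(*args, **kwargs)).upper()

    return inner


@decorate_if(lambda: True, shout)
def greet_on() -> str:
    return "hello"


@decorate_if(lambda: False, shout)
def greet_off() -> str:
    return "hello"
```

Because the check runs at decoration time, a function decorated while the condition is false stays a plain function for the life of the process, even if the condition would later become true.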
Jan Malek
10/16/2023, 2:26 PM
flow() with a preflight check that pings the API and sees if it's available; if not, skips the decoration
Jan Malek
10/16/2023, 2:27 PM
Nate
10/16/2023, 2:50 PM
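The preflight check described above (ping the API, skip decoration if unavailable) is not shown in the thread. One way it might look, as a sketch only: the function name `api_reachable`, the fallback to the `PREFECT_API_URL` environment variable, and the use of a `/health` path under the API URL are all assumptions here, not the code from the thread.

```python
import http.client
import os
import urllib.request
from typing import Optional


def api_reachable(api_url: Optional[str] = None, timeout: float = 3.0) -> bool:
    """Return True if a GET to `<api_url>/health` succeeds with a 2xx status.

    Hypothetical helper: falls back to the PREFECT_API_URL environment
    variable when no URL is passed, and treats any failure as "unreachable".
    """
    url = api_url or os.environ.get("PREFECT_API_URL")
    if not url:
        return False
    try:
        with urllib.request.urlopen(url.rstrip("/") + "/health", timeout=timeout) as resp:
            return 200 <= resp.status < 300
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError, HTTPError, refused connections and timeouts;
        # ValueError covers malformed URLs.
        return False
```

A short timeout matters here: if this runs at import time to decide whether to apply the decorator, an unreachable host would otherwise stall every import of the module.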