Matt Melgard
08/30/2022, 9:18 PM
Nate
08/30/2022, 9:26 PM
`orchestrator` and `worker`) and create the `worker` deployment with an `-ib kubernetes-job/your-k8s-job-block` as needed
then in the `orchestrator` you could use the client to `create_flow_run_from_deployment`, where you'd pass the `deployment_id` of the `worker` and the params it needs to run, with something like:
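from typing import Dict

from prefect import get_client, task
# FlowRun's import path depends on the Prefect version; this is the Orion-era 2.x location
from prefect.orion.schemas.core import FlowRun
from prefect.utilities.asyncutils import sync_compatible


@task
@sync_compatible
async def submit_worker_flows(deployment_id: str, params: Dict) -> FlowRun:
    """Async task to create a flow run from the `worker` deployment.

    Args:
        deployment_id (str): Prefect Deployment ID of `modeling_worker` flow deployment
        params (Dict): Everything an instance of `worker` needs to run
    """
    async with get_client() as client:
        # Return the created flow run so the caller can look up its state later
        return await client.create_flow_run_from_deployment(
            deployment_id=deployment_id, parameters=params
        )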
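if the orchestrator also needs to wait on those runs, the polling side could look roughly like the sketch below. It is a generic helper, not part of Prefect: `wait_for_final_state`, `read_state`, `FINAL_STATES`, `poll_interval` and `timeout` are all hypothetical names, and the set of terminal state names is an assumption modeled on Prefect 2 state types.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Assumed terminal state names, modeled on Prefect 2 state types.
FINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


async def wait_for_final_state(
    read_state: Callable[[], Awaitable[str]],
    poll_interval: float = 5.0,
    timeout: Optional[float] = None,
) -> str:
    """Call `read_state` repeatedly until it returns a terminal state name."""
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    while True:
        state = await read_state()
        if state in FINAL_STATES:
            return state
        # Give up once the optional deadline has passed.
        if deadline is not None and loop.time() >= deadline:
            raise TimeoutError(f"run still in state {state!r} after {timeout}s")
        await asyncio.sleep(poll_interval)


# With Prefect, `read_state` might wrap the client (untested assumption;
# `flow_run_id` is hypothetical and `state` may be None on a fresh run):
#   async def read_state() -> str:
#       flow_run = await client.read_flow_run(flow_run_id)
#       return flow_run.state.type.value
```

keeping the state lookup behind a callable means the loop can be exercised with a fake before pointing it at a real client.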
that way, the `worker` flows can run on their own infra and you could poll for state from the `orchestrator` as needed
Matt Melgard
08/30/2022, 9:59 PM
Nate
08/30/2022, 11:00 PM