Nicolas Ouporov
04/01/2024, 10:35 PM
Marvin
04/01/2024, 10:35 PM
Nate
04/01/2024, 10:39 PM
Nicolas Ouporov
04/01/2024, 10:45 PM
Nate
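04/01/2024, 10:52 PM
tasks.py
from prefect import task
from prefect.task_server import serve


@task
def some_work(payload: dict):
    pass


# Guarded so that importing some_work from api.py does not start the task server
if __name__ == "__main__":
    serve(some_work)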
api.py
from tasks import some_work

# `app` and `Request` come from the web framework, which this snippet does not show


@app.post("/")
def submit_work(r: Request):
    task_run = some_work.submit(r.json())  # task_run is the API representation of a task run with an ID
    hand_to_another_process_for_manual_polling(task_run)
Nicolas Ouporov
04/01/2024, 10:55 PM
hand_to_another_process_for_manual_polling function. Inside, I need some way of using the task_run id to set the state as completed or failed.
Nate
04/01/2024, 10:58 PM
import asyncio
import uuid
from typing import Optional

import anyio
from prefect import get_client
from prefect.client.schemas.filters import TaskRunFilter
from prefect.utilities.asyncutils import sync_compatible


@sync_compatible
async def wait_for_submitted_runs(
    task_run_filter: Optional[TaskRunFilter] = None,
    timeout: Optional[float] = None,
    poll_interval: float = 3.0,
):
    """
    Wait for completion of any provided task runs.

    Args:
        task_run_filter: A filter to apply to the task runs to wait for.
        timeout: How long to wait for completion of all runs (seconds).
        poll_interval: How long to wait between polling each run's state (seconds).
    """

    async def wait_for_final_state(run_id: uuid.UUID):
        while True:
            run = await client.read_task_run(run_id)
            if run.state and run.state.is_final():
                # update some KV of run states HERE
                return run_id
            await anyio.sleep(poll_interval)

    async with get_client() as client:
        with anyio.move_on_after(timeout):
            task_runs_to_wait_for = await client.read_task_runs(task_run_filter=task_run_filter)
            await asyncio.gather(
                *(wait_for_final_state(run.id) for run in task_runs_to_wait_for)
            )
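One way the "update some KV of run states HERE" step could look, as a minimal sketch rather than anything Prefect provides. It uses only the standard library: `record_final_states` and `read_state` are hypothetical names, `read_state` stands in for a call such as `client.read_task_run`, and states are plain strings chosen for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Dict, FrozenSet, Iterable, Optional

# Assumed set of terminal state names, written as plain strings for this sketch
FINAL_STATES: FrozenSet[str] = frozenset({"COMPLETED", "FAILED", "CRASHED", "CANCELLED"})


async def record_final_states(
    run_ids: Iterable[str],
    read_state: Callable[[str], Awaitable[Optional[str]]],  # hypothetical stand-in for an API read
    poll_interval: float = 3.0,
) -> Dict[str, str]:
    """Poll every run until it reaches a final state and collect the results in a dict."""
    results: Dict[str, str] = {}

    async def wait_one(run_id: str) -> None:
        while True:
            state = await read_state(run_id)
            if state in FINAL_STATES:
                # This dict plays the role of the "KV of run states"
                results[run_id] = state
                return
            await asyncio.sleep(poll_interval)

    # Poll all runs concurrently; gather returns once every run is final
    await asyncio.gather(*(wait_one(run_id) for run_id in run_ids))
    return results
```

A real version would swap the dict for whatever store the other process reads from.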
Nate
04/01/2024, 10:58 PM
Nicolas Ouporov
04/02/2024, 2:25 AM
Nate
04/02/2024, 2:28 AM
State (pydantic model instance) and write it someplace; how you would do that depends on why you want to
• the Prefect API already tracks all this information, so tracking it elsewhere seems unnecessary (without knowing more about your scenario)
Nate
04/02/2024, 2:30 AM
Failed / Completed etc? not track the current state
Nate
04/02/2024, 2:31 AM
set_task_run_state client method for that
https://github.com/PrefectHQ/prefect/blob/ba9c176a14e0db8a6a3ab3e1aac1b37ae30e4bf8/src/prefect/client/orchestration.py#L2297
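A minimal sketch of how the set_task_run_state client method might be used to mark a run as Completed or Failed from its ID. The helper name `finish_task_run` and its `succeeded` / `message` parameters are hypothetical; `client` is assumed to be a Prefect client such as the one yielded by `get_client()`, and passing `force=True` is a choice made for this sketch, not something the thread prescribes.

```python
from typing import Any
from uuid import UUID


async def finish_task_run(
    client: Any,  # assumed: a Prefect client, e.g. from `async with get_client() as client`
    task_run_id: UUID,
    succeeded: bool,
    message: str = "",
) -> Any:
    """Set a task run to Completed or Failed through the client (hypothetical helper)."""
    # Imported inside the function so the module itself loads without Prefect present
    from prefect.states import Completed, Failed

    state = Completed(message=message) if succeeded else Failed(message=message)
    # force=True asks the API to accept the state regardless of orchestration rules
    return await client.set_task_run_state(
        task_run_id=task_run_id, state=state, force=True
    )
```

Inside the polling process this could be called as `await finish_task_run(client, task_run.id, succeeded=False, message="manual check failed")`.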