Kyle McChesney
10/07/2022, 10:34 PM
external_job_id parameter. So if a flow is running and it is to be associated with some external job record, the id is passed. If the id is passed, updates are made as part of the flow state handler (marking the job done when the flow completes, recording error messages, etc.).
The above is working great. The flow transition handler is generic and simply checks for the existence of the parameter in the context. The “new” use case is basically a flow that is triggered by some outside automation. Part of the flow's responsibility is to create the external job record, get its ID, and update it when the flow completes, meaning I don't have the parameter at the start. Is there any way to save some kind of information onto the flow, within a task, so that I can check for its existence within the flow state transition function? I've tried doing flow.add_task(Parameter('external_job_id', default=$res)), where $res is the result of a task that creates the job and returns the ID (halfway through the flow). I also tried setting it in the context directly within the task. No luck.
with Flow(
    'flow',
    executor=LocalDaskExecutor(),
    state_handlers=[external_job_updater],
) as flow:
    input_url = Parameter('input_url')
    job_name = Parameter('job_name', default=None)
    username = Parameter('username', default='system')
    external_job_id = create_job(job_name, username)
    with prefect.context(external_job_id=external_job_id):
        inputs = load_inputs(input_url)
        process.map(inputs)
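For reference, the generic handler described in the question (one that only acts when external_job_id is present in the run's parameters) might look roughly like the sketch below. It is not the author's actual code. The names update_external_job, JOB_UPDATES, and make_external_job_updater are hypothetical stand-ins, and the context is passed in as a callable so the sketch runs without Prefect installed. It only assumes the Prefect 1 handler shape (flow, old_state, new_state) and state objects exposing is_finished(), is_successful(), and message.

```python
from typing import Any, Callable, List, Mapping, Optional, Tuple

# Hypothetical stand-in for the external job service: records each update.
JOB_UPDATES: List[Tuple[str, str, Optional[str]]] = []


def update_external_job(job_id: str, status: str, message: Optional[str] = None) -> None:
    """Hypothetical helper; a real one would call the external job API."""
    JOB_UPDATES.append((job_id, status, message))


def job_id_from_context(context: Mapping[str, Any]) -> Optional[str]:
    """Look up external_job_id among the flow run's parameters, if any."""
    parameters = context.get("parameters") or {}
    return parameters.get("external_job_id")


def make_external_job_updater(get_context: Callable[[], Mapping[str, Any]]):
    """Build a flow state handler that reads the run context lazily."""

    def external_job_updater(flow, old_state, new_state):
        job_id = job_id_from_context(get_context())
        # No job attached to this run, or the flow is still in progress.
        if job_id is None or not new_state.is_finished():
            return new_state
        if new_state.is_successful():
            update_external_job(job_id, "done")
        else:
            update_external_job(job_id, "error", str(new_state.message))
        return new_state

    return external_job_updater
```

With Prefect 1 this would presumably be wired up as state_handlers=[make_external_job_updater(lambda: prefect.context)]. Because the lookup only sees parameters supplied when the run starts, an ID created by a task midway through the flow never reaches it, which is the gap the question describes.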
Anna Geller
10/08/2022, 12:12 AM
Kyle McChesney
10/11/2022, 2:36 PM
Anna Geller
10/11/2022, 5:37 PM
"get the task results/states for any tasks"
You may solve this problem using get_task_run_result. Example:
from prefect.tasks.prefect.flow_run import get_task_run_result
get_task_run_result(flow_run_id, task_slug, map_index=-1, poll_time=5)
The resource manager is already a normal context manager, just constrained by the DAG. Once you migrate to Prefect 2, this will no longer be an issue.