#prefect-community

Kyle McChesney

10/07/2022, 10:34 PM
I’ve asked a similar question before, but I now have a better summary of what I am trying to accomplish. The original thread is here: https://prefect-community.slack.com/archives/CL09KU1K7/p1659389359820829

The new “angle” to the question is basically: “I have a flow state handler function and I need it to behave differently based on something that happens during the flow run.”

The specific example:
• I have a flow state handler that updates an external API with the status of a flow run. This functionality is optional and controlled via an `external_job_id` parameter. If a flow run is to be associated with some external job record, the id is passed. If the id is passed, updates are made as part of the flow state handler (marking the job done when the flow completes, recording error messages, etc.).

The above is working great. The flow state handler is generic and simply checks for the existence of the parameter in the context.

The “new” use case is a flow that is triggered by some outside automation. Part of the flow’s responsibility is to create the external job record, get its ID, and update it when the flow completes, meaning I don’t have the parameter at the start. Is there any way to save some kind of information onto the flow, within a task, so that I can check for its existence within the flow state handler?

I’ve tried doing `flow.add_task(Parameter('external_job_id', default=$res))`, where `$res` is the result of a task that creates the job and returns the id (halfway through the flow). I also tried setting it in the context directly within the task. No luck.
Example flow code
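import prefect
from prefect import Flow, Parameter
from prefect.executors import LocalDaskExecutor

# external_job_updater (the state handler) and the tasks create_job,
# load_inputs, and process are defined elsewhere.
with Flow(
    'flow',
    executor=LocalDaskExecutor(),
    state_handlers=[external_job_updater],
) as flow:
    input_url = Parameter('input_url')
    job_name = Parameter('job_name', default=None)
    username = Parameter('username', default='system')

    external_job_id = create_job(job_name, username)

    # Attempt: expose the created job's id through the context
    with prefect.context(external_job_id=external_job_id):
        inputs = load_inputs(input_url)
        process.map(inputs)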
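For reference, the generic handler described above (one that only acts when `external_job_id` was passed as a parameter) might look roughly like the sketch below. It is not the actual handler from this thread: `make_job_updater`, `job_id_from_context`, and `update_job` are hypothetical names, and the injectable `get_context` argument exists only so the logic can be exercised without a running flow. It assumes Prefect 1.x, where flow run parameters are available under `parameters` in `prefect.context`.

```python
from typing import Any, Callable, Mapping, Optional


def job_id_from_context(context: Mapping[str, Any]) -> Optional[str]:
    """Return the external_job_id flow parameter, or None if it was not passed."""
    parameters = context.get("parameters") or {}
    return parameters.get("external_job_id")


def make_job_updater(
    update_job: Callable[[str, str], None],
    get_context: Optional[Callable[[], Mapping[str, Any]]] = None,
):
    """Build a flow state handler that reports state changes for linked jobs.

    update_job is a hypothetical client call: update_job(job_id, state_name).
    """

    def handler(flow, old_state, new_state):
        if get_context is None:
            import prefect  # Prefect 1.x; the context is populated during a run
            context = prefect.context
        else:
            context = get_context()
        job_id = job_id_from_context(context)
        if job_id is not None:
            # Report the name of the new state class, e.g. "Success" or "Failed"
            update_job(job_id, type(new_state).__name__)
        return new_state

    return handler
```

Because the handler only reads flow parameters, a job id created by a task in the middle of the run never reaches it, which is the gap the question is about.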

Anna Geller

10/08/2022, 12:12 AM
The KV Store sounds like something you may consider (this is a Cloud-only feature, though).
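A minimal sketch of how the KV Store suggestion could be applied here, assuming Prefect 1.x with the Cloud backend: the task that creates the job writes its id under a key derived from the flow run id, and the state handler looks that key up. The helper names (`job_key`, `save_job_id`, `load_job_id`) and the `job-` key prefix are hypothetical, and the exact error raised for a missing key is an assumption. The `set_kv`/`get_kv` arguments are there only so the logic can be tried without Cloud access.

```python
from typing import Any, Callable, Optional

KEY_PREFIX = "job-"  # hypothetical naming scheme: one key per flow run


def job_key(flow_run_id: str) -> str:
    """Build the KV Store key under which a flow run's job id is kept."""
    return f"{KEY_PREFIX}{flow_run_id}"


def save_job_id(
    flow_run_id: str,
    job_id: str,
    set_kv: Optional[Callable[[str, Any], Any]] = None,
) -> None:
    """Call from inside the task that creates the external job."""
    if set_kv is None:
        # Prefect 1.x, Cloud backend only
        from prefect.backend import set_key_value as set_kv
    set_kv(job_key(flow_run_id), job_id)


def load_job_id(
    flow_run_id: str,
    get_kv: Optional[Callable[[str], Any]] = None,
) -> Optional[str]:
    """Call from the flow state handler; returns None if no job was recorded."""
    if get_kv is None:
        # Prefect 1.x, Cloud backend only
        from prefect.backend import get_key_value as get_kv
    try:
        return get_kv(job_key(flow_run_id))
    except (KeyError, ValueError):
        # Assumption: a missing key surfaces as one of these errors.
        return None
```

Inside both the task and the state handler, the run id should be available as `prefect.context.get("flow_run_id")`, so the handler could call `load_job_id(...)` whenever the `external_job_id` parameter is absent.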

Kyle McChesney

10/11/2022, 2:36 PM
Hmmm, is there any way to get a resource manager to function like a “normal” context manager? I.e., get the task results/states for any tasks called within the manager block?

Anna Geller

10/11/2022, 5:37 PM
> get the task results/states for any tasks
You may solve this problem using `get_task_run_result`. Example:
from prefect.tasks.prefect.flow_run import get_task_run_result

get_task_run_result(flow_run_id, task_slug, map_index=-1, poll_time=5)
The resource manager already is a normal context manager, just constrained by the DAG. Once you migrate to Prefect 2, this will no longer be an issue.