high level question, is it possible to use/referen...
# ask-community
k
High-level question: is it possible to use/reference parameters within a state handler? The use case is as follows:
• I have a “job” record in a database/application that is external to Prefect
• The execution of this external “job” can be represented/encapsulated partially by a flow run
• I’d like to use state handlers to update the job status in the external system
• To do so, I need to grab the external job id from a parameter (since it is an input to the flow)
Reasons for not putting this in a task:
• There may be a number of different flows which will use this, but the update logic will be the same
• The external job id is an optional input, so I want to avoid coupling it as much as possible with the flow code
• I want to apply these “updates” at different parts of the flow (aka specific tasks); for different flows it will be for different tasks
c
Hey @Kyle McChesney -- you can access your flow run parameters at
prefect.context.parameters
which is a dictionary of parameter name -> value
k
ty
@Chris White quick follow up. I added the parameter like so:
with Flow(
    'example',
    executor=LocalExecutor(),
    state_handlers=[flow_external_job_updater]
) as flow:

    external_job_id = Parameter('external_job_id')
    ...
And my state handler looks like:
def flow_external_job_updater(flow, current_state, next_state):
    logger = prefect.context.get('logger')
    external_job_id = prefect.context.get('parameters').get('external_job_id')

    if external_job_id is None:
        logger.info(
            'Skipping job update for flow transition %s -> %s, no external job provided',
            current_state.__class__.__name__,
            next_state.__class__.__name__,
        )

    else:
        logger.info(
            'Updating external job: %s for flow transition %s -> %s',
            external_job_id,
            current_state.__class__.__name__,
            next_state.__class__.__name__,
        )
Just logging for now, but when I run the flow with
prefect run --execute -p path/to/flow.py
. I get the following error:
ValueError: Flow.run received the following unexpected parameters: external_job_id

Flow run failed!
It seems it's not identifying the parameter as “registered” to the flow, likely because it's only used implicitly via the handler. If I add the parameter as an input to a task, it works (I don't actually have a task that needs this). Is there any way to force-include this parameter using the functional flow definition API?
c
First off, nice work figuring out why your parameter wasn't automatically added 😄 To add it explicitly, you can use
flow.add_task(Parameter('external_job_id'))
k
Awesome, that works. Last question! Is there a way to get the “reason” for a flow failing. Example, I added
a = 1 / 0
to a task. Logs:
└── 15:23:38 | ERROR   | Task 'task[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/kylem/Dev/mb/mb-prefect-flows/.venv/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    logger=self.logger,
  File "/Users/kylem/Dev/mb/mb-prefect-flows/.venv/lib/python3.7/site-packages/prefect/utilities/executors.py", line 328, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "flows/my_flow.py", line 203, in task
    a = 1 / 0
ZeroDivisionError: division by zero
└── 15:23:38 | INFO    | Task 'task[0]': Finished task run for task with final state: 'Failed'
└── 15:23:38 | INFO    | Flow run FAILED: some reference tasks failed.
└── 15:23:38 | INFO    | Updating external job: 1 for flow transition Running -> Failed
I’d love to be able to grab that exception type and message from the flow state transition from Running -> Failed
c
Not really, because there are many many different reasons a flow may fail -- executor crash / process death / task failures (note that task failure does not immediately imply flow failure - it depends on your task triggers and reference task configuration) / state handler exceptions / etc.
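Editor's note: since a flow-level failure has no single cause, one possible workaround is to capture the reason at the task level instead. The sketch below is not from the thread. It assumes that a failed task state exposes `is_failed()`, `result`, and `message`, and that `result` holds the raised exception when the task itself threw. The names `describe_failure` and `task_failure_reporter` are hypothetical, and the `print` call is a placeholder for the external job update.

```python
from typing import Any, Optional


def describe_failure(state: Any) -> Optional[str]:
    """Summarize a failed state as 'ExceptionType: message', else None."""
    if not state.is_failed():
        return None
    # Assumption: a task that raised stores the exception as its result.
    result = getattr(state, "result", None)
    if isinstance(result, BaseException):
        return f"{type(result).__name__}: {result}"
    # Otherwise fall back to the state's human-readable message, if any.
    message = getattr(state, "message", None)
    return str(message) if message else "unknown failure"


def task_failure_reporter(task: Any, old_state: Any, new_state: Any) -> Any:
    """Task-level state handler that reports why a task failed."""
    reason = describe_failure(new_state)
    if reason is not None:
        # Placeholder: swap in the call that updates the external job.
        print(f"Task {getattr(task, 'name', task)!r} failed: {reason}")
    # Return the new state unchanged so the transition proceeds as usual.
    return new_state
```

A handler like this would be attached per task via the task's `state_handlers` argument, so each flow can choose which tasks report back to the external system.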