Kyle McChesney
08/11/2021, 8:12 PM
Chris White
prefect.context.parameters
which is a dictionary of parameter name -> value
Kyle McChesney
08/11/2021, 8:17 PM
Kyle McChesney
08/11/2021, 8:57 PM
with Flow(
    'example',
    executor=LocalExecutor(),
    state_handlers=[flow_external_job_updater]
) as flow:
    external_job_id = Parameter('external_job_id')
    ...
And my state handler looks like:
def flow_external_job_updater(flow, current_state, next_state):
    logger = prefect.context.get('logger')
    external_job_id = prefect.context.get('parameters').get('external_job_id')
    if external_job_id is None:
        logger.info(
            'Skipping job update for flow transition %s -> %s, no external job provided',
            current_state.__class__.__name__,
            next_state.__class__.__name__,
        )
    else:
        logger.info(
            'Updating external job: %s for flow transition %s -> %s',
            external_job_id,
            current_state.__class__.__name__,
            next_state.__class__.__name__,
        )
Just logging for now, but when I run the flow with prefect run --execute -p path/to/flow.py I get the following error:
ValueError: Flow.run received the following unexpected parameters: external_job_id
Flow run failed!
It seems it's not identifying the parameter as “registered” to the flow, likely because it's only used implicitly via the handler. If I add the parameter as an input to a task, it works (I don't actually have a task that needs this). Is there any way to force include this parameter using the functional flow definition API?
Chris White
flow.add_task(Parameter('external_job_id'))
Kyle McChesney
08/11/2021, 9:27 PM
a = 1 / 0
to a task.
Logs:
└── 15:23:38 | ERROR | Task 'task[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/kylem/Dev/mb/mb-prefect-flows/.venv/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    logger=self.logger,
  File "/Users/kylem/Dev/mb/mb-prefect-flows/.venv/lib/python3.7/site-packages/prefect/utilities/executors.py", line 328, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "flows/my_flow.py", line 203, in task
    a = 1 / 0
ZeroDivisionError: division by zero
└── 15:23:38 | INFO | Task 'task[0]': Finished task run for task with final state: 'Failed'
└── 15:23:38 | INFO | Flow run FAILED: some reference tasks failed.
└── 15:23:38 | INFO | Updating external job: 1 for flow transition Running -> Failed
Kyle McChesney
08/11/2021, 9:28 PM
Chris White