Derek
03/20/2024, 10:10 PM
Nate
03/20/2024, 10:16 PM
Nate
03/20/2024, 10:18 PM
from functools import partial
from prefect import flow

def hook(task, task_run, state, row_id):
    # some hook
    ...

@flow
def foo():
    row_id = 42
    # bind row_id so the hook still takes (task, task_run, state)
    new_task = old_task.with_options(on_completion=[partial(hook, row_id=row_id)])
    new_task(some_args, row_id)
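To see why the partial works here, a minimal sketch that runs without Prefect may help. It assumes only that the framework calls each hook with three positional arguments; call_hooks is a hypothetical stand-in for that behavior, not a Prefect function, and the string arguments are placeholders.

```python
from functools import partial


def hook(task, task_run, state, row_id):
    # Stand-in for a completion hook: report which row the run was for.
    return f"{task} finished as {state} for row {row_id}"


# Bind row_id ahead of time; the result still accepts (task, task_run, state).
bound_hook = partial(hook, row_id=42)


def call_hooks(hooks, task, task_run, state):
    # Hypothetical stand-in for a framework invoking hooks with three arguments.
    return [h(task, task_run, state) for h in hooks]


results = call_hooks([bound_hook], "old_task", "run-1", "Completed")
print(results[0])  # old_task finished as Completed for row 42
```

The caller never needs to know about row_id: it is stored on the partial object and supplied when the hook is called.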
Derek
03/20/2024, 10:31 PM
Nate
03/20/2024, 10:32 PM
# keep any hooks already on the task; fall back to an empty list if none are set
all_on_completion_hooks = (old_task.on_completion or []) + [partial(hook, row_id=row_id)]
new_task = old_task.with_options(on_completion=all_on_completion_hooks)
Derek
03/20/2024, 10:40 PM
Nate
03/20/2024, 10:41 PM
Nate
03/20/2024, 10:41 PM
Derek
03/20/2024, 10:44 PM
Nate
03/20/2024, 10:46 PM
prefect.runtime.flow_run.parameters contains the parameters passed to the current flow run, which would be accessible in a static hook you could implement
Derek
03/20/2024, 10:49 PM
def handle_fail(flow: Flow, flow_run: FlowRun, state: State):
    row_id = flow_run.parameters.get('row_id')
    with Session(get_engine()) as session:
        row_prompt = session.query(Prompt).filter_by(id=row_id).first()
        row_prompt.status = 'FAILED'
        session.commit()  # persist the status change before the session closes
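The static-hook pattern above can be tried without Prefect or a database. The sketch below is an illustration under stated assumptions: FakeFlowRun and the STATUSES dict are hypothetical stand-ins for the flow run object and the Prompt table, and the guard for a missing row_id is an addition that the original snippet does not have.

```python
from dataclasses import dataclass, field


@dataclass
class FakeFlowRun:
    # Hypothetical stand-in for the flow run object passed to a hook.
    parameters: dict = field(default_factory=dict)


# Hypothetical stand-in for the Prompt table: row id -> status.
STATUSES = {42: "RUNNING"}


def handle_fail(flow, flow_run, state):
    # Read row_id from the run's parameters instead of binding it with partial.
    row_id = flow_run.parameters.get("row_id")
    if row_id is None or row_id not in STATUSES:
        return False  # nothing to update
    STATUSES[row_id] = "FAILED"
    return True


updated = handle_fail(None, FakeFlowRun({"row_id": 42}), None)
skipped = handle_fail(None, FakeFlowRun(), None)
```

Because the hook looks up row_id at call time, the same function can be attached once to the flow, with no per-run with_options call.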
Derek
03/20/2024, 10:50 PM
Nate
03/20/2024, 10:50 PM
Derek
03/20/2024, 10:51 PM
Derek
03/20/2024, 10:51 PM
Nate
03/20/2024, 10:55 PM
Derek
03/21/2024, 1:06 AM