Ajay
10/21/2020, 12:18 PMdef task_state_transition_handler(flow_info:Task, old_state: State, new_state: State):
if new_state.is_queued():
aquire_lock(path=new_state.context['flow_name'] + "/" +
new_state.context['today']+"/",
data=new_state.context['flow_run_id'])
elif new_state.is_finished():
release_lock(path=new_state.context['flow_name'] + "/" +
new_state.context['today']+"/",
data=new_state.context['flow_run_id'])
return
josh
10/21/2020, 1:22 PM
# Sketch of a handler: the parameter list and the rest of the body are
# elided with `...`.
def your_handler(...):
    # Import prefect's context object inside the handler.
    from prefect import context
    # Read 'flow_name' from that context and print it.
    print(context['flow_name'])
    ...
Ajay
10/21/2020, 4:02 PM
josh
10/21/2020, 4:03 PM
Ajay
10/21/2020, 4:11 PM
# Build the "ETL" flow with s3_result as its result and
# flow_state_transition_handler registered as a state handler.
with Flow("ETL", result=s3_result, state_handlers=[flow_state_transition_handler]) as flow:
    # Store keyName as event_name on prefect.context.
    # NOTE(review): this assignment runs while the flow is being defined;
    # confirm the value is still present in the context when the state
    # handler executes.
    prefect.context.event_name = keyName
def flow_state_transition_handler(flow_info: Flow, old_state: State, new_state: State):
    """Call ``create_file`` once the flow reaches a successful state.

    Nothing happens unless ``new_state.is_successful()`` is true. In that
    case the key ``"<event_name>/<today>/_success"`` is built from the
    ``event_name`` and ``today`` entries of ``context`` and passed to
    ``create_file`` together with ``get_s3_bucket()``, ``bucket_name`` and
    an empty string (presumably the file contents -- confirm against
    ``create_file``).

    Returns None in every case.
    """
    if not new_state.is_successful():
        return

    success_key = context["event_name"] + "/" + context['today'] + "/_success"
    create_file(success_key, get_s3_bucket(), bucket_name, "")
KeyError: 'event_name'
@josh
josh
10/21/2020, 4:13 PM
# Flow definition; its arguments and body are elided with `...`.
with Flow():
    ...
# Run the flow inside a prefect.context block that is given
# event_name="name".
with prefect.context(event_name="name"):
    flow.run()
Ajay
10/21/2020, 4:15 PM