Priyab Dash
11/29/2021, 9:08 AM
@task(log_stdout=True, state_handlers=[notify_run_failure])
def submit_job_run_to_tmc(job_run):
But this is being called twice when we run a flow.
Priyab Dash
11/29/2021, 9:16 AM
with Flow("Schedule Submitted Jobs Runs on TMC", executor=LocalDaskExecutor()) as flow:
    # Find SUBMITTED Job Runs
    submitted_job_runs = fetch_submitted_job_runs()
    # Submit Job to TMC and update Status to SUBMITTED; handle errors
    update_job_run_succeeded.map(job_run=submit_job_run_to_tmc.map(job_run=submitted_job_runs))
Anna Geller
Anna Geller
Kevin Kho
Priyab Dash
11/29/2021, 9:01 PM
Code in flow file:
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("Schedule Submitted Jobs Runs on TMC", executor=LocalDaskExecutor()) as flow:
    # Find SUBMITTED Job Runs
    submitted_job_runs = fetch_submitted_job_runs()
    # Submit Job to TMC and update Status to SUBMITTED; handle errors
    update_job_run_succeeded.map(job_run=submit_job_run_to_tmc.map(job_run=submitted_job_runs))

if __name__ == "__main__":
    flow_state = flow.run()
--------------------
This is the state handler code
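import prefect

def notify_run_failure(task, old_state, new_state):
    # Called on every state change; only act when the task has failed
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        logger.warning("Notification of failure to be added.")
    return new_state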
It is attached to the task like this:
@task(log_stdout=True, state_handlers=[notify_run_failure])
def submit_job_run_to_tmc(job_run):
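A note on how state handlers are invoked: in Prefect 1.x a task's state handlers are called on every state change with (task, old_state, new_state), and the state each handler returns is the one that is kept. So a handler normally runs more than once per task run, and the is_failed() check is what limits the notification to failures. The sketch below imitates that call pattern in plain Python so it runs without Prefect; State, simulate_run and the notifications list are hypothetical stand-ins, not Prefect classes.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class State:
    """Hypothetical stand-in for a Prefect 1.x State (illustration only)."""
    name: str

    def is_failed(self) -> bool:
        return self.name == "Failed"


# Collected "notifications", standing in for a real alerting call.
notifications: List[str] = []


def notify_run_failure(task: str, old_state: State, new_state: State) -> State:
    # Same shape as the handler above: act only when the new state is a failure.
    if new_state.is_failed():
        notifications.append(f"{task}: {old_state.name} -> {new_state.name}")
    return new_state  # the returned state is the one the runner keeps


def simulate_run(task: str, state_names: List[str],
                 handlers: List[Callable[[str, State, State], State]]) -> State:
    """Move a task through state_names, calling every handler on every
    transition (a simplified model of how Prefect 1.x calls state handlers)."""
    state = State(state_names[0])
    for name in state_names[1:]:
        new_state = State(name)
        for handler in handlers:
            new_state = handler(task, state, new_state)
        state = new_state
    return state


final = simulate_run("submit_job_run_to_tmc", ["Pending", "Running", "Failed"],
                     [notify_run_failure])
print(final.name, notifications)
```

Here the handler is called twice (Pending to Running, Running to Failed) but records only one notification.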
Kevin Kho
flow.run() is causing stuff to run twice. I would try removing it and seeing if that helps you.
Priyab Dash
11/30/2021, 11:18 AM
if __name__ == "__main__":
    flow_state = flow.run()
This only runs when the file is executed as the main script. We run our flow files from a separate Python script, like this:
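import threading
import time
import schedule
import schedule_job_run  # module defining `flow` (name taken from the call below)

def run_threaded(job_func):
    threading.Thread(target=job_func).start()

schedule.every(1).minutes.do(run_threaded, schedule_job_run.flow.run)
while True:  # schedule only fires jobs from run_pending()
    schedule.run_pending()
    time.sleep(1)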
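With this setup, run_threaded starts a new thread on every one-minute tick whether or not the previous flow.run() has finished, so two runs can overlap and both fetch the same SUBMITTED job runs. That is one possible source of duplicate submissions, not something confirmed in this thread. A minimal standard-library sketch of a guard that skips a tick while a run is still in progress; run_exclusively is a hypothetical helper name:

```python
import threading
from typing import Callable


def run_exclusively(job_func: Callable[[], object]) -> Callable[[], bool]:
    """Return a callable that starts job_func in a background thread, unless a
    previous run started through the same callable is still in progress.
    The returned callable reports whether a new run was started."""
    lock = threading.Lock()

    def _target() -> None:
        try:
            job_func()
        finally:
            lock.release()  # a plain Lock may be released from another thread

    def start() -> bool:
        # Non-blocking acquire fails immediately while a run is in progress.
        if not lock.acquire(blocking=False):
            return False  # skip this tick instead of overlapping runs
        try:
            threading.Thread(target=_target).start()
        except Exception:
            lock.release()  # the thread never started, so free the guard
            raise
        return True

    return start
```

Registered with the schedule library this would be schedule.every(1).minutes.do(run_exclusively(schedule_job_run.flow.run)). The wrapper is created once, so every tick shares the same lock.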
Anna Geller
Priyab Dash
11/30/2021, 11:23 AM
Anna Geller
Priyab Dash
11/30/2021, 12:01 PM
from prefect.engine import signals

try:
    assert tjob_run["run_status"] == job_run["run_status"]
except AssertionError as e:
    raise signals.SKIP(f"Job Run Status Mismatch. {e}")
Do you have any idea what this will do to the whole flow DAG?
Anna Geller
Anna Geller
Priyab Dash
11/30/2021, 1:47 PM
Priyab Dash
11/30/2021, 1:48 PM
Priyab Dash
11/30/2021, 1:48 PM
Anna Geller
Priyab Dash
11/30/2021, 1:51 PM
Anna Geller
"Maybe when it was originally developed, we did not fully understand the implications of task orchestration."
You're definitely not alone here. Many Prefect customers transitioned to Prefect from a home-grown scheduling system; it's quite common. I think what could be helpful at this point is to visit https://docs.prefect.io/orchestration/getting-started/set-up.html and set up your orchestration layer.
Priyab Dash
11/30/2021, 1:53 PM
Anna Geller
@task(skip_on_upstream_skip=False)
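For context on the SKIP question: in Prefect 1.x, raising signals.SKIP leaves the task in a Skipped state, which counts as a success, and downstream tasks default to skip_on_upstream_skip=True, so they are skipped as well. Setting skip_on_upstream_skip=False makes the downstream task run anyway, so it has to cope with missing input. The plain-Python sketch below models only that rule, assuming it is applied per mapped child; Skip, run_mapped_pair, check_status and mark_succeeded are hypothetical stand-ins, and handing None downstream for a skipped upstream is a simplification of this sketch.

```python
from typing import Callable, List, Optional


class Skip(Exception):
    """Hypothetical stand-in for prefect.engine.signals.SKIP (illustration only)."""


def run_mapped_pair(
    items: List[dict],
    upstream: Callable[[dict], dict],
    downstream: Callable[[Optional[dict]], None],
    skip_on_upstream_skip: bool = True,
) -> List[str]:
    """Run upstream then downstream for each mapped item and return the final
    state name of each downstream child: "Success", "Failed" or "Skipped"."""
    states = []
    for item in items:
        try:
            value: Optional[dict] = upstream(item)
        except Skip:
            if skip_on_upstream_skip:
                states.append("Skipped")  # default: the skip propagates downstream
                continue
            value = None  # simplification: nothing useful flows downstream
        try:
            downstream(value)
            states.append("Success")
        except Exception:
            states.append("Failed")
    return states


def check_status(job_run: dict) -> dict:
    # Hypothetical check modelled on the SKIP snippet above.
    if job_run["run_status"] != "SUBMITTED":
        raise Skip("Job Run Status Mismatch.")
    return job_run


def mark_succeeded(job_run: Optional[dict]) -> None:
    # Raises TypeError when it receives None, i.e. when it runs after a skip.
    job_run["updated"] = True
```

With the default flag, a mismatched job run only skips its own downstream child; with skip_on_upstream_skip=False the downstream child runs and, unless it checks its input, fails instead.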
Priyab Dash
11/30/2021, 2:54 PM