Priyab Dash
11/29/2021, 9:08 AM@task(log_stdout=True, state_handlers=[notify_run_failure])
def submit_job_run_to_tmc(job_run):Priyab Dash
11/29/2021, 9:16 AMwith Flow("Schedule Submitted Jobs Runs on TMC", executor=LocalDaskExecutor()) as flow: 
    # Find SUBMITTED Job Runs 
    submitted_job_runs = fetch_submitted_job_runs() 
  
    # Submit Job to TMC and update Status to SUBMITTED; handle errors 
    update_job_run_succeeded.map(job_run=submit_job_run_to_tmc.map(job_run=submitted_job_runs))Anna Geller
Anna Geller
Kevin Kho
Priyab Dash
11/29/2021, 9:01 PMCode in flow file
with Flow("Schedule Submitted Jobs Runs on TMC", executor=LocalDaskExecutor()) as flow: 
    # Find SUBMITTED Job Runs 
    submitted_job_runs = fetch_submitted_job_runs() 
    # Submit Job to TMC and update Status to SUBMITTED; handle errors 
    update_job_run_succeeded.map(job_run=submit_job_run_to_tmc.map(job_run=submitted_job_runs)) 
  
if __name__ == "__main__": 
    flow_state = flow.run() 
--------------------
This is the state handler code 
def notify_run_failure(task, old_state, new_state): 
    if new_state.is_failed(): 
        logger.warn('Notification of failure to be added.') 
    return new_state 
This is called as below
@task(log_stdout=True, state_handlers=[notify_run_failure])
def submit_job_run_to_tmc(job_run):Kevin Kho
flow.run()Priyab Dash
11/30/2021, 11:18 AMif __name__ == "__main__": 
    flow_state = flow.run()import schedule 
def run_threaded(job_func): 
    job_thread = threading.Thread(target=job_func) 
    job_thread.start() 
  
schedule.every(1).<http://minutes.do|minutes.do>(run_threaded, schedule_job_run.flow.run)Anna Geller
Priyab Dash
11/30/2021, 11:23 AMAnna Geller
Priyab Dash
11/30/2021, 12:01 PMtry: 
    assert tjob_run["run_status"] == job_run["run_status"] 
except AssertionError as e:  
    raise signals.SKIP(f"Job Run Status Mismatch. {e}")Anna Geller
Anna Geller
Priyab Dash
11/30/2021, 1:47 PMPriyab Dash
11/30/2021, 1:48 PMPriyab Dash
11/30/2021, 1:48 PMAnna Geller
Priyab Dash
11/30/2021, 1:51 PMAnna Geller
may be when it was originally developed we did not understand the implications of the task orchastration fullyyou’re definitely not alone here. There are many Prefect customers who transitioned to Prefect from a home-grown scheduling system. It’s quite common. I think what can be helpful at this point would be to visit this page https://docs.prefect.io/orchestration/getting-started/set-up.html and set up your orchestration layer.
Priyab Dash
11/30/2021, 1:53 PMAnna Geller
@task(skip_on_upstream_skip=False)Priyab Dash
11/30/2021, 2:54 PM