# ask-community
p
We have a function defined as a task as below
```
@task(log_stdout=True, state_handlers=[notify_run_failure])
def submit_job_run_to_tmc(job_run):
    ...  # body not shown in the original post
```
But this is being called twice when we run the flow.
The flow looks like this:
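```
from prefect import Flow
from prefect.executors import LocalDaskExecutor

# fetch_submitted_job_runs, submit_job_run_to_tmc and update_job_run_succeeded
# are tasks defined elsewhere in the flow file (not shown).
with Flow("Schedule Submitted Jobs Runs on TMC", executor=LocalDaskExecutor()) as flow:
    # Find SUBMITTED Job Runs
    submitted_job_runs = fetch_submitted_job_runs()

    # Submit Job to TMC and update Status to SUBMITTED; handle errors
    update_job_run_succeeded.map(job_run=submit_job_run_to_tmc.map(job_run=submitted_job_runs))
```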
a
@Priyab Dash do you have flow.run() in your flow file?
@Priyab Dash to explain: if you still have flow.run() in your flow file when you register the flow, the entire flow will be executed twice every time. If you are on Prefect Cloud, there is a feature called Version Locking that enforces that your work runs once and only once. Here is how you can enable it: https://docs.prefect.io/orchestration/concepts/flows.html#toggle-version-locking
k
Could you show me the code for your state handler? It could just be your state handler is running multiple times.
p
Hi all, apologies for the delayed response:
Code in the flow file:
```
from prefect import Flow
from prefect.executors import LocalDaskExecutor

# The tasks used below are defined elsewhere in this file (not shown).
with Flow("Schedule Submitted Jobs Runs on TMC", executor=LocalDaskExecutor()) as flow:
    # Find SUBMITTED Job Runs
    submitted_job_runs = fetch_submitted_job_runs()

    # Submit Job to TMC and update Status to SUBMITTED; handle errors
    update_job_run_succeeded.map(job_run=submit_job_run_to_tmc.map(job_run=submitted_job_runs))

if __name__ == "__main__":
    flow_state = flow.run()
```

This is the state handler code:
```
# `logger` is assumed to be defined elsewhere in the flow file (not shown).
def notify_run_failure(task, old_state, new_state):
    # Only act on transitions into a Failed state.
    if new_state.is_failed():
        logger.warning('Notification of failure to be added.')
    return new_state
```

It is attached to the task like this:
```
@task(log_stdout=True, state_handlers=[notify_run_failure])
def submit_job_run_to_tmc(job_run):
    ...  # body not shown in the original post
```
k
No worries! Your state handler looks fine. It looks like the flow.run() call is causing things to run twice. I would try removing it and seeing if that helps.
p
From what I understand, this block
```
if __name__ == "__main__":
    flow_state = flow.run()
```
only runs when the flow file itself is executed as the main script, not when it is imported. We run our flow files from a separate Python script like this:
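```
import threading
import time

import schedule

import schedule_job_run  # the flow file; module name taken from the call below


def run_threaded(job_func):
    # Run each job in its own thread so the scheduler loop is not blocked.
    job_thread = threading.Thread(target=job_func)
    job_thread.start()


schedule.every(1).minutes.do(run_threaded, schedule_job_run.flow.run)

# Not shown in the original post; the `schedule` library needs a loop like this.
while True:
    schedule.run_pending()
    time.sleep(1)
```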
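The point about the `__main__` guard can be checked with plain Python, no Prefect required. This sketch writes a hypothetical `flow_file.py` whose `run()` stands in for `flow.run()`, imports it the way a scheduler script imports a flow file, and then executes it as a script: the guarded call only fires in the second case. So with the guard in place, importing the flow file from the scheduler script should not trigger an extra run by itself.

```python
import importlib.util
import runpy
import tempfile
from pathlib import Path

# Hypothetical stand-in for a flow file: run() plays the role of flow.run().
FLOW_FILE_SOURCE = """
RUNS = []

def run():
    RUNS.append("ran")

if __name__ == "__main__":
    run()
"""


def runs_when_imported(path: Path) -> list:
    """Load the file as a regular module, as a scheduler script would."""
    spec = importlib.util.spec_from_file_location("flow_file", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module.RUNS


def runs_when_executed(path: Path) -> list:
    """Execute the file as a script, like `python flow_file.py`."""
    module_globals = runpy.run_path(str(path), run_name="__main__")
    return module_globals["RUNS"]


with tempfile.TemporaryDirectory() as tmp:
    flow_path = Path(tmp) / "flow_file.py"
    flow_path.write_text(FLOW_FILE_SOURCE)
    imported_runs = runs_when_imported(flow_path)
    executed_runs = runs_when_executed(flow_path)

print(imported_runs)  # []
print(executed_runs)  # ['ran']
```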
a
@Priyab Dash you could use a Prefect schedule instead: https://docs.prefect.io/core/concepts/schedules.html
p
OK, just curious: do you see any issues with the scheduler used in the code?
a
No issues per se, but using the Prefect Cloud scheduler service would make it fault-tolerant and far more reliable. If you schedule this way, you don't take advantage of Prefect.
p
One other point: in our code, when we hit an exception in one of our tasks, we raise signals.SKIP:
```
from prefect.engine import signals

# Inside the task body; job_run and tjob_run are defined earlier in the task (not shown).
try:
    assert tjob_run["run_status"] == job_run["run_status"]
except AssertionError as e:
    raise signals.SKIP(f"Job Run Status Mismatch. {e}")
```
Do you have any idea what this will do to the whole flow DAG?
a
Why do you want to skip all exceptions? I wouldn't do that. What is your use case for Prefect, and what are you trying to do with it? In general, Prefect is a dataflow automation and workflow orchestration tool that lets you run your data workflows ad hoc and on a schedule, and lets you monitor the states of those workflows. It's designed to eliminate negative engineering: with Prefect you no longer need to anticipate every possible failure scenario, because if something goes wrong, Prefect will monitor it and give you visibility into what went wrong. So when you use Prefect, you shouldn't need try/except for this; that is what Prefect is for.
@Priyab Dash I would be interested to hear more about your use case to give a better answer
p
Well, I am maintaining code originally developed for an in-house scheduling tool.
We use only Prefect Core, for its tasks and flow control.
Maybe when it was originally developed we did not fully understand the implications of task orchestration.
a
I see. So if you wanted to replace that in-house scheduling tool, you could sign up for a free Prefect Cloud account and schedule your flows that way.
p
Unfortunately that's not in my hands; we are leveraging just the open-source community edition because of its Apache 2.0 license.
a
> maybe when it was originally developed we did not understand the implications of the task orchestration fully
You're definitely not alone here. Many Prefect customers transitioned to Prefect from a home-grown scheduling system; it's quite common. What could be helpful at this point is to visit this page https://docs.prefect.io/orchestration/getting-started/set-up.html and set up your orchestration layer.
p
Given that limitation, to clarify why we use SKIP: when we hit a specific type of error, we would like to skip just that single execution of that task. But from what I hear from you, SKIP is more like the Airflow skip for operators, where it skips all the subsequent tasks in the DAG.
a
Exactly! This is a good analogy. Raising SKIP causes the task to be skipped, and by default all downstream tasks are skipped as well, unless those tasks are configured with:
```
@task(skip_on_upstream_skip=False)
```
p
Thanks a lot, it's much clearer now.