Priyab Dash

    9 months ago
    We have a function defined as a task, as below:
    @task(log_stdout=True, state_handlers=[notify_run_failure])
    def submit_job_run_to_tmc(job_run):
    but it is being called twice when we run the flow.
    The flow is defined as below:
    with Flow("Schedule Submitted Jobs Runs on TMC", executor=LocalDaskExecutor()) as flow:
        # Find SUBMITTED Job Runs
        submitted_job_runs = fetch_submitted_job_runs()

        # Submit Job to TMC and update Status to SUBMITTED; handle errors
        update_job_run_succeeded.map(job_run=submit_job_run_to_tmc.map(job_run=submitted_job_runs))
    Anna Geller

    9 months ago
    @Priyab Dash do you have flow.run() in your flow file?
    @Priyab Dash to explain: if your flow file still calls flow.run() and you then register the flow, the entire flow will be executed twice every time. If you are on Prefect Cloud, there is a feature called Version Locking that enforces that your work runs once and only once. Here is how you can enable it: https://docs.prefect.io/orchestration/concepts/flows.html#toggle-version-locking
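Anna's point rests on plain Python semantics: code at module level runs every time the file is executed, while code under `if __name__ == "__main__":` runs only when the file is the main script. Below is a minimal stdlib sketch. It uses a hypothetical `run_flow()` in place of `flow.run()` and `runpy.run_path` to mimic another tool loading the file under a different module name. Whether a given registration tool loads the file this way is an assumption.

```python
import runpy
import tempfile
from pathlib import Path

# Hypothetical stand-in for a flow file; run_flow() plays the role of flow.run().
FLOW_FILE = """
RUNS = []


def run_flow(label):
    RUNS.append(label)


run_flow("module level")  # unguarded: runs every time the file is executed

if __name__ == "__main__":
    run_flow("main guard")  # guarded: runs only when the file is the main script
"""


def load(run_name):
    """Execute the file under the given module name and return the recorded runs."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "my_flow.py"
        path.write_text(FLOW_FILE)
        namespace = runpy.run_path(str(path), run_name=run_name)
    return namespace["RUNS"]


# Loaded by another tool under a non-__main__ name: only the unguarded call fires.
print(load("loaded_by_tool"))  # ['module level']
# Executed directly, e.g. `python my_flow.py`: both calls fire.
print(load("__main__"))  # ['module level', 'main guard']
```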
    Kevin Kho

    9 months ago
    Could you show me the code for your state handler? It could be that your state handler is what's running multiple times.
    Priyab Dash

    9 months ago
    Hi all, apologies for the delayed response.
    Code in the flow file:
    
    from prefect import Flow
    from prefect.executors import LocalDaskExecutor  # Prefect 1.x import path

    with Flow("Schedule Submitted Jobs Runs on TMC", executor=LocalDaskExecutor()) as flow:
        # Find SUBMITTED Job Runs
        submitted_job_runs = fetch_submitted_job_runs()

        # Submit Job to TMC and update Status to SUBMITTED; handle errors
        update_job_run_succeeded.map(job_run=submit_job_run_to_tmc.map(job_run=submitted_job_runs))

    if __name__ == "__main__":
        flow_state = flow.run()
    
    
    --------------------
    
    This is the state handler code:

    def notify_run_failure(task, old_state, new_state):
        # Only acts when the task transitions into a failed state
        if new_state.is_failed():
            logger.warning("Notification of failure to be added.")

        return new_state

    It is attached to the task as below:
    
    @task(log_stdout=True, state_handlers=[notify_run_failure])
    def submit_job_run_to_tmc(job_run):
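Kevin's hypothesis rests on the fact that a state handler is called on every state change of its task, not once per run. Below is a toy pure-Python model. The `State` class and the `run_task` runner are hypothetical stand-ins, not Prefect's classes, and the Pending -> Running -> final sequence simplifies Prefect's real transitions. It shows that a handler shaped like `notify_run_failure` above is called several times per run but acts only once per failure:

```python
from dataclasses import dataclass


@dataclass
class State:
    """Minimal stand-in for a Prefect-style state object (hypothetical)."""
    name: str

    def is_failed(self) -> bool:
        return self.name == "Failed"


notifications = []
handler_calls = 0


def notify_run_failure(task, old_state, new_state):
    """Same shape as the handler above: called on every transition,
    but only records a notification when the new state is a failure."""
    global handler_calls
    handler_calls += 1
    if new_state.is_failed():
        notifications.append(f"{task} failed")
    return new_state


def run_task(task, final_state_name):
    """Hypothetical runner: walk Pending -> Running -> final, calling the handler per change."""
    states = [State("Pending"), State("Running"), State(final_state_name)]
    for old, new in zip(states, states[1:]):
        notify_run_failure(task, old, new)


run_task("submit_job_run_to_tmc", "Success")
run_task("submit_job_run_to_tmc", "Failed")
print(handler_calls)   # 4: two transitions per run
print(notifications)   # ['submit_job_run_to_tmc failed']
```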
    Kevin Kho

    9 months ago
    No worries! Your state handler looks fine. It looks like the flow.run() call is what's causing things to run twice. I would try removing it and seeing if that helps.
    Priyab Dash

    9 months ago
    From what I understand,
    if __name__ == "__main__":
        flow_state = flow.run()
    will only run if the file is executed as the main script. We run our flow files like this, from a separate Python script:
    import threading

    import schedule

    def run_threaded(job_func):
        # Start each scheduled run in its own thread
        job_thread = threading.Thread(target=job_func)
        job_thread.start()

    schedule.every(1).minutes.do(run_threaded, schedule_job_run.flow.run)
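With `run_threaded`, a new thread starts on every tick whether or not the previous flow run has finished. If one run takes longer than the interval, two runs can overlap and, assuming each calls `fetch_submitted_job_runs()`, pick up the same SUBMITTED job runs. Whether that is happening here is an assumption worth checking. Below is a stdlib-only sketch of a hypothetical drop-in `run_threaded_once` that skips a tick while the previous run still holds a lock. It could be passed to `schedule.every(1).minutes.do(...)` in place of `run_threaded`:

```python
import threading
import time

# Hypothetical guard: held for the whole duration of a run.
_run_lock = threading.Lock()


def run_threaded_once(job_func):
    """Start job_func in a thread unless a previous run is still in progress.

    Returns the started Thread, or None if this tick was skipped."""
    if not _run_lock.acquire(blocking=False):
        return None  # previous run still holds the lock; skip this tick

    def wrapper():
        try:
            job_func()
        finally:
            _run_lock.release()  # a plain Lock may be released from another thread

    thread = threading.Thread(target=wrapper)
    thread.start()
    return thread


# Usage: a slow "flow run" and three ticks arriving faster than it finishes.
completed = []


def slow_flow_run():
    time.sleep(0.2)
    completed.append("run")


threads = [run_threaded_once(slow_flow_run) for _ in range(3)]
for t in threads:
    if t is not None:
        t.join()
print([t is not None for t in threads])  # [True, False, False]
print(completed)  # ['run']
```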
    Anna Geller

    9 months ago
    @Priyab Dash you could use a Prefect schedule instead: https://docs.prefect.io/core/concepts/schedules.html
    Priyab Dash

    9 months ago
    OK, just curious: do you see any issues with the scheduler used in this code?
    Anna Geller

    9 months ago
    No issues per se, but using the Prefect Cloud scheduler service would make it fault-tolerant and much more reliable. If you schedule this way, you don't take advantage of Prefect.
    Priyab Dash

    9 months ago
    One other point: in our code, when we hit an exception in one of our tasks, we raise signals.SKIP:
    from prefect.engine import signals

    try:
        assert tjob_run["run_status"] == job_run["run_status"]
    except AssertionError as e:
        raise signals.SKIP(f"Job Run Status Mismatch. {e}")
    Do you have any idea what this does to the rest of the flow DAG?
    Anna Geller

    9 months ago
    Why do you want to skip on all exceptions? I wouldn't do that. What is your use case for Prefect, i.e. what are you trying to do with it? In general, Prefect is a dataflow automation and workflow orchestration tool that lets you run your data workflows ad hoc and on a schedule, and lets you monitor the states of those workflows. It's designed to eliminate negative engineering: with Prefect you no longer need to anticipate every possible failure scenario, because if something goes wrong, Prefect will monitor it and give you visibility into what went wrong. So when you use Prefect, you shouldn't need try/except for this; that is what Prefect is for.
    @Priyab Dash I would be interested to hear more about your use case so I can give a better answer.
    Priyab Dash

    9 months ago
    Well, I am maintaining code originally developed for an in-house scheduling tool.
    We use only Prefect Core, for its tasks and flow control.
    Maybe when it was originally developed, we did not fully understand the implications of task orchestration.
    Anna Geller

    9 months ago
    I see. So if you want to replace that in-house scheduling tool, you could sign up for a free Prefect Cloud account and schedule your flows there.
    Priyab Dash

    9 months ago
    Unfortunately, that's not in my hands: we use only the open-source community edition because of its Apache 2.0 license.
    Anna Geller

    9 months ago
    "Maybe when it was originally developed we did not understand the implications of the task orchestration fully"
    You're definitely not alone here. Many Prefect customers have transitioned to Prefect from a home-grown scheduling system; it's quite common. What may help at this point is to visit this page https://docs.prefect.io/orchestration/getting-started/set-up.html and set up your orchestration layer.
    Priyab Dash

    9 months ago
    Given that limitation, to clarify why we use SKIP: when we face a specific type of error, we would like to skip that single execution of the task alone. But from what I hear from you, signals.SKIP is more like the Airflow skip for operators, where it skips all the subsequent tasks in the DAG.
    Anna Geller

    9 months ago
    Exactly, that's a good analogy. The SKIP signal causes the task to be skipped, and by default all downstream tasks are skipped as well, unless those tasks are configured with:
    @task(skip_on_upstream_skip=False)
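To make the default concrete, here is a toy pure-Python model of the rule Anna describes, for a linear chain of tasks. A task that raises SKIP is marked Skipped, and downstream tasks are skipped too unless their `skip_on_upstream_skip` flag is False. `SKIP`, `run_chain`, and the string states are hypothetical stand-ins, not Prefect's engine. The model also ignores mapping, triggers, and failures.

```python
class SKIP(Exception):
    """Hypothetical stand-in for prefect.engine.signals.SKIP."""


def run_chain(tasks):
    """tasks: list of (name, func, skip_on_upstream_skip) in execution order.

    Returns {name: state} following the default skip-propagation rule."""
    states = {}
    upstream_skipped = False
    for name, func, skip_on_upstream_skip in tasks:
        if upstream_skipped and skip_on_upstream_skip:
            states[name] = "Skipped"  # skip propagates downstream by default
            continue
        try:
            func()
            states[name] = "Success"
            upstream_skipped = False  # tasks below a successful task run normally
        except SKIP:
            states[name] = "Skipped"
            upstream_skipped = True
    return states


def check_status():
    raise SKIP("Job Run Status Mismatch.")


def noop():
    pass


print(run_chain([("check", check_status, True), ("update", noop, True), ("report", noop, True)]))
# {'check': 'Skipped', 'update': 'Skipped', 'report': 'Skipped'}
print(run_chain([("check", check_status, True), ("update", noop, True), ("report", noop, False)]))
# {'check': 'Skipped', 'update': 'Skipped', 'report': 'Success'}
```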
    Priyab Dash

    9 months ago
    Thanks a lot, it's much clearer now.