S K

    1 year ago
    Need help with this. This runs on a server as the backend. On an exception I am using "raise SystemExit(0)", but the flow does not quit immediately; it takes 15 minutes or so for the flow to fail. What is the issue?
    @task()
    def mainprocess(a):
        global df_single_record
        try:
            for indx in df_data_extracted.index + 1:
                df_single_record = df_data_extracted.iloc[indx - 1:indx]
                converttojson(df_single_record)
                postjsontoqapi(df_json_data)
        except Exception as e:
            logger = get_logger()
            logger.error(str(get_pst_time())
                     + '==========ERROR IN mainprocess() WHILE CONVERTING TO JSON/POSTING TO Q-API: '
                     + str(e))
            raise SystemExit(0)
    Michael Adkins

    1 year ago
    Hi @S K this is likely not captured by our standard exception handling because SystemExit is a reserved exception type: https://docs.python.org/3/library/exceptions.html#SystemExit
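A quick stdlib-only check illustrates why the handler in the task above never intercepts it: SystemExit derives from BaseException, not Exception, so an `except Exception` clause lets it propagate:

```python
# SystemExit sits outside the Exception branch of the hierarchy,
# so "except Exception" does not catch it.
print(issubclass(SystemExit, Exception))      # False
print(issubclass(SystemExit, BaseException))  # True

caught_by_except_exception = False
try:
    try:
        raise SystemExit(0)
    except Exception:
        caught_by_except_exception = True  # never runs
except SystemExit:
    # SystemExit propagates straight past the inner handler
    pass

print(caught_by_except_exception)  # False
```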
    S K

    1 year ago
    @Michael Adkins Thanks for the info. Can you please show me a snippet of how to fail the flow using prefect.engine.signals.FAIL?
    Michael Adkins

    1 year ago
    Ah sorry, you just import it and raise it like an exception (i.e. raise FAIL()): https://docs.prefect.io/core/concepts/execution.html#state-signals
    S K

    1 year ago
    @Michael Adkins After following the above, I am able to fail the task successfully, but the subsequent tasks still execute, which I don't want. How can I fail the complete flow if any of the tasks fails?
    Michael Adkins

    1 year ago
    By default, if a task fails, its downstream tasks should also fail. Did you set a custom trigger (https://docs.prefect.io/core/getting_started/next-steps.html#triggers), or are your tasks running in parallel?
    S K

    1 year ago
    No, I did not set any custom triggers; the tasks are not running in parallel, they run sequentially.
    with Flow('flow_name', storage=Local()) as flow:
        check_if_flow_is_running()
        getdata = readssmandextractdata()
        vmainprocess = mainprocess(getdata)
        updatecontroltable(vmainprocess)
    
    flow.run()
    @Michael Adkins This is how I am executing the tasks now:
    checkflow = check_if_flow_is_running()
    getdata = readssmandextractdata(checkflow)
    mainlogic = mainprocess(getdata)
    updatecontroltable(mainlogic)
    @Michael Adkins thx much, able to stop the flow by doing as above...
    Michael Adkins

    1 year ago
    Great!