# prefect-server
i
Hello, I have a bunch of tasks in my flow. When one of the tasks fails, I need to restart the previous task and the flow should continue from that point as if nothing happened in the flow. Is it possible?
k
Hey @Ismail Cenik, I think you may want to combine those steps into one task if they are linked together like that, so that the retry hits them both. But you can do this if you manually set the state of the task from SUCCESS to SCHEDULED using the GraphQL API.
i
Let me explain my case a little. I have a bunch of Amazon Kinesis Data Analytics (KDA) applications and 2 main tasks: startKDA and waitKDA (wait until the end of a job, then stop the KDA). I control many KDAs in a flow by calling those two tasks with parameters such as app_name. According to a new requirement, when a KDA takes too much time (timeout), I need to stop it and then trigger startKDA for that KDA again (which is the previous task in the flow for that KDA). If it succeeds this time, there should be no failure in the whole flow.
k
I think the easiest thing is to group them in a new task like:
from prefect import task

@task
def new_task(app_name):
    # run both steps inside one task so they succeed or fail together
    run_id = startKDA.run(app_name)
    waitKDA.run(run_id)
That way they fail together. You can then use the `boto3` timeout or the Prefect timeout to do:
@task(timeout=60)  # plus your retry settings (max_retries, retry_delay)
def new_task(app_name):
    run_id = startKDA.run(app_name)
    waitKDA.run(run_id)
and it would retry this block, right?
Otherwise you need a `client.graphql()` call to the `set_task_run_states` mutation: get the `task_run_id` of the `startKDA` task, set it to Scheduled (or Failed), and then restart the flow run. You can do this in the state handler of the `waitKDA` task: if `waitKDA` fails, use GraphQL to mark `startKDA` as failed also.
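For the grouping approach, here is a rough plain-Python model of what "retry this block" means. It is not Prefect's engine, only a sketch of the behavior: every retry runs the start step again before waiting. `run_pair_with_retries`, `fake_start`, and `fake_wait` are made-up names standing in for the retry mechanism and for `startKDA` / `waitKDA`.

```python
def run_pair_with_retries(start, wait, app_name, max_retries=2):
    """Run start then wait as one unit; on any failure, redo both steps."""
    attempts = max_retries + 1  # first try plus the retries
    for attempt in range(1, attempts + 1):
        try:
            run_id = start(app_name)  # always (re)start the app first
            return wait(run_id)       # then wait on the fresh run
        except Exception:
            if attempt == attempts:
                raise  # retries exhausted: surface the failure


# Hypothetical stand-ins for startKDA / waitKDA, for illustration only.
calls = []


def fake_start(app_name):
    calls.append("start")
    return f"{app_name}-run-{calls.count('start')}"


def fake_wait(run_id):
    calls.append("wait")
    if run_id.endswith("-1"):
        raise TimeoutError("first run timed out")
    return run_id


result = run_pair_with_retries(fake_start, fake_wait, "my-app")
```

Because the two steps live in one unit, the timeout in the first wait causes a second start, which is the behavior the requirement asks for.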
i
I have 13 KDAs, and some of them run in parallel, so I do not want to fail the whole flow because of one KDA. Your first recommendation seems more suitable for our case.
k
Ah I see. You don’t need to fail the whole Flow though. This can be done in the task-level state handler (with a bit more work)
i
Yes. If you have sample code for the task-level state handler, I can look it over as well.
k
Not complete, but to give you an idea:
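from prefect import Client

def my_state_handler(task, old_state, new_state):
    if new_state.is_failed():
        client = Client()

        # get the task run id of the upstream task (query left out here)
        task_id = client.graphql()
        task_id = task_id[find_the_results_here]

        # using the task run id, call set_task_run_states to mark the previous task as failed
        client.graphql("""mutation {set_task_run_state{flow_id}}...""")

    # always hand the (possibly unchanged) state back to Prefect
    return new_state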
and then attach this to the `waitKDA` task. You need two calls and some filtering to get the specific task id of the upstream `startKDA`.
i
@task(timeout=60)  # plus your retry settings (max_retries, retry_delay)
def new_task(app_name):
    run_id = startKDA.run(app_name)
    waitKDA.run(run_id)
Let's assume that I will use the above solution. I have 2 different flows that include different KDAs. When a timeout occurs, one of my flows should continue with the next step, while the other should retry the task. How can I specify these actions in the task? One more question: can I provide a dynamic timeout value to the task? I have a different threshold value for each KDA. Will that cause a problem with flow registration or any other issue?
k
For one flow, you would just not set retries, and you would set them for the other. Timeouts can't be dynamic in the sense that a Prefect Parameter will change the value of the task timeout, but from what I know, maybe you can create the boto3 client inside a task and use a parameter to configure a timeout?
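Another way to get a per-app threshold, if the wait is a polling loop, is to enforce the deadline inside the task body and pass the threshold in as a normal argument, so it can come from a Parameter at run time. This is only a sketch under that assumption: `wait_until_done` and `is_done` are hypothetical names, and `is_done` stands in for whatever status check the real task performs.

```python
import time


def wait_until_done(is_done, timeout_s, poll_s=1.0,
                    clock=time.monotonic, sleep=time.sleep):
    """Poll is_done() until it returns True or timeout_s seconds elapse."""
    deadline = clock() + timeout_s
    while not is_done():
        if clock() >= deadline:
            # The caller decides what a timeout means (retry, fail, move on).
            raise TimeoutError(f"not finished after {timeout_s} s")
        sleep(poll_s)
    return True


# Illustration: a fake status check that reports "done" on the third poll.
polls = iter([False, False, True])
finished = wait_until_done(lambda: next(polls), timeout_s=5, poll_s=0.01)
```

Since `timeout_s` is just a function argument, each KDA can get its own value without touching the decorator, so nothing about it is fixed at registration time.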
i
So I think it is better to write a separate task for each KDA, moving the common code into a shared function that those new tasks call, and giving each task its own static timeout value. Is the following definition correct for a 1-hour timeout?
@task(log_stdout=True, timeout=3600, max_retries=2, retry_delay=timedelta(seconds=60))
One more thing: when a timeout occurs, before retrying or going to the next step, I want to call the boto3 Stop KDA (force = true), since the KDA is stuck and cannot stop otherwise. So is it possible to run some code before the retry?
@task(log_stdout=True, timeout=3600, max_retries=2, retry_delay=timedelta(seconds=60), state_handlers=[my_state_handler])
So I can handle it inside my_state_handler, right?
k
Yes, you can do that in the state handler.
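A minimal sketch of such a handler, assuming one task per KDA so the app name can be baked in when the handler is built. `make_force_stop_handler` and `stop_app` are hypothetical names; in the real flow `stop_app` would wrap the boto3 stop call with force enabled. As far as I know a timeout shows up as a failed state, so the check reuses `is_failed()`, which also means the app is stopped on other failures. `FakeState` exists only so the sketch runs without Prefect.

```python
def make_force_stop_handler(app_name, stop_app):
    """Build a state handler that stops app_name whenever the task fails."""
    def handler(task, old_state, new_state):
        # Assumption: a timed-out task reports is_failed() == True,
        # and stopping the app on any other failure is acceptable too.
        if new_state.is_failed():
            stop_app(app_name)
        return new_state  # always hand the state back unchanged
    return handler


class FakeState:
    """Stand-in for a Prefect state object, for illustration only."""
    def __init__(self, failed):
        self._failed = failed

    def is_failed(self):
        return self._failed


stopped = []
handler = make_force_stop_handler("my-app", stopped.append)
handler(None, FakeState(False), FakeState(True))   # failure: app is stopped
handler(None, FakeState(False), FakeState(False))  # success: nothing happens
```

You would then pass `state_handlers=[make_force_stop_handler("my-app", stop_app)]` in the task decorator for that KDA.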
i
Hello, there is a small change in our requirement. My task looks like this:
@task(log_stdout=True, max_retries=5, retry_delay=timedelta(seconds=60))
def waitPlanDataLoadingTask(app_name, logId, parentLogId, envName):
I use a Python timeout; assume the timeout occurs after 10 minutes. Then I need to fail the flow, but when I use "raise signals.FAIL("Fail the flow")", the flow recovers because of the retry mechanism. So how can I bypass the retry mechanism without removing it? Should I use GraphQL to make the flow fail? Could you please send a code example for this case?
k
`raise signals.ENDRUN` fails it without respecting the retries. You need something like `raise signals.ENDRUN(Failed())`.