# ask-community
**Argemiro Neto:**
Hey, team! I have a `task` being mapped with the following signature:

```python
@task(max_retries=3, retry_delay=timedelta(seconds=5))
def run_sync_loads(config: dict) -> dict:
```
The `task`'s results are being sent to the following `task`:

```python
@task(max_retries=3, retry_delay=timedelta(seconds=5), trigger=triggers.always_run)
def save_model_config(loads: list, configs: list):
```
Both are reference tasks of the `flow`. The problem is that one of the mapped tasks is failing but not being retried, and I get no indication at the end of the flow that any tasks failed. To be precise, the logs show that one retry is being called, but the task should fail again. Instead of failing, it even completes. Any ideas on why this is happening?
**Chris White:**
> To be precise, the logs show that one retry is being called but the task should fail again. It's not failing, even completing.

Could you share some more information here, maybe a copy/paste of the logs?
If you are running this locally using `flow.run`, note that retries won't occur until all tasks have had a chance to run through their pipeline checks (this is in contrast to Cloud execution, which is asynchronous).
**Argemiro Neto:**
Sure...

```
[2019-10-21 22:45:58,594] INFO - prefect.TaskRunner | Task 'run_sync_loads': Starting task run...
[2019-10-21 22:45:58,616] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:46:28,608] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': finished task run for task with final state: 'Retrying'
[2019-10-21 22:46:28,629] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:48:45,075] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:48:45,100] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': Starting task run...
[2019-10-21 22:49:04,185] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,197] INFO - prefect.TaskRunner | Task 'run_sync_loads': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,225] INFO - prefect.TaskRunner | Task 'save_model_config': Starting task run...
[2019-10-21 22:49:04,234] INFO - prefect.TaskRunner | Task 'save_model_config': finished task run for task with final state: 'Pending'
[2019-10-21 22:49:04,241] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2019-10-21 22:49:04,243] INFO - prefect.FlowRunner | Beginning Flow run for 'Data Platform Sync Control'
[2019-10-21 22:49:04,278] INFO - prefect.TaskRunner | Task 'set_sync_interval': Starting task run...
[2019-10-21 22:49:04,300] INFO - prefect.TaskRunner | Task 'set_sync_interval[0]': Starting task run...
[2019-10-21 22:49:04,311] INFO - prefect.TaskRunner | Task 'set_sync_interval[0]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,336] INFO - prefect.TaskRunner | Task 'set_sync_interval[1]': Starting task run...
[2019-10-21 22:49:04,347] INFO - prefect.TaskRunner | Task 'set_sync_interval[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,387] INFO - prefect.TaskRunner | Task 'set_sync_interval': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,415] INFO - prefect.TaskRunner | Task 'run_sync_loads': Starting task run...
[2019-10-21 22:49:04,438] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:49:04,452] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,487] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:49:04,506] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,565] INFO - prefect.TaskRunner | Task 'run_sync_loads': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,601] INFO - prefect.TaskRunner | Task 'save_model_config': Starting task run...
[2019-10-21 22:49:53,704] INFO - prefect.TaskRunner | Task 'save_model_config': finished task run for task with final state: 'Success'
[2019-10-21 22:49:53,706] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
I have some breakpoints in the code; that's why I know it's not retrying. Even though the logs show `SUCCESS`, the task should fail every time.
**Chris White:**
Just as an FYI, you might be able to gain some insight if you set your logging level to `"DEBUG"`, either via environment variable:

```shell
export PREFECT__LOGGING__LEVEL="DEBUG"
```

or in your user config file.
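(For reference: in Prefect Core of this era, environment variables of the form `PREFECT__SECTION__KEY` map onto sections of the user config file, which lives at `~/.prefect/config.toml` by default. The equivalent config-file entry should look like:

```toml
# ~/.prefect/config.toml
[logging]
level = "DEBUG"
```

)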
**Argemiro Neto:**
And I'm logging the error from the function being called as well. There is only one log entry.
**Chris White:**
I'm not sure; this honestly looks normal to me -> it appears your task does in fact retry, and the second time it runs it succeeds.
wait hang on
I apologize, I do see something weird -> it looks like on your second run only two `run_sync_loads` are run, whereas the first time there were 3.
**Argemiro Neto:**
Sorry, do not consider the `[2]` task; I removed it from the logs to decrease the text size.
**Chris White:**
Gotcha -> why are you so sure the task will fail when it is retried? The logs seem to tell a normal story.
**Argemiro Neto:**
I intentionally added a bug to the `[1]` task to check the flow's behavior when one of the tasks fails.
**Chris White:**
The debug logs will help as well, because they will tell us what state the task began in.
I just tried recreating with this flow:

```python
from prefect import task, Flow, Parameter
from datetime import timedelta

nums = Parameter("nums")

@task(max_retries=2, retry_delay=timedelta(seconds=5))
def div(x):
    return 1 / x

with Flow("test") as flow:
    div.map(nums)

flow.run(nums=[0, 1, 2])
```
and my flow `Failed` after 2 retries, as I would expect.
**Argemiro Neto:**
And the other two executions were successful, right?
**Chris White:**
Not sure what you mean - I only ran the flow one time. But yes, the other two `div` tasks were successful; just not the first one.
**Argemiro Neto:**
OK, I am running again with the `DEBUG` flag...
This task is calling a Lambda function. I just checked that the function was called only once (with an error).
**Chris White:**
An AWS Lambda function, or a Python `lambda` function?
**Argemiro Neto:**
AWS Lambda. I'm not using the Lambda task; I'm calling it using `boto3`.
**Chris White:**
Oh, this sounds like it might be a problem with your AWS code then; I would bet that if you put a `print` statement or a `logger` statement both above and below your `boto3` call, you will see them each time the task runs.
**Argemiro Neto:**
I checked CloudWatch. The function was called only 3 times: 2 successful runs and 1 failure. It should have been called at least 4 times if it passed on the second attempt.
**Chris White:**
Right, so I'm guessing your `boto3` code has some baked-in caching or something -> are you sure you aren't providing some input as a unique identifier to the Lambda call, preventing AWS from triggering it again? You should also put print / logger statements above and below the `boto3` call; that would prove this.
**Argemiro Neto:**
I'll check the code again to make sure the issue is isolated.
**Chris White:**
OK cool, let me know!
**Argemiro Neto:**
Found the problem in my retry code. Thank you, @Chris White!
**Chris White:**
No problem, @Argemiro Neto! Glad you figured it out.