
Argemiro Neto

10/22/2019, 12:05 AM
Hey, team! I have a `task` being mapped with the following signature:
@task(max_retries=3, retry_delay=timedelta(seconds=5))
def run_sync_loads(config: dict) -> dict:
The `task`'s results are being sent to the following `task`:
@task(max_retries=3, retry_delay=timedelta(seconds=5), trigger=triggers.always_run)
def save_model_config(loads: list, configs: list):
Both are reference tasks of the `flow`. The problem is that one of the mapped tasks is failing but not being retried, and at the end of the flow there is no indication that any task failed. To be precise, the logs show that one retry is being called, but the task should fail again. Instead of failing, it's even completing. Any ideas on why this is happening?

Chris White

10/22/2019, 12:11 AM
To be precise, the logs show that one retry is being called, but the task should fail again. Instead of failing, it's even completing.
could you share some more information here, maybe a copy / paste of the logs?
If you are running this locally using `flow.run`, note that retries won't occur until all tasks have had a chance to run through their pipeline checks (this is in contrast to Cloud execution, which is asynchronous)

Argemiro Neto

10/22/2019, 12:28 AM
sure...
[2019-10-21 22:45:58,594] INFO - prefect.TaskRunner | Task 'run_sync_loads': Starting task run...
[2019-10-21 22:45:58,616] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:46:28,608] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': finished task run for task with final state: 'Retrying'
[2019-10-21 22:46:28,629] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:48:45,075] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:48:45,100] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': Starting task run...
[2019-10-21 22:49:04,185] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,197] INFO - prefect.TaskRunner | Task 'run_sync_loads': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,225] INFO - prefect.TaskRunner | Task 'save_model_config': Starting task run...
[2019-10-21 22:49:04,234] INFO - prefect.TaskRunner | Task 'save_model_config': finished task run for task with final state: 'Pending'
[2019-10-21 22:49:04,241] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2019-10-21 22:49:04,243] INFO - prefect.FlowRunner | Beginning Flow run for 'Data Platform Sync Control'
[2019-10-21 22:49:04,278] INFO - prefect.TaskRunner | Task 'set_sync_interval': Starting task run...
[2019-10-21 22:49:04,300] INFO - prefect.TaskRunner | Task 'set_sync_interval[0]': Starting task run...
[2019-10-21 22:49:04,311] INFO - prefect.TaskRunner | Task 'set_sync_interval[0]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,336] INFO - prefect.TaskRunner | Task 'set_sync_interval[1]': Starting task run...
[2019-10-21 22:49:04,347] INFO - prefect.TaskRunner | Task 'set_sync_interval[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,387] INFO - prefect.TaskRunner | Task 'set_sync_interval': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,415] INFO - prefect.TaskRunner | Task 'run_sync_loads': Starting task run...
[2019-10-21 22:49:04,438] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:49:04,452] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,487] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:49:04,506] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,565] INFO - prefect.TaskRunner | Task 'run_sync_loads': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,601] INFO - prefect.TaskRunner | Task 'save_model_config': Starting task run...
[2019-10-21 22:49:53,704] INFO - prefect.TaskRunner | Task 'save_model_config': finished task run for task with final state: 'Success'
[2019-10-21 22:49:53,706] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
I have some breakpoints in the code. That's why I know it's not retrying. Even though the logs show SUCCESS, the task should fail every time

Chris White

10/22/2019, 12:30 AM
just as an FYI you might be able to gain some insight if you set your logging level to "DEBUG" either via environment variable:
export PREFECT__LOGGING__LEVEL="DEBUG"
or in your user config file
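for reference, a sketch of the equivalent user-config setting, assuming the default config location ~/.prefect/config.toml:
# ~/.prefect/config.toml
[logging]
level = "DEBUG"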
👍 1

Argemiro Neto

10/22/2019, 12:31 AM
and I'm logging the error from the function being called as well. There is only one log

Chris White

10/22/2019, 12:32 AM
I’m not sure, this honestly looks normal to me -> it appears your task does in fact retry and the second time it runs it succeeds
wait hang on
I apologize, I do see something weird -> it looks like on your second run only two `run_sync_loads` are run, whereas the first time there were 3

Argemiro Neto

10/22/2019, 12:35 AM
sorry, do not consider the [2]. I removed it from the logs to decrease the text size
👍 1

Chris White

10/22/2019, 12:36 AM
gotcha --> why are you so sure the task will fail when it is retried? The logs seem to tell a normal story

Argemiro Neto

10/22/2019, 12:36 AM
I intentionally added a bug to the [1] task to check the flow's behavior when one of the tasks fails

Chris White

10/22/2019, 12:36 AM
The debug logs will help as well, because they will tell us what state the task began in
I just tried recreating with this flow:
from prefect import task, Flow, Parameter
from datetime import timedelta
nums = Parameter("nums")
@task(max_retries=2, retry_delay=timedelta(seconds=5))
def div(x):
    return 1 / x
with Flow("test") as flow:
    div.map(nums)
flow.run(nums=[0, 1, 2])
and my flow `Failed` after 2 retries, as I would expect

Argemiro Neto

10/22/2019, 12:41 AM
and the other two executions were successful, right?

Chris White

10/22/2019, 12:43 AM
not sure what you mean - I only ran the flow one time
but the other two `div` tasks were successful, yea, but not the first one

Argemiro Neto

10/22/2019, 12:45 AM
ok, I am running again with the `DEBUG` flag...
👍 1
this task is calling a lambda function. I just checked that the function was called only once (with error)

Chris White

10/22/2019, 12:46 AM
an AWS lambda function?
or a python lambda function?

Argemiro Neto

10/22/2019, 12:47 AM
AWS Lambda. I'm not using the Lambda task; I'm calling it using `boto3`

Chris White

10/22/2019, 12:48 AM
oh this sounds like it might be a problem with your AWS code then; I would bet that if you put a `print` statement or a `logger` statement both above and below your `boto3` call, you will see them each time the task runs

Argemiro Neto

10/22/2019, 12:52 AM
I checked CloudWatch. The function was called only 3 times: 2 successful runs and 1 failure. It should have been called at least 4 times if it had passed on the second try

Chris White

10/22/2019, 12:53 AM
right, so I'm guessing your `boto3` code has some baked-in caching or something -> are you sure you aren't providing some input as a unique identifier to the Lambda call, preventing AWS from triggering it again? You should also put print / logger statements above and below the `boto3` call; that would prove this
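A rough sketch of that instrumentation around the `boto3` call (the function name, payload, and helper below are made up for illustration; only the idea of printing / logging on both sides of the invoke comes from this suggestion):
import json
import boto3

lambda_client = boto3.client("lambda")

def invoke_load(payload: dict) -> dict:
    print("invoking lambda...")                               # should appear on every task run / retry
    response = lambda_client.invoke(
        FunctionName="sync-load",                             # assumed function name
        InvocationType="RequestResponse",
        Payload=json.dumps(payload),
    )
    print("lambda returned status", response["StatusCode"])   # should also appear every time
    body = json.loads(response["Payload"].read())
    if response.get("FunctionError"):
        # raise so Prefect marks the task run as failed and the retry logic kicks in
        raise RuntimeError(f"lambda invocation failed: {body}")
    return body
If both prints show up on every retry but CloudWatch still records only one invocation, that would point at caching or an early return in the surrounding code rather than at Prefect's retry logic.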

Argemiro Neto

10/22/2019, 4:41 PM
I'll check the code again to make sure the issue is isolated

Chris White

10/22/2019, 4:41 PM
ok cool let me know!
👍 1

Argemiro Neto

10/22/2019, 6:23 PM
Found the problem in my retry code. Thank you @Chris White
💯 1

Chris White

10/22/2019, 6:23 PM
no problem @Argemiro Neto! glad you figured it out
👍 1