Argemiro Neto
10/22/2019, 12:05 AM
`task` being mapped with the following signature:
@task(max_retries=3, retry_delay=timedelta(seconds=5))
def run_sync_loads(config: dict) -> dict:
The `task`'s results are being sent to the following `task`:
@task(max_retries=3, retry_delay=timedelta(seconds=5), trigger=triggers.always_run)
def save_model_config(loads: list, configs: list):
Both are reference tasks of the flow. The problem is that one of the mapped tasks is failing but not being retried, and at the end of the flow I have no indication that any task failed. To be precise, the logs show that one retry is triggered, and on that retry the task should fail again. Instead, it does not fail and even completes. Any ideas on why this is happening?
Chris White
"To be precise, the logs show that one retry is being called but the task should fail again. It's not failing, even completing."
Could you share some more information here, maybe a copy / paste of the logs?
Chris White
`flow.run`, note that retries won’t occur until all tasks have had a chance to run through their pipeline checks (this is in contrast to Cloud execution, which is asynchronous)
Argemiro Neto
10/22/2019, 12:28 AM
Argemiro Neto
10/22/2019, 12:29 AM
[2019-10-21 22:45:58,594] INFO - prefect.TaskRunner | Task 'run_sync_loads': Starting task run...
[2019-10-21 22:45:58,616] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:46:28,608] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': finished task run for task with final state: 'Retrying'
[2019-10-21 22:46:28,629] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:48:45,075] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:48:45,100] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': Starting task run...
[2019-10-21 22:49:04,185] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,197] INFO - prefect.TaskRunner | Task 'run_sync_loads': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,225] INFO - prefect.TaskRunner | Task 'save_model_config': Starting task run...
[2019-10-21 22:49:04,234] INFO - prefect.TaskRunner | Task 'save_model_config': finished task run for task with final state: 'Pending'
[2019-10-21 22:49:04,241] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2019-10-21 22:49:04,243] INFO - prefect.FlowRunner | Beginning Flow run for 'Data Platform Sync Control'
[2019-10-21 22:49:04,278] INFO - prefect.TaskRunner | Task 'set_sync_interval': Starting task run...
[2019-10-21 22:49:04,300] INFO - prefect.TaskRunner | Task 'set_sync_interval[0]': Starting task run...
[2019-10-21 22:49:04,311] INFO - prefect.TaskRunner | Task 'set_sync_interval[0]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,336] INFO - prefect.TaskRunner | Task 'set_sync_interval[1]': Starting task run...
[2019-10-21 22:49:04,347] INFO - prefect.TaskRunner | Task 'set_sync_interval[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,387] INFO - prefect.TaskRunner | Task 'set_sync_interval': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,415] INFO - prefect.TaskRunner | Task 'run_sync_loads': Starting task run...
[2019-10-21 22:49:04,438] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:49:04,452] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,487] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:49:04,506] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,565] INFO - prefect.TaskRunner | Task 'run_sync_loads': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,601] INFO - prefect.TaskRunner | Task 'save_model_config': Starting task run...
[2019-10-21 22:49:53,704] INFO - prefect.TaskRunner | Task 'save_model_config': finished task run for task with final state: 'Success'
[2019-10-21 22:49:53,706] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
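One way to read a log like this is to count how many mapped children of a task were started in each pass through the flow. The sketch below is only an illustration using a few lines copied from the log above; the helper name `children_per_pass` and the regular expression are assumptions made for this example, not part of Prefect.

```python
import re

# A few lines copied from the log above: child starts for run_sync_loads,
# plus the line that marks the beginning of the second pass.
LOG = """\
[2019-10-21 22:45:58,616] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:46:28,629] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:48:45,100] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': Starting task run...
[2019-10-21 22:49:04,243] INFO - prefect.FlowRunner | Beginning Flow run for 'Data Platform Sync Control'
[2019-10-21 22:49:04,438] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:49:04,487] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
"""

# Matches the start of a mapped child such as "Task 'run_sync_loads[0]'".
CHILD_START = re.compile(r"Task '(?P<name>\w+)\[(?P<index>\d+)\]': Starting task run")


def children_per_pass(log_text: str, task_name: str) -> list:
    """Count mapped children of task_name started in each pass of the flow."""
    counts = [0]
    for line in log_text.splitlines():
        if "Beginning Flow run" in line:
            counts.append(0)  # a new pass through the flow starts here
            continue
        match = CHILD_START.search(line)
        if match and match.group("name") == task_name:
            counts[-1] += 1
    return counts


print(children_per_pass(LOG, "run_sync_loads"))  # prints [3, 2]
```

On this excerpt, three children start in the first pass and two in the second.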
Argemiro Neto
10/22/2019, 12:30 AM
`SUCCESS`, the task should fail every time
Chris White
"DEBUG" either via environment variable:
export PREFECT__LOGGING__LEVEL="DEBUG"
or in your user config file
Argemiro Neto
10/22/2019, 12:31 AM
Chris White
`run_sync_loads` are run, whereas the first time there were 3
Argemiro Neto
10/22/2019, 12:35 AM
Chris White
Argemiro Neto
10/22/2019, 12:36 AM
Chris White
from prefect import task, Flow, Parameter
from datetime import timedelta

nums = Parameter("nums")

@task(max_retries=2, retry_delay=timedelta(seconds=5))
def div(x):
    return 1 / x  # raises ZeroDivisionError for x == 0

with Flow("test") as flow:
    div.map(nums)

flow.run(nums=[0, 1, 2])
and my flow `Failed` after 2 retries, as I would expect
Argemiro Neto
10/22/2019, 12:41 AM
Chris White
`div` tasks were successful, yea, but not the first one
Argemiro Neto
10/22/2019, 12:45 AM
`DEBUG` flag...
Argemiro Neto
10/22/2019, 12:46 AM
Chris White
Argemiro Neto
10/22/2019, 12:47 AM
`boto3`
Chris White
`print` statement or a `logger` statement both above and below your `boto3` call, you will see them each time the task runs
Argemiro Neto
10/22/2019, 12:52 AM
`CloudWatch`. The function was called only 3 times: 2 successful runs and 1 failure. It should have been called at least 4 times if it passed on the second time
Chris White
`boto3` code has some baked-in caching or something -> are you sure you aren’t providing some input as a unique identifier to the Lambda call, preventing AWS from triggering it again? You should also put print / logger statements above and below the `boto3` call; that would prove this
Argemiro Neto
10/22/2019, 4:41 PM
Chris White
Argemiro Neto
10/22/2019, 6:23 PM
Chris White
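For reference, the suggestion earlier in the thread to put print / logger statements above and below the `boto3` call can be sketched roughly as follows. This is an illustration and not code from the thread. `invoke_lambda` is a hypothetical stand-in for the real `boto3` call, the logger name is made up, and the Prefect `@task` decorator is left out so the sketch runs with the standard library alone.

```python
import logging

logger = logging.getLogger("sync_loads")  # hypothetical logger name


def invoke_lambda(config: dict) -> dict:
    """Hypothetical stand-in for the real boto3 Lambda invocation."""
    return {"status": "ok", "config": config}


def run_sync_loads(config: dict) -> dict:
    # Emitted on every attempt (including retries), before the remote call.
    logger.info("run_sync_loads: about to invoke Lambda with %s", config)
    result = invoke_lambda(config)
    # Only reached if the call above returned without raising.
    logger.info("run_sync_loads: Lambda call returned %s", result)
    return result


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run_sync_loads({"model": "example"})
```

If the task body really runs again on a retry, both messages (or at least the first one, if the call raises) should appear once per attempt, which can then be compared with the number of invocations recorded on the AWS side.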