Argemiro Neto
10/22/2019, 12:05 AM
`task` being mapped with the following signature:
@task(max_retries=3, retry_delay=timedelta(seconds=5))
def run_sync_loads(config: dict) -> dict:
The `task`'s results are being sent to the following `task`:
@task(max_retries=3, retry_delay=timedelta(seconds=5), trigger=triggers.always_run)
def save_model_config(loads: list, configs: list):
Both are reference tasks to the flow. The problem is that one of the mapped tasks is failing but not being retried and I have no indication that some tasks failed at the end of the flow. To be precise, the logs show that one retry is being called but the task should fail again. It's not failing, even completing. Any ideas on why this is happening?
Chris White
10/22/2019, 12:11 AM
"To be precise, the logs show that one retry is being called but the task should fail again. It’s not failing, even completing."
could you share some more information here, maybe a copy / paste of the logs?
`flow.run`, note that retries won’t occur until all tasks have had a chance to run through their pipeline checks (this is in contrast to Cloud execution, which is asynchronous)
Argemiro Neto
10/22/2019, 12:28 AM
[2019-10-21 22:45:58,594] INFO - prefect.TaskRunner | Task 'run_sync_loads': Starting task run...
[2019-10-21 22:45:58,616] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:46:28,608] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': finished task run for task with final state: 'Retrying'
[2019-10-21 22:46:28,629] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:48:45,075] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:48:45,100] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': Starting task run...
[2019-10-21 22:49:04,185] INFO - prefect.TaskRunner | Task 'run_sync_loads[2]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,197] INFO - prefect.TaskRunner | Task 'run_sync_loads': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,225] INFO - prefect.TaskRunner | Task 'save_model_config': Starting task run...
[2019-10-21 22:49:04,234] INFO - prefect.TaskRunner | Task 'save_model_config': finished task run for task with final state: 'Pending'
[2019-10-21 22:49:04,241] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2019-10-21 22:49:04,243] INFO - prefect.FlowRunner | Beginning Flow run for 'Data Platform Sync Control'
[2019-10-21 22:49:04,278] INFO - prefect.TaskRunner | Task 'set_sync_interval': Starting task run...
[2019-10-21 22:49:04,300] INFO - prefect.TaskRunner | Task 'set_sync_interval[0]': Starting task run...
[2019-10-21 22:49:04,311] INFO - prefect.TaskRunner | Task 'set_sync_interval[0]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,336] INFO - prefect.TaskRunner | Task 'set_sync_interval[1]': Starting task run...
[2019-10-21 22:49:04,347] INFO - prefect.TaskRunner | Task 'set_sync_interval[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,387] INFO - prefect.TaskRunner | Task 'set_sync_interval': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,415] INFO - prefect.TaskRunner | Task 'run_sync_loads': Starting task run...
[2019-10-21 22:49:04,438] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': Starting task run...
[2019-10-21 22:49:04,452] INFO - prefect.TaskRunner | Task 'run_sync_loads[0]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,487] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': Starting task run...
[2019-10-21 22:49:04,506] INFO - prefect.TaskRunner | Task 'run_sync_loads[1]': finished task run for task with final state: 'Success'
[2019-10-21 22:49:04,565] INFO - prefect.TaskRunner | Task 'run_sync_loads': finished task run for task with final state: 'Mapped'
[2019-10-21 22:49:04,601] INFO - prefect.TaskRunner | Task 'save_model_config': Starting task run...
[2019-10-21 22:49:53,704] INFO - prefect.TaskRunner | Task 'save_model_config': finished task run for task with final state: 'Success'
[2019-10-21 22:49:53,706] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
`SUCCESS`, the task should fail every time
Chris White
10/22/2019, 12:30 AM
"DEBUG" either via environment variable:
export PREFECT__LOGGING__LEVEL="DEBUG"
or in your user config file
Argemiro Neto
10/22/2019, 12:31 AM
Chris White
10/22/2019, 12:32 AM
`run_sync_loads` are run, whereas the first time there were 3
Argemiro Neto
10/22/2019, 12:35 AM
Chris White
10/22/2019, 12:36 AM
Argemiro Neto
10/22/2019, 12:36 AM
Chris White
10/22/2019, 12:36 AM
from prefect import task, Flow, Parameter
from datetime import timedelta

nums = Parameter("nums")

@task(max_retries=2, retry_delay=timedelta(seconds=5))
def div(x):
    # x = 0 raises ZeroDivisionError, so the first mapped child always fails
    return 1 / x

with Flow("test") as flow:
    div.map(nums)

flow.run(nums=[0, 1, 2])
and my flow Failed after 2 retries, as I would expect
Argemiro Neto
10/22/2019, 12:41 AM
Chris White
10/22/2019, 12:43 AM
`div` tasks were successful, yea, but not the first one
Argemiro Neto
10/22/2019, 12:45 AM
`DEBUG` flag...
Chris White
10/22/2019, 12:46 AM
Argemiro Neto
10/22/2019, 12:47 AM
`boto3`
Chris White
10/22/2019, 12:48 AM
`print` statement or a `logger` statement both above and below your `boto3` call, you will see them each time the task runs
Argemiro Neto
10/22/2019, 12:52 AM
CloudWatch. The function was called only 3 times: 2 successful runs and 1 failure. It should have been called at least 4 times if it passed on the second attempt.
Chris White
10/22/2019, 12:53 AM
`boto3` code has some baked-in caching or something -> are you sure you aren’t providing some input as a unique identifier to the Lambda call, preventing AWS from triggering it again? You should also put print / logger statements above and below the `boto3` call; that would prove this
Argemiro Neto
10/22/2019, 4:41 PM
Chris White
10/22/2019, 4:41 PM
Argemiro Neto
10/22/2019, 6:23 PM
Chris White
10/22/2019, 6:23 PM
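
The print / logger suggestion from earlier in the thread can be tried without Prefect or AWS. What follows is only a minimal sketch under stated assumptions: `invoke_remote` is a hypothetical stand-in for the boto3 Lambda call, and `run_with_retries` is a hand-rolled loop that retries immediately (unlike a local `flow.run`, which, per the thread, retries only after the other tasks have run). Neither name comes from the thread or from Prefect. The sketch logs on both sides of the call and counts attempts, so the "at least 4 calls" arithmetic for three mapped inputs with one transient failure can be checked directly.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("retry_sketch")

attempts = {}  # input name -> number of times the stand-in call was attempted


def invoke_remote(config: dict) -> dict:
    """Hypothetical stand-in for the boto3 Lambda call (not a real API)."""
    name = config["name"]
    attempts[name] = attempts.get(name, 0) + 1
    # Assumption for illustration: input "a" fails on its first attempt only.
    if name == "a" and attempts[name] == 1:
        raise RuntimeError("simulated remote failure")
    return {"name": name, "status": "ok"}


def run_sync_loads(config: dict) -> dict:
    # Log above and below the remote call so every attempt is visible.
    logger.info("before remote call: %s", config["name"])
    result = invoke_remote(config)
    logger.info("after remote call: %s", config["name"])
    return result


def run_with_retries(func, arg, max_retries: int):
    """Call func(arg) up to 1 + max_retries times, re-raising the last error."""
    last_exc = None
    for attempt in range(1 + max_retries):
        try:
            return func(arg)
        except Exception as exc:
            last_exc = exc
            logger.warning("attempt %d failed: %s", attempt + 1, exc)
    raise last_exc


configs = [{"name": "a"}, {"name": "b"}, {"name": "c"}]
results = [run_with_retries(run_sync_loads, c, max_retries=3) for c in configs]
total_calls = sum(attempts.values())
logger.info("total remote calls: %d", total_calls)
```

If the "after remote call" line never appears for a failed attempt and the total stays at 3, the retry is not reaching the remote call, which is the situation described in the thread.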