Lukas N.
06/08/2021, 5:03 PM
• prefect server start --postgres-port=5433
• prefect agent start --show-flow-logs
the flow looks like this:
import os
import prefect
from prefect import Flow, Parameter, task
from prefect.engine.results import S3Result
from time import sleep

@task
def some_task(x):
    prefect.context.logger.info(f"Entry {x}")
    sleep(10)
    prefect.context.logger.info(f"Done {x}")
    return x

with Flow(
    'map_retry_experiment',
    result=S3Result(
        bucket=os.environ['DATA_BUCKET'],
        location='prefect_results/{flow_name}/'
                 '{scheduled_start_time:%Y-%m-%dT%H-%M-%S}/'
                 '{task_full_name}-{task_run_id}.prefect_result',
    ),
) as flow:
    nums = Parameter(name='nums', required=True)
    some_task.map(x=nums)
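For reference, here is a minimal sketch of how that location template turns into the S3 keys that show up in the logs. It assumes the template is filled in with Python's str.format, which matches the keys in the log output; render_location is a hypothetical helper, not a Prefect function, and the values passed in are taken from the some_task[0] log lines.

```python
from datetime import datetime

# Template copied from the S3Result in the flow definition.
LOCATION = (
    "prefect_results/{flow_name}/"
    "{scheduled_start_time:%Y-%m-%dT%H-%M-%S}/"
    "{task_full_name}-{task_run_id}.prefect_result"
)


def render_location(template: str, **context) -> str:
    """Hypothetical helper: fill the template with context values via str.format."""
    return template.format(**context)


key = render_location(
    LOCATION,
    flow_name="map_retry_experiment",
    scheduled_start_time=datetime(2021, 6, 8, 16, 54, 35),
    task_full_name="some_task[0]",
    task_run_id="03018502-eff1-4458-b9bf-1a6f9f1c0a4f",
)
print(key)
```

Because the scheduled start time and the task run id stay the same across a restart, a recomputed child writes to the same key it wrote to before.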
I register the flow, open the Prefect UI at http://localhost:8080 and run the flow with nums=[0, 1, 2, 3, 4].
I see the following logs:
[2021-06-08 18:54:37+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Starting task run...
[2021-06-08 18:54:37+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Pending to Running
[2021-06-08 18:54:37+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Calling task.run() method...
[2021-06-08 18:54:37+0200] INFO - prefect.some_task[0] | Entry 0
[2021-06-08 18:54:47+0200] INFO - prefect.some_task[0] | Done 0
[2021-06-08 18:54:47+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result...
[2021-06-08 18:54:47+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result.
[2021-06-08 18:54:47+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Running to Success
[2021-06-08 18:54:48+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Finished task run for task with final state: 'Success'
... stripped, tasks 1, 2 went the same
[2021-06-08 18:55:09+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Starting task run...
[2021-06-08 18:55:09+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Handling state change from Pending to Running
[2021-06-08 18:55:09+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Calling task.run() method...
[2021-06-08 18:55:09+0200] INFO - prefect.some_task[3] | Entry 3
[2021-06-08 18:55:19+0200] INFO - prefect.some_task[3] | Done 3
[2021-06-08 18:55:19+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[3]-0a46de8c-17c0-4cae-bb71-a582dd045c02.prefect_result...
[2021-06-08 18:55:19+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[3]-0a46de8c-17c0-4cae-bb71-a582dd045c02.prefect_result.
[2021-06-08 18:55:19+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Handling state change from Running to Success
[2021-06-08 18:55:19+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:19+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Starting task run...
[2021-06-08 18:55:19+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Pending to Running
... at this point I click the Cancel flow run button and the flow gets cancelled
I click the restart button. I expect tasks 0, 1, 2, 3 to be skipped (results are on S3) and the computation to run for 4 only. However, the first mapped child is always recomputed:
[2021-06-08 18:55:33+0200] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'map_retry_experiment'
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudFlowRunner | Flow 'map_retry_experiment': Handling state change from Scheduled to Running
[2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task': Starting task run...
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task': task is already mapped, but run will proceed so children are generated.
[2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task': Finished task run for task with final state: 'Mapped'
[2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Starting task run...
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Pending to Running
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Calling task.run() method...
[2021-06-08 18:55:33+0200] INFO - prefect.some_task[0] | Entry 0
... ^ the computation for 0 is running! it should be loaded from S3
[2021-06-08 18:55:43+0200] INFO - prefect.some_task[0] | Done 0
[2021-06-08 18:55:43+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result...
[2021-06-08 18:55:43+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result.
[2021-06-08 18:55:43+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Running to Success
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[1]': Starting task run...
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[1]': Task is already finished.
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[1]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[2]': Starting task run...
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[2]': Task is already finished.
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[2]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Starting task run...
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Task is already finished.
... ^ computations for 1, 2, 3 are not run, instead loaded from S3
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Starting task run...
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Pending to Running
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Calling task.run() method...
[2021-06-08 18:55:44+0200] INFO - prefect.some_task[4] | Entry 4
[2021-06-08 18:55:48+0200] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 18:55:54+0200] INFO - prefect.some_task[4] | Done 4
[2021-06-08 18:55:54+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[4]-6180b178-28f6-4075-85a4-fa964df24521.prefect_result...
[2021-06-08 18:55:54+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[4]-6180b178-28f6-4075-85a4-fa964df24521.prefect_result.
[2021-06-08 18:55:54+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Running to Success
[2021-06-08 18:55:54+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:54+0200] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 18:55:54+0200] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-06-08 18:55:54+0200] DEBUG - prefect.CloudFlowRunner | Flow 'map_retry_experiment': Handling state change from Running to Success
I expect some_task[0] to be loaded from S3 the same way as some_task[1] .. some_task[3] are loaded.
Jenny
06/08/2021, 5:14 PM
Lukas N.
06/08/2021, 5:29 PM
Jenny
06/08/2021, 7:04 PM
Marvin
06/08/2021, 7:04 PM
Lukas N.
06/08/2021, 8:59 PM
Jenny
06/08/2021, 9:02 PM
Lukas N.
06/08/2021, 9:11 PM
Jenny
06/08/2021, 9:12 PM
Lukas N.
06/08/2021, 10:54 PM
initialize_run
https://github.com/PrefectHQ/prefect/blob/1867f205405e1f68b828ce9dcf00bbd4969903bc/src/prefect/engine/cloud/task_runner.py#L154-L158
When I call this from the debugger:
task_run_info = self.client.get_task_run_info(
    flow_run_id=context.get("flow_run_id", ""),
    task_id=context.get("task_id", ""),
    map_index=0,  # force to index 0
)
I get task_run_info.state = Pending, but when I call it with map_index=1 I get task_run_info.state = Success. When I try map_index=3 (which didn't run yet) I also get task_run_info.state = Pending.
If the state is Pending, this check https://github.com/PrefectHQ/prefect/blob/1867f205405e1f68b828ce9dcf00bbd4969903bc/src/prefect/engine/task_runner.py#L248-L249 passes; when the state is Success it raises ENDRUN instead, which is the big difference.
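To make that difference concrete, here is a rough sketch of the behaviour in isolation. It is a simplified stand-in and not Prefect's code: states are plain strings, and the local ENDRUN class, check_task_is_ready, run_child and FINISHED_STATES are hypothetical names used only for illustration. The loaded states are the ones I got back for map_index=0 and map_index=1.

```python
# Simplified stand-in: states are plain strings here, not Prefect State objects.
FINISHED_STATES = {"Success", "Failed"}


class ENDRUN(Exception):
    """Local stand-in for a signal that ends a task run early with a given state."""

    def __init__(self, state: str):
        super().__init__(state)
        self.state = state


def check_task_is_ready(state: str) -> str:
    """Let Pending through; end the run early if the state is already finished."""
    if state == "Pending":
        return state
    if state in FINISHED_STATES:
        raise ENDRUN(state)
    return state


def run_child(loaded_state: str, x: int, calls: list) -> str:
    """Run one mapped child, recording x in calls only if the body actually runs."""
    try:
        check_task_is_ready(loaded_state)
    except ENDRUN as exc:
        return exc.state  # body skipped, previous state reused
    calls.append(x)  # stands in for task.run()
    return "Success"


# States as reported back for map_index=0 and map_index=1.
loaded = {0: "Pending", 1: "Success"}
calls = []
final = {i: run_child(state, i, calls) for i, state in loaded.items()}
print(calls)  # only index 0 is recomputed
print(final)
```

Both children end in Success, but only the one whose loaded state was Pending runs its body again, which is what the restart logs show for some_task[0].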
So I think the issue is that for map_index=0 the state somehow got overridden from Success to Pending, but I'm not exactly sure where 😕
Jenny
06/09/2021, 12:12 AM
Marvin
06/09/2021, 12:12 AM
Lukas N.
06/09/2021, 6:52 AM
Jenny
06/09/2021, 10:12 AM