https://prefect.io logo
#prefect-server
Title
# prefect-server
l

Lukas N.

06/08/2021, 5:03 PM
Hello 👋 , I've hit an issue while retrying mapped task. It always re-runs the first mapped task. Code and more details in thread
Here is my setup: •
prefect server start --postgres-port=5433
prefect agent start --show-flow-logs
the flow looks like this:
Copy code
import os

import prefect
from prefect import Flow, Parameter, task
from prefect.engine.results import S3Result
from time import sleep


@task
def some_task(x):
    <http://prefect.context.logger.info|prefect.context.logger.info>(f"Entry {x}")
    sleep(10)
    <http://prefect.context.logger.info|prefect.context.logger.info>(f"Done {x}")
    return x


with Flow(
    'map_retry_experiment',
    result=S3Result(
        bucket=os.environ['DATA_BUCKET'],
        location='prefect_results/{flow_name}/'
        '{scheduled_start_time:%Y-%m-%dT%H-%M-%S}/'
        '{task_full_name}-{task_run_id}.prefect_result',
    ),
) as flow:
    nums = Parameter(name='nums', required=True)
    some_task.map(x=nums)
I register the flow, open Prefect UI at http://localhost:8080 and run the task with
nums=[0, 1, 2, 3, 4]
I see following logs:
Copy code
[2021-06-08 18:54:37+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Starting task run...
[2021-06-08 18:54:37+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Pending to Running
[2021-06-08 18:54:37+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Calling task.run() method...
[2021-06-08 18:54:37+0200] INFO - prefect.some_task[0] | Entry 0
[2021-06-08 18:54:47+0200] INFO - prefect.some_task[0] | Done 0
[2021-06-08 18:54:47+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result...
[2021-06-08 18:54:47+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result.
[2021-06-08 18:54:47+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Running to Success
[2021-06-08 18:54:48+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Finished task run for task with final state: 'Success'
... stripped, tasks 1, 2 went the same
[2021-06-08 18:55:09+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Starting task run...
[2021-06-08 18:55:09+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Handling state change from Pending to Running
[2021-06-08 18:55:09+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Calling task.run() method...
[2021-06-08 18:55:09+0200] INFO - prefect.some_task[3] | Entry 3
[2021-06-08 18:55:19+0200] INFO - prefect.some_task[3] | Done 3
[2021-06-08 18:55:19+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[3]-0a46de8c-17c0-4cae-bb71-a582dd045c02.prefect_result...
[2021-06-08 18:55:19+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[3]-0a46de8c-17c0-4cae-bb71-a582dd045c02.prefect_result.
[2021-06-08 18:55:19+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Handling state change from Running to Success
[2021-06-08 18:55:19+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:19+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Starting task run...
[2021-06-08 18:55:19+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Pending to Running
... at this point I click the Cancel flow run button and flow get's cancelled
the flow gets cancelled, I click the restart button, I expect tasks 0, 1, 2, 3 to be skipped (results are on S3), and the computation should run for 4 only. However, the first mapped child is always recomputed:
Copy code
[2021-06-08 18:55:33+0200] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'map_retry_experiment'
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudFlowRunner | Flow 'map_retry_experiment': Handling state change from Scheduled to Running
[2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task': Starting task run...
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task': task is already mapped, but run will proceed so children are generated.
[2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task': Finished task run for task with final state: 'Mapped'
[2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Starting task run...
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Pending to Running
[2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Calling task.run() method...
[2021-06-08 18:55:33+0200] INFO - prefect.some_task[0] | Entry 0
... ^ the computation for 0 is running! it should be loaded from S3
[2021-06-08 18:55:43+0200] INFO - prefect.some_task[0] | Done 0
[2021-06-08 18:55:43+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result...
[2021-06-08 18:55:43+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result.
[2021-06-08 18:55:43+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Running to Success
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[1]': Starting task run...
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[1]': Task is already finished.
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[1]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[2]': Starting task run...
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[2]': Task is already finished.
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[2]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Starting task run...
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Task is already finished.
... ^ computations for 1, 2, 3 are not run, instead loaded from S3
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Starting task run...
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Pending to Running
[2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Calling task.run() method...
[2021-06-08 18:55:44+0200] INFO - prefect.some_task[4] | Entry 4
[2021-06-08 18:55:48+0200] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 18:55:54+0200] INFO - prefect.some_task[4] | Done 4
[2021-06-08 18:55:54+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[4]-6180b178-28f6-4075-85a4-fa964df24521.prefect_result...
[2021-06-08 18:55:54+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[4]-6180b178-28f6-4075-85a4-fa964df24521.prefect_result.
[2021-06-08 18:55:54+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Running to Success
[2021-06-08 18:55:54+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Finished task run for task with final state: 'Success'
[2021-06-08 18:55:54+0200] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 18:55:54+0200] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-06-08 18:55:54+0200] DEBUG - prefect.CloudFlowRunner | Flow 'map_retry_experiment': Handling state change from Running to Success
I would expect the
some_task[0]
to be loaded the same way from S3 as
some_task[1]
..
some_task[3]
are loaded
j

Jenny

06/08/2021, 5:14 PM
Hi @Lukas N. Let me look into that one for you....
Hi @Lukas N. - can you confirm which restart button you are using? Is it the one on the flow run page or the task run page?
l

Lukas N.

06/08/2021, 5:29 PM
Thanks @Jenny for looking into it, I'm using the one on the flow run page
j

Jenny

06/08/2021, 7:04 PM
Thanks @Lukas N. - I think you've found a small bug in our restart logic. Opening a ticket here: @Marvin open "Check for no map index incorrectly returns true when map index is 0" in UI
l

Lukas N.

06/08/2021, 8:59 PM
Hey @Jenny, thanks for opening the issue, I already see a fix for it 👏 . Although I'm surprised the fix is in prefect UI and not in prefect server where I would expect it to be 🤔 . Is it possible there is something similar with falsey value? Just to clarify my example, it was the easiest and smallest reproducible example I could come up with. In reality I'm not using the "cancel flow" button, in fact, my flow is executed on schedule in Kubernetes that runs on AWS EC2 Spot instances, where it may happen that the node is killed (therefore the flow run is killed when running). In such a case, the heartbeat system fails and the flow run is automatically restarted - and the map issue is there. No input from UI happens in this case. Sorry for misleading you, maybe a better example would be to kill the process that corresponds to the flow run on my local machine, rather than using the cancel button.
j

Jenny

06/08/2021, 9:02 PM
Hi @Lukas N. - the UI repo works for both prefect server and prefect cloud but you maybe found a UI bug that you didn't mean to. So to clarify - this happens without using the UI Restart button? And also comes from Lazarus restarts?
l

Lukas N.

06/08/2021, 9:11 PM
yes, this happens without using the UI restart button. In fact there is no UI used in the process at all. The flow runs on a schedule, but sometimes the node it runs on fails, so the flow run loses heartbeat. Then it's automatically restarted by Lazarus. From logs I've observed that restarted flows re-run the 0 mapped tasks, which shouldn't happen, because it was already run in the original (failed) flow run and the result is present on S3
😂 1
j

Jenny

06/08/2021, 9:12 PM
Well glad we got that straight. I wonder if there's similar logic in server. Let me look into that.
l

Lukas N.

06/08/2021, 10:54 PM
I did a bit of debugging and narrowed the search to the task runner to this line https://github.com/PrefectHQ/prefect/blob/1867f205405e1f68b828ce9dcf00bbd4969903bc/src/prefect/engine/task_runner.py#L236 which calls this cloud task runner method
initialize_run
https://github.com/PrefectHQ/prefect/blob/1867f205405e1f68b828ce9dcf00bbd4969903bc/src/prefect/engine/cloud/task_runner.py#L154-L158 when I do call (from debugger)
Copy code
task_run_info = self.client.get_task_run_info(
    flow_run_id=context.get("flow_run_id", ""),
    task_id=context.get("task_id", ""),
    map_index=0, # force to index 0
)
i get
task_run_info.state = Pending
, but when call with
map_index=1
I get
task_run_info.state = Success
, when I try
map_index=3
(which didn't run yet) I also get
task_run_info.state = Pending
. If the state is pending, this check https://github.com/PrefectHQ/prefect/blob/1867f205405e1f68b828ce9dcf00bbd4969903bc/src/prefect/engine/task_runner.py#L248-L249 passes, when the state is
Success
it
raise ENDRUN
instead, which is the big difference. So I think the issue is that for map_index=0, the state got somehow overridden from
Success
to
Pending
, but I'm not exactly sure where 😕
j

Jenny

06/09/2021, 12:12 AM
Thanks @Lukas N. Let’s open a follow up issue on server! @Marvin open “0 index mapped tasks get marked for retry when they ran successfully” in server
l

Lukas N.

06/09/2021, 6:52 AM
@Jenny Actually I got it completely wrong 🙁 Talked with my colleague this morning and he says those restarts were actually not by lazarus, but manually through the restart button. And yesterday I was debugging the wrong thing. I'm confident that the fix in the UI will solve our troubles 🙂 So I'll close the issue in the server again, I don't want anybody to chase ghosts that are not there
Actually I cannot close it, so at least I left a comment 🙏
j

Jenny

06/09/2021, 10:12 AM
Thanks for following up. I’ll close the issue.
👍 1
2 Views