    Lukas N.

    1 year ago
    Hello 👋 , I've hit an issue while restarting a flow with a mapped task: the first mapped child is always re-run. Code and more details in thread.
    Here is my setup:
    prefect server start --postgres-port=5433
    prefect agent start --show-flow-logs
    The flow looks like this:
    import os
    
    import prefect
    from prefect import Flow, Parameter, task
    from prefect.engine.results import S3Result
    from time import sleep
    
    
    @task
    def some_task(x):
        prefect.context.logger.info(f"Entry {x}")
        sleep(10)
        prefect.context.logger.info(f"Done {x}")
        return x
    
    
    with Flow(
        'map_retry_experiment',
        result=S3Result(
            bucket=os.environ['DATA_BUCKET'],
            location='prefect_results/{flow_name}/'
            '{scheduled_start_time:%Y-%m-%dT%H-%M-%S}/'
            '{task_full_name}-{task_run_id}.prefect_result',
        ),
    ) as flow:
        nums = Parameter(name='nums', required=True)
        some_task.map(x=nums)
    I register the flow, open the Prefect UI at http://localhost:8080 and run the flow with
    nums=[0, 1, 2, 3, 4]
    I see the following logs:
    [2021-06-08 18:54:37+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Starting task run...
    [2021-06-08 18:54:37+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Pending to Running
    [2021-06-08 18:54:37+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Calling task.run() method...
    [2021-06-08 18:54:37+0200] INFO - prefect.some_task[0] | Entry 0
    [2021-06-08 18:54:47+0200] INFO - prefect.some_task[0] | Done 0
    [2021-06-08 18:54:47+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result...
    [2021-06-08 18:54:47+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result.
    [2021-06-08 18:54:47+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Running to Success
    [2021-06-08 18:54:48+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Finished task run for task with final state: 'Success'
    ... stripped, tasks 1, 2 went the same
    [2021-06-08 18:55:09+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Starting task run...
    [2021-06-08 18:55:09+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Handling state change from Pending to Running
    [2021-06-08 18:55:09+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Calling task.run() method...
    [2021-06-08 18:55:09+0200] INFO - prefect.some_task[3] | Entry 3
    [2021-06-08 18:55:19+0200] INFO - prefect.some_task[3] | Done 3
    [2021-06-08 18:55:19+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[3]-0a46de8c-17c0-4cae-bb71-a582dd045c02.prefect_result...
    [2021-06-08 18:55:19+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[3]-0a46de8c-17c0-4cae-bb71-a582dd045c02.prefect_result.
    [2021-06-08 18:55:19+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Handling state change from Running to Success
    [2021-06-08 18:55:19+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Finished task run for task with final state: 'Success'
    [2021-06-08 18:55:19+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Starting task run...
    [2021-06-08 18:55:19+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Pending to Running
    ... at this point I click the Cancel flow run button and the flow gets cancelled
    Then I click the restart button. I expect tasks 0, 1, 2, 3 to be skipped (their results are on S3) and the computation to run for 4 only. However, the first mapped child is always recomputed:
    [2021-06-08 18:55:33+0200] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'map_retry_experiment'
    [2021-06-08 18:55:33+0200] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
    [2021-06-08 18:55:33+0200] DEBUG - prefect.CloudFlowRunner | Flow 'map_retry_experiment': Handling state change from Scheduled to Running
    [2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task': Starting task run...
    [2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task': task is already mapped, but run will proceed so children are generated.
    [2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task': Finished task run for task with final state: 'Mapped'
    [2021-06-08 18:55:33+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Starting task run...
    [2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Pending to Running
    [2021-06-08 18:55:33+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Calling task.run() method...
    [2021-06-08 18:55:33+0200] INFO - prefect.some_task[0] | Entry 0
    ... ^ the computation for 0 is running! it should be loaded from S3
    [2021-06-08 18:55:43+0200] INFO - prefect.some_task[0] | Done 0
    [2021-06-08 18:55:43+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result...
    [2021-06-08 18:55:43+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[0]-03018502-eff1-4458-b9bf-1a6f9f1c0a4f.prefect_result.
    [2021-06-08 18:55:43+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[0]': Handling state change from Running to Success
    [2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[0]': Finished task run for task with final state: 'Success'
    [2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[1]': Starting task run...
    [2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[1]': Task is already finished.
    [2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[1]': Finished task run for task with final state: 'Success'
    [2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[2]': Starting task run...
    [2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[2]': Task is already finished.
    [2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[2]': Finished task run for task with final state: 'Success'
    [2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Starting task run...
    [2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[3]': Task is already finished.
    ... ^ computations for 1, 2, 3 are not run, instead loaded from S3
    [2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[3]': Finished task run for task with final state: 'Success'
    [2021-06-08 18:55:44+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Starting task run...
    [2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Pending to Running
    [2021-06-08 18:55:44+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Calling task.run() method...
    [2021-06-08 18:55:44+0200] INFO - prefect.some_task[4] | Entry 4
    [2021-06-08 18:55:48+0200] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 18:55:54+0200] INFO - prefect.some_task[4] | Done 4
    [2021-06-08 18:55:54+0200] DEBUG - prefect.S3Result | Starting to upload result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[4]-6180b178-28f6-4075-85a4-fa964df24521.prefect_result...
    [2021-06-08 18:55:54+0200] DEBUG - prefect.S3Result | Finished uploading result to prefect_results/map_retry_experiment/2021-06-08T16-54-35/some_task[4]-6180b178-28f6-4075-85a4-fa964df24521.prefect_result.
    [2021-06-08 18:55:54+0200] DEBUG - prefect.CloudTaskRunner | Task 'some_task[4]': Handling state change from Running to Success
    [2021-06-08 18:55:54+0200] INFO - prefect.CloudTaskRunner | Task 'some_task[4]': Finished task run for task with final state: 'Success'
    [2021-06-08 18:55:54+0200] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 18:55:54+0200] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2021-06-08 18:55:54+0200] DEBUG - prefect.CloudFlowRunner | Flow 'map_retry_experiment': Handling state change from Running to Success
    I would expect some_task[0] to be loaded from S3 the same way as some_task[1] .. some_task[3] are loaded.
    Jenny

    1 year ago
    Hi @Lukas N. Let me look into that one for you....
    Hi @Lukas N. - can you confirm which restart button you are using? Is it the one on the flow run page or the task run page?
    Lukas N.

    1 year ago
    Thanks @Jenny for looking into it, I'm using the one on the flow run page
    Jenny

    1 year ago
    Thanks @Lukas N. - I think you've found a small bug in our restart logic. Opening a ticket here: @Marvin open "Check for no map index incorrectly returns true when map index is 0" in UI
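The UI code itself is not shown in this thread, so the following is only a Python analogue of the kind of check the ticket title describes, not the actual implementation. The function names are hypothetical. It shows how a truthiness test meant to detect "no map index" also fires for index 0, and how an explicit comparison against None avoids that.

```python
from typing import List, Optional


def has_no_map_index_buggy(map_index: Optional[int]) -> bool:
    # Hypothetical buggy check: relies on truthiness, so 0 is
    # treated the same as a missing index.
    return not map_index


def has_no_map_index_fixed(map_index: Optional[int]) -> bool:
    # Hypothetical fix: only a genuinely missing index counts.
    return map_index is None


# None stands for "not a mapped child"; 0, 1 and 4 are mapped children.
indices: List[Optional[int]] = [None, 0, 1, 4]
buggy = [has_no_map_index_buggy(i) for i in indices]
fixed = [has_no_map_index_fixed(i) for i in indices]

print(buggy)
print(fixed)
```

With the truthiness check, the child at index 0 is the only mapped child that gets misclassified, which is consistent with only some_task[0] being recomputed in the logs above.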
    Marvin

    1 year ago
    Lukas N.

    1 year ago
    Hey @Jenny, thanks for opening the issue, I already see a fix for it 👏 . Although I'm surprised the fix is in Prefect UI and not in Prefect server, where I would have expected it 🤔 . Is it possible there is a similar check on a falsy value in the server? Just to clarify my example: it was the smallest reproducible example I could come up with. In reality I'm not using the "cancel flow" button. My flow is executed on a schedule in Kubernetes running on AWS EC2 Spot instances, where a node may be killed (and the running flow run is killed with it). In such a case the heartbeat fails and the flow run is automatically restarted, and the map issue is there. No input from the UI happens in this case. Sorry for misleading you; maybe a better example would be to kill the process that corresponds to the flow run on my local machine, rather than using the cancel button.
    Jenny

    1 year ago
    Hi @Lukas N. - the UI repo works for both Prefect server and Prefect Cloud, but you may have found a UI bug that you didn't mean to. So to clarify - this happens without using the UI Restart button? And it also comes from Lazarus restarts?
    Lukas N.

    1 year ago
    Yes, this happens without using the UI restart button. In fact there is no UI used in the process at all. The flow runs on a schedule, but sometimes the node it runs on fails, so the flow run loses its heartbeat. Then it's automatically restarted by Lazarus. From the logs I've observed that restarted flows re-run the mapped task at index 0, which shouldn't happen, because it already ran in the original (failed) flow run and the result is present on S3.
    Jenny

    1 year ago
    Well glad we got that straight. I wonder if there's similar logic in server. Let me look into that.
    Lukas N.

    1 year ago
    I did a bit of debugging and narrowed the search down to this line in the task runner: https://github.com/PrefectHQ/prefect/blob/1867f205405e1f68b828ce9dcf00bbd4969903bc/src/prefect/engine/task_runner.py#L236 It calls the cloud task runner method initialize_run: https://github.com/PrefectHQ/prefect/blob/1867f205405e1f68b828ce9dcf00bbd4969903bc/src/prefect/engine/cloud/task_runner.py#L154-L158 When I make this call from the debugger:
    task_run_info = self.client.get_task_run_info(
        flow_run_id=context.get("flow_run_id", ""),
        task_id=context.get("task_id", ""),
        map_index=0, # force to index 0
    )
    I get task_run_info.state = Pending, but when I call it with map_index=1 I get task_run_info.state = Success. When I try map_index=3 (which didn't run yet) I also get task_run_info.state = Pending. If the state is Pending, this check https://github.com/PrefectHQ/prefect/blob/1867f205405e1f68b828ce9dcf00bbd4969903bc/src/prefect/engine/task_runner.py#L248-L249 passes; when the state is Success, it does raise ENDRUN instead, which is the big difference. So I think the issue is that for map_index=0 the state somehow got overridden from Success to Pending, but I'm not exactly sure where 😕
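The difference described above can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not Prefect's code: states are plain strings, EndRun is a hypothetical stand-in for the ENDRUN signal, and the per-child states are taken from what the restart logs earlier in the thread imply (children 0 and 4 executed, children 1 to 3 reported "Task is already finished").

```python
from typing import Dict, List

FINISHED_STATES = {"Success", "Failed"}


class EndRun(Exception):
    """Hypothetical stand-in for the signal that stops a task run early."""


def check_not_finished(state: str) -> None:
    # A finished child short-circuits; a Pending child falls through
    # and goes on to call task.run().
    if state in FINISHED_STATES:
        raise EndRun(state)


def children_to_rerun(states: Dict[int, str]) -> List[int]:
    rerun = []
    for map_index in sorted(states):
        try:
            check_not_finished(states[map_index])
        except EndRun:
            continue  # already finished, nothing is recomputed
        rerun.append(map_index)
    return rerun


# States implied by the restart logs: child 0 was reset to Pending,
# children 1-3 kept Success, child 4 never finished.
observed = {0: "Pending", 1: "Success", 2: "Success", 3: "Success", 4: "Pending"}
rerun_indices = children_to_rerun(observed)
print(rerun_indices)
```

In this model the runner behaves correctly for the states it is given, so the interesting question is what reset child 0 to Pending before the run started.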
    Jenny

    1 year ago
    Thanks @Lukas N. Let’s open a follow up issue on server! @Marvin open “0 index mapped tasks get marked for retry when they ran successfully” in server
    Marvin

    1 year ago
    Lukas N.

    1 year ago
    @Jenny Actually I got it completely wrong 🙁 I talked with my colleague this morning and he says those restarts were actually not done by Lazarus, but manually through the restart button. So yesterday I was debugging the wrong thing. I'm confident that the fix in the UI will solve our troubles 🙂 So I'll close the issue in the server again, I don't want anybody to chase ghosts that are not there
    Actually I cannot close it, so at least I left a comment 🙏
    Jenny

    1 year ago
    Thanks for following up. I’ll close the issue.