Thread
#prefect-community
    Ilya Sapunov

    3 months ago
    Hi, I have the same issue as in this thread: the value of “task_loop_result” is lost on retries. https://prefect-community.slack.com/archives/CL09KU1K7/p1650958214772249 More code inside.
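    import datetime
    
    import prefect
    from prefect import Flow, task, unmapped
    from prefect.core.parameter import DateTimeParameter
    from prefect.engine.signals import LOOP
    from prefect.tasks.shell import ShellTask
    
    error_task = ShellTask(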
        name="error",
        stream_output=True,
    )
    
    messages = ["HELLO", "WORLD"]
    
    @task(max_retries=5, retry_delay=datetime.timedelta(seconds=5))
    def echo(start_date, message):
        current_date = prefect.context.get('task_loop_result', start_date)
        if current_date >= datetime.datetime.now(current_date.tzinfo):
            return True
    
        error_task.run(
            command="python error.py",
            env={
                "MESSAGE": message,
                "DATE": current_date.date().isoformat()
            },
        )
        print(f"Got data for {current_date.isoformat()}")
        raise LOOP(
            message=f"Got data for {current_date.isoformat()}",
            result=(current_date + datetime.timedelta(days=1)),
        )
    
    
    with Flow("test") as flow:
        date = DateTimeParameter("date")
        echo.map(unmapped(date), messages)
    
    flow_state = flow.run(date="2022-05-01")
    ^ this is my flow code
    import os
    import random
    
    print(os.getenv("MESSAGE"))
    
    if random.random() * 100 > 70:
        raise Exception
    ^ And this is my error.py code, which I call from the shell task
    I expect that when an iteration of my loop fails, it will retry with the same date
    But the real behavior is unpredictable: it can retry from the last successful date, but it can also start from the beginning
    Kevin Kho

    3 months ago
    Will take a look at this in a bit
    Ilya Sapunov

    3 months ago
    Are there any updates on this? 🙂
    Kevin Kho

    3 months ago
    Hey, thanks for pinging! So sorry, I was swamped with non-community stuff recently. Will definitely have time today
    Testing now
    I think it’s working fine, but the logs get intertwined. Test this:
    from prefect import Flow, task 
    from prefect.engine.signals import LOOP
    import datetime
    import prefect
    import random
    
    @task(max_retries=5, retry_delay=datetime.timedelta(seconds=3))
    def echo(start_date):
        current_date = prefect.context.get('task_loop_result', start_date)
        if current_date >= datetime.datetime.now(current_date.tzinfo):
            return True
        x = random.random()
        if x < 0.1:
            raise ValueError()
        loop_count = prefect.context.get("task_loop_count")
        print(f"{start_date}  {current_date}-{loop_count}")
        raise LOOP(
            message=f"Got data for {current_date.isoformat()}",
            result=(current_date + datetime.timedelta(days=1)),
        )
    
    with Flow("test") as flow:
        echo.map([datetime.datetime(2022, 6, 1), datetime.datetime(2022, 6, 2)])
    
    flow.run()
    What happens is that there are two mapped tasks, and mid-loop execution can jump to the other one before the retry fires off. But looking at the logs, the total number of runs is as expected
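One way to check this from the printed output is to separate the interleaved lines per mapped child and confirm that no child's date ever moves backwards. The snippet below is only a sketch and does not use Prefect: it assumes the line format of the `print` call in the test flow above (start date, two spaces, current date, a dash, loop count), the helper names `group_by_start_date` and `went_backwards` are made up for illustration, and the sample lines are hypothetical rather than real log output.

```python
from collections import defaultdict
from datetime import datetime


def group_by_start_date(lines):
    """Group 'start  current-loop_count' lines by their start date."""
    groups = defaultdict(list)
    for line in lines:
        start, rest = line.split("  ", 1)     # two spaces separate the fields
        current, count = rest.rsplit("-", 1)  # loop count follows the last dash
        groups[start].append((datetime.fromisoformat(current.strip()), count))
    return dict(groups)


def went_backwards(entries):
    """True if a child's current date ever decreases, i.e. its loop restarted."""
    dates = [date for date, _ in entries]
    return any(later < earlier for earlier, later in zip(dates, dates[1:]))


# Hypothetical interleaved output from two mapped children (not real logs).
sample = [
    "2022-06-01 00:00:00  2022-06-01 00:00:00-1",
    "2022-06-02 00:00:00  2022-06-02 00:00:00-1",
    "2022-06-01 00:00:00  2022-06-02 00:00:00-2",
    "2022-06-02 00:00:00  2022-06-03 00:00:00-2",
    "2022-06-02 00:00:00  2022-06-03 00:00:00-2",  # same date again after a retry
    "2022-06-01 00:00:00  2022-06-03 00:00:00-3",
]

grouped = group_by_start_date(sample)
restarted = {start: went_backwards(entries) for start, entries in grouped.items()}
print(restarted)
```

If a child really did start over from the beginning after a retry, its entry in `restarted` would be `True`. Reading the raw output top to bottom cannot show that reliably, because lines from the two children are mixed together.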