
Ilya Sapunov

06/10/2022, 3:31 PM
Hi, I have the same issue as in this thread: the value of “task_loop_result” is lost on retries. https://prefect-community.slack.com/archives/CL09KU1K7/p1650958214772249 More code inside.
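import datetime

import prefect
from prefect import Flow, task, unmapped
from prefect.core.parameter import DateTimeParameter
from prefect.engine.signals import LOOP
from prefect.tasks.shell import ShellTask

# Shell task used to run the flaky script from inside the looping task
error_task = ShellTask(
    name="error",
    stream_output=True,
)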

messages = ["HELLO", "WORLD"]

@task(max_retries=5, retry_delay=datetime.timedelta(seconds=5))
def echo(start_date, message):
    current_date = prefect.context.get('task_loop_result', start_date)
    if current_date >= datetime.datetime.now(current_date.tzinfo):
        return True

    error_task.run(
        command="python error.py",
        env={
            "MESSAGE": message,
            "DATE": current_date.date().isoformat()
        },
    )
    print(f"Got data for {current_date.isoformat()}")
    raise LOOP(
        message=f"Got data for {current_date.isoformat()}",
        result=(current_date + datetime.timedelta(days=1)),
    )


with Flow("test") as flow:
    date = DateTimeParameter("date")
    echo.map(unmapped(date), messages)

flow_state = flow.run(date="2022-05-01")
^ this is my flow code
import os
import random

print(os.getenv("MESSAGE"))

if random.random() * 100 > 70:
    raise Exception
^ And this is my error.py code, which I call from the shell task
I expect that when an iteration of my loop fails, it will retry with the same date
But the real behavior is unpredictable: sometimes it retries from the last successful date, and sometimes it starts from the beginning
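To make the expected behavior concrete, here is a minimal plain-Python sketch of a loop that carries its state forward and retries a failed iteration from the same date. It is only a model of the semantics described above, not Prefect's implementation; `run_loop_with_retries` and `step_fn` are hypothetical names, and the single retry budget shared across the whole loop is an assumption.

```python
import datetime


def run_loop_with_retries(start, stop, step_fn, max_retries=5):
    """Model of the expected semantics only; this is not how Prefect implements LOOP."""
    current = start              # plays the role of task_loop_result
    retries_left = max_retries   # assumption: one retry budget for the whole loop
    processed = []
    while current < stop:
        try:
            step_fn(current)
        except Exception:
            if retries_left == 0:
                raise
            retries_left -= 1
            continue             # retry with the same date, not from the start
        processed.append(current)
        current += datetime.timedelta(days=1)
    return processed
```

With a step that fails once on a given date, that date is attempted twice and earlier dates are not repeated.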

Kevin Kho

06/10/2022, 4:17 PM
Will take a look at this in a bit

Ilya Sapunov

06/14/2022, 8:30 AM
Are there any updates on that? 🙂

Kevin Kho

06/14/2022, 2:10 PM
Hey, thanks for pinging! Sorry, I was swamped with non-community stuff recently. Will definitely have time today
Testing now
I think it’s working fine, but the logs get intertwined. Test this:
from prefect import Flow, task 
from prefect.engine.signals import LOOP
import datetime
import prefect
import random

@task(max_retries=5, retry_delay=datetime.timedelta(seconds=3))
def echo(start_date):
    current_date = prefect.context.get('task_loop_result', start_date)
    if current_date >= datetime.datetime.now(current_date.tzinfo):
        return True
    x = random.random()
    if x < 0.1:
        raise ValueError()
    loop_count = prefect.context.get("task_loop_count")
    print(f"{start_date}  {current_date}-{loop_count}")
    raise LOOP(
        message=f"Got data for {current_date.isoformat()}",
        result=(current_date + datetime.timedelta(days=1)),
    )

with Flow("test") as flow:
    echo.map([datetime.datetime(2022, 6, 1), datetime.datetime(2022, 6, 2)])

flow.run()
What happens is that there are two mapped tasks, and mid-loop the executor can jump to the other one before firing off the retry, so their log lines interleave. But if I look at the logs, the total number of runs is as expected
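One way to check this is to regroup the interleaved output by mapped input. The sketch below assumes the lines were produced by the `print(f"{start_date}  {current_date}-{loop_count}")` call in the snippet above and were collected into a list of strings; `group_by_start` is a hypothetical helper, not part of Prefect.

```python
from collections import defaultdict


def group_by_start(lines):
    """Group '<start>  <current>-<loop_count>' lines by their start date."""
    groups = defaultdict(list)
    for line in lines:
        # The print format separates start and current date with two spaces.
        start, sep, rest = line.strip().partition("  ")
        if not sep:
            continue  # not one of our lines
        current, dash, count = rest.rpartition("-")
        if not dash or not count.isdigit():
            continue  # skip lines without a numeric loop count
        groups[start].append((int(count), current))
    # Sort each group by loop count so every mapped run reads in order.
    return {start: sorted(items) for start, items in groups.items()}
```

If each group shows consecutive loop counts with dates advancing one day at a time, the retries resumed from the right place even though the raw output looked shuffled.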