Dave
04/26/2022, 7:30 AM
prefect.engine.signals.LOOP, where the task is suddenly missing the context on retries.
e.g.:
index = prefect.context.get('task_loop_result', {}).get('index', 0)
See more code in the thread 🧵
import pendulum
import prefect
import requests
from prefect import task
from prefect.engine.signals import FAIL, LOOP

@task(max_retries=14, retry_delay=pendulum.duration(minutes=15))
def fetch(queries):
    # resume from the index stored by the previous LOOP iteration (0 on the first run)
    index = prefect.context.get('task_loop_result', {}).get('index', 0)
    url = f'URL_GOES_HERE?{queries[index]}'
    r = requests.post(url, timeout=300)
    if not r.ok:
        raise FAIL(r.text)
    if index == len(queries)-1:
        return
    raise LOOP(result=dict(index=index+1))
It runs through the first three runs, but fails on the 4th run, and then is unable to continue:
Error received on retry:
TypeError: 'NoneType' object is not subscriptable
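A note on reading that error: "'NoneType' object is not subscriptable" means some value that was None got indexed with `[...]`. In the snippet as posted, the only subscript is `queries[index]`, so one candidate is that `queries` arrived as None on the retry, though the thread does not confirm this. Separately, `.get(key, default)` only falls back to the default when the key is absent, not when it is present with a value of None. The plain-Python sketch below illustrates both points with a dict standing in for the context; `read_loop_index` and `pick_query` are hypothetical helpers, not part of Prefect.

```python
def read_loop_index(context):
    """Return the stored loop index, or 0 if the payload is missing or None.

    Assumption: `context` behaves like a dict. Falling back to 0 restarts
    the loop from the first query, which may not be what you want.
    """
    payload = context.get("task_loop_result") or {}
    return payload.get("index", 0)


def pick_query(queries, index):
    """Index into queries, failing with an explicit message if it is None."""
    if queries is None:
        raise ValueError("queries is None; the task input was not available on this run")
    return queries[index]


# A key that exists with value None bypasses the default passed to .get():
ctx = {"task_loop_result": None}
print(ctx.get("task_loop_result", {}))  # None, not {}
print(read_loop_index(ctx))             # 0
```

Logging the raw context value and the task inputs at the top of the task on each run would show which of the two is actually None.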
Agent is using version 1.2.0, and is a k8s agent.
Anna Geller
import requests
from datetime import timedelta
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.signals import LOOP

@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
    # we extract the accumulated task loop result from context
    loop_payload = prefect.context.get("task_loop_result", {})
    n = loop_payload.get("n", 1)
    fib = loop_payload.get("fib", 1)
    next_fib = requests.post(
        "https://nemo.api.stdlib.com/fibonacci@0.0.1/", data={"nth": n}
    ).json()
    if next_fib > M:
        return fib  # return statements end the loop
    raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))

with Flow("fibonacci") as flow:
    M = Parameter("M")
    fib_num = compute_large_fibonacci(M)
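To make the looping pattern above easier to follow without a Prefect install or network access, here is a simplified plain-Python model of it. This is only a sketch of the idea (a signal carries a payload that the next iteration reads back from a context dict), not how Prefect implements LOOP. `Loop`, `run_with_loop`, and `largest_fib_up_to` are hypothetical names, and the Fibonacci numbers are computed locally instead of through the HTTP API.

```python
class Loop(Exception):
    """Hypothetical stand-in for a LOOP signal; carries the next iteration's payload."""

    def __init__(self, result):
        super().__init__("loop")
        self.result = result


def run_with_loop(task_fn, *args):
    """Call task_fn repeatedly, feeding each Loop payload back in via a context dict."""
    context = {}
    while True:
        try:
            return task_fn(context, *args)
        except Loop as signal:
            # the payload raised by this iteration becomes the next iteration's context
            context = {"task_loop_result": signal.result}


def largest_fib_up_to(context, limit):
    # read the accumulated loop payload; defaults apply on the first iteration
    payload = context.get("task_loop_result", {})
    prev = payload.get("prev", 0)
    fib = payload.get("fib", 1)
    next_fib = prev + fib
    if next_fib > limit:
        return fib  # returning ends the loop
    raise Loop(result=dict(prev=fib, fib=next_fib))


print(run_with_loop(largest_fib_up_to, 100))  # 89
```

In this model the payload only lives in the driver's local variable, so anything that re-enters the task without passing the payload back in (for example, a retry path) would start again from the defaults.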
Dave
04/27/2022, 9:04 AM
prefect.context.get() inside the task?
Anna Geller
sometimes it works and sometimes it doesn't
as stupid as it may sound, this type of problem can often be solved by: