I am trying to raise a RETRY signal inside a mappe...
# ask-community
t
I am trying to raise a RETRY signal inside a mapped Task. The follow up task is using
flatten
to gather all of the results back up. When
RETRY
is raised I am getting the following:
TypeError: object of type 'RETRY' has no len()
n
Hi @Tim Enders - could you share a min example of your code?
t
Copy code
items = get_items_list.map(
        unmapped(client),
        unmapped(entity),
        page=pages,
        path=unmapped(path),
        response_attr=unmapped(response_attr),
    )

    frames = transactions_dataframe(flatten(items), api_name)
that is the Flow definition of the two tasks in question
Copy code
if res.status_code == 500:
        logger.error(f"Got a 500 error on page {page}")
        raise signals.RETRY(f"Retrying {res.url}")
    else:
        resp = res.json()
    try:
        # breakpoint()
        return resp[response_attr]
    except KeyError as e:
        logger.error(f"Could not fetch {response_attr} from API response")
        logger.error(resp.keys())
        return []
piece of the task I want to retry
I think the issue is the interaction between the two tasks because I get the same error when I raise a
FAIL
n
Thanks Tim - is your task configured to use the retry signal? Something like this as parameters to your task:
Copy code
max_retries=3, retry_delay=datetime.timedelta(minutes=10)
t
yes, it is
Traceback (most recent call last):
File "/home/tenders/.pyenv/versions/3.8.6/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/home/tenders/.pyenv/versions/3.8.6/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/home/tenders/Documents/code/platformsh-prefect/platform_prefect/vendors/accounts/transactions.py", line 132, in <module>
flow.run(executor=LocalDaskExecutor())
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/core/flow.py", line 1233, in run
state = self._run(
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/core/flow.py", line 1051, in _run
flow_state = runner.run(
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 270, in run
raise exc
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 252, in run
state = self.get_flow_run_state(
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/utilities/executors.py", line 71, in inner
return runner_method(self, *args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/runner.py", line 68, in inner
raise exc
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 495, in get_flow_run_state
children = executors.flatten_mapped_children(
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/utilities/executors.py", line 559, in flatten_mapped_children
counts = executor.wait(
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/executors/dask.py", line 611, in wait
return dask.compute(
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/base.py", line 563, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/threaded.py", line 76, in get
results = get_async(
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 487, in get_async
raise_exception(exc, tb)
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 317, in reraise
raise exc
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 222, in execute_task
result = _execute_task(task, data)
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/utilities/executors.py", line 560, in <lambda>
[executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
TypeError: object of type 'RETRY' has no len()
full stack trace from the error
n
Got it - instead of raising a
RETRY
signal, try raising an error; I can't test it just at the moment but I suspect raising the retry signal isn't triggering a failure of the task as you'd expect in a retry scenario
t
when I raise an exception the whole flow fails
ERROR - prefect.FlowRunner | Unexpected error: Exception('please retry')
n
Thanks for trying that @Tim Enders - I think this is a bug on our side with the
flatten
operator, where it's not accounting for the
signal
correctly; I'll open a ticket for this in Core 👍
t
ok, thank you
n
@Marvin open "Flatten doesn't work when a raised value isn't iterable"
s
Hi @nicholas, is it fixed yet? facing similar issue
n
Hi @Satheesh K - I don't think the issue has been picked up yet
s
Oh, okay
n
Feel free to comment on the linked issue above to let the Core team know you're affected by it!