Tim Enders
03/19/2021, 2:40 PM
... flatten to gather all of the results back up. When RETRY is raised, I am getting the following: TypeError: object of type 'RETRY' has no len()
nicholas
Tim Enders
03/19/2021, 2:50 PM
items = get_items_list.map(
    unmapped(client),
    unmapped(entity),
    page=pages,
    path=unmapped(path),
    response_attr=unmapped(response_attr),
)
frames = transactions_dataframe(flatten(items), api_name)
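For readers following along, the shape of this map-then-flatten pattern can be sketched without Prefect: each mapped call returns a list for one page, and flattening concatenates those lists into one. The snippet below is a plain-Python stand-in, not Prefect's `flatten`; `fetch_page` and its data are hypothetical.

```python
from itertools import chain


def fetch_page(page):
    """Hypothetical stand-in for one mapped get_items_list call."""
    return [f"item-{page}-{i}" for i in range(2)]


pages = [1, 2, 3]

# Mapping yields one list per page ...
per_page = [fetch_page(p) for p in pages]

# ... and flattening concatenates them into a single list.
items = list(chain.from_iterable(per_page))
print(items)
```

The concatenation only works if every per-page result is itself a list, which is the assumption that breaks later in this thread.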
Tim Enders
03/19/2021, 2:50 PM
Tim Enders
03/19/2021, 2:51 PM
if res.status_code == 500:
    logger.error(f"Got a 500 error on page {page}")
    raise signals.RETRY(f"Retrying {res.url}")
else:
    resp = res.json()
    try:
        # breakpoint()
        return resp[response_attr]
    except KeyError as e:
        logger.error(f"Could not fetch {response_attr} from API response")
        logger.error(resp.keys())
        return []
piece of the task I want to retry
Tim Enders
03/19/2021, 3:04 PM
FAIL
nicholas
max_retries=3, retry_delay=datetime.timedelta(minutes=10)
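A rough illustration of what these settings are meant to do, written as plain Python rather than Prefect code: the task body raises an ordinary exception and a wrapper reruns it a bounded number of times. `run_with_retries`, `flaky_fetch`, and the injectable `sleep` are hypothetical, and the sketch assumes `max_retries` counts retries after the first attempt; Prefect's own retry handling is not reproduced here.

```python
import datetime
import time


def run_with_retries(
    fn,
    max_retries=3,
    retry_delay=datetime.timedelta(minutes=10),
    sleep=time.sleep,
):
    """Call fn(); on an exception, wait retry_delay and retry, up to max_retries times."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(retry_delay.total_seconds())


calls = {"count": 0}


def flaky_fetch():
    """Hypothetical request that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("Got a 500 error")
    return ["item"]


# Record the requested waits instead of actually sleeping for 10 minutes.
waits = []
result = run_with_retries(flaky_fetch, sleep=waits.append)
```

Passing `waits.append` as `sleep` keeps the example fast while still showing that each retry would wait the full delay.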
Tim Enders
03/19/2021, 3:05 PM
Tim Enders
03/19/2021, 3:13 PM
Traceback (most recent call last):
  File "/home/tenders/.pyenv/versions/3.8.6/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/tenders/.pyenv/versions/3.8.6/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/tenders/Documents/code/platformsh-prefect/platform_prefect/vendors/accounts/transactions.py", line 132, in <module>
    flow.run(executor=LocalDaskExecutor())
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/core/flow.py", line 1233, in run
    state = self._run(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/core/flow.py", line 1051, in _run
    flow_state = runner.run(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 270, in run
    raise exc
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 252, in run
    state = self.get_flow_run_state(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/utilities/executors.py", line 71, in inner
    return runner_method(self, *args, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/runner.py", line 68, in inner
    raise exc
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 495, in get_flow_run_state
    children = executors.flatten_mapped_children(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/utilities/executors.py", line 559, in flatten_mapped_children
    counts = executor.wait(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/executors/dask.py", line 611, in wait
    return dask.compute(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/base.py", line 563, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/threaded.py", line 76, in get
    results = get_async(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 487, in get_async
    raise_exception(exc, tb)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 317, in reraise
    raise exc
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/utilities/executors.py", line 560, in <lambda>
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
TypeError: object of type 'RETRY' has no len()
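The last frame shows `len()` being called on each mapped child's result value. If that value is a signal object rather than a list, the call fails in exactly this way. A minimal reproduction in plain Python follows, assuming a hypothetical `RETRY` exception class in place of Prefect's signal. It also shows one possible guard, `safe_len`, which is likewise hypothetical and not the fix adopted in Prefect.

```python
class RETRY(Exception):
    """Hypothetical stand-in for a retry signal stored as a task result."""


def safe_len(value):
    """Return len(value) for sized results, or 0 for anything unsized (e.g. a signal)."""
    try:
        return len(value)
    except TypeError:
        return 0


# Two ordinary list results and one child whose "result" is a signal object.
results = [["a", "b"], RETRY("Retrying"), ["c"]]

error_message = None
try:
    counts = [len(r) for r in results]
except TypeError as exc:
    error_message = str(exc)

guarded_counts = [safe_len(r) for r in results]
print(error_message)
print(guarded_counts)
```

Counting a signal as zero items is only one design choice; a real fix might instead skip or defer children that have not finished successfully.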
Tim Enders
03/19/2021, 3:13 PM
nicholas
... RETRY signal, try raising an error; I can't test it just at the moment, but I suspect raising the retry signal isn't triggering a failure of the task as you'd expect in a retry scenario
Tim Enders
03/19/2021, 3:30 PM
Tim Enders
03/19/2021, 3:30 PM
ERROR - prefect.FlowRunner | Unexpected error: Exception('please retry')
nicholas
... flatten operator, where it's not accounting for the signal correctly; I'll open a ticket for this in Core 👍
Tim Enders
03/19/2021, 3:45 PM
nicholas
Marvin
03/19/2021, 3:46 PM
Satheesh K
04/15/2021, 9:35 AM
nicholas
Satheesh K
04/15/2021, 2:29 PM
nicholas