Alex Furrier
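08/20/2021, 4:05 AM
```python
import os

from prefect import Flow, unmapped
from prefect.engine.results import AzureResult
from prefect.executors import LocalDaskExecutor

# Result that persists intermediate data to the 'prefect' container in Azure Blob Storage
RESULT_HANDLER = AzureResult(
    connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
    container='prefect',
)

with Flow(
    'Example Flow',
    executor=LocalDaskExecutor(
        scheduler="threads",
        num_workers=8,
        namespace='prefect',
    ),
) as example_flow:
    # query, query_data_process_serialize, log_item and
    # deserialize_and_process_more are defined elsewhere
    result_locations = query_data_process_serialize(query)
    # Logging locations to be sure they are serialized
    log_item(result_locations)
    # This seems to break after ~1000 result locations
    result_locations = deserialize_and_process_more.map(
        record=result_locations,
        result_handler=unmapped(RESULT_HANDLER),
        upstream_tasks=[result_locations],
    )
```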
I've run this a few times and the same thing always happens. I copied the logged result locations and indexed that list with the indices of the failed mapped children to check whether I could deserialize them using `RESULT_HANDLER`, and that works fine. E.g. if the flow failed on child 1100 when mapping `deserialize_and_process_more`, I ran `RESULT_HANDLER.serializer.deserialize(RESULT_HANDLER.read(result_locations[1100]).value)` and was able to deserialize the data just fine. I'm not sure where the disconnect happens that somehow turns the result locations into `None`.
Sam Cook
08/20/2021, 1:33 PM
Try switching the scheduler from `threads` to `processes`. I've found on k8s infrastructure that if your worker thread count exceeds your CPU allocation, the threads can fail randomly and in unexpected ways over large maps when using local Dask. Switching to processes has helped alleviate this, in my experience.
Alex Furrier
08/20/2021, 3:02 PM
```
Failed to retrieve task state with error: ClientError([{'message': 'Unable to complete operation. An internal API error occurred.', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Unable to complete operation. An internal API error occurred.'}}}])
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 1728, in get_task_run_info
    result = self.graphql(mutation) # type: Any
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 565, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'message': 'Unable to complete operation. An internal API error occurred.', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Unable to complete operation. An internal API error occurred.'}}}]

07:58:25 INFO CloudTaskRunner
GraphQL request:2:3
1 | {
2 |   mapped_children(task_run_id: "1af93162-0606-4418-8dcd-8715c36fa3ae") {
  |   ^
3 |     max_end_time
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 628, in await_result
    return await result
  File "/prefect-server/src/prefect_server/graphql/extensions.py", line 52, in resolve
    result = await result
  File "/prefect-server/src/prefect_server/graphql/runs.py", line 78, in resolve_mapped_children
    records = await connection.fetch(query, task_run_id, timeout=0.5)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 443, in fetch
    return await self._execute(query, args, 0, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1446, in _execute
    query, args, limit, timeout, return_status=return_status)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1454, in __execute
    return await self._do_execute(query, executor, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1479, in _do_execute
    result = await executor(stmt, timeout)
  File "asyncpg/protocol/protocol.pyx", line 196, in bind_execute
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 674, in await_completed
    return await completed
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 659, in await_result
    return_type, field_nodes, info, path, await result
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 733, in complete_value
    raise result
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 628, in await_result
    return await result
  File "/prefect-server/src/prefect_server/graphql/extensions.py", line 52, in resolve
    result = await result
  File "/prefect-server/src/prefect_server/graphql/runs.py", line 78, in resolve_mapped_children
    records = await connection.fetch(query, task_run_id, timeout=0.5)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 443, in fetch
    return await self._execute(query, args, 0, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1446, in _execute
    query, args, limit, timeout, return_status=return_status)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1454, in __execute
    return await self._do_execute(query, executor, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1479, in _do_execute
    result = await executor(stmt, timeout)
  File "asyncpg/protocol/protocol.pyx", line 196, in bind_execute
graphql.error.graphql_error.GraphQLError:
GraphQL request:2:3
1 | {
2 |   mapped_children(task_run_id: "1af93162-0606-4418-8dcd-8715c36fa3ae") {
  |   ^
3 |     max_end_time
```
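The server-side traceback shows `resolve_mapped_children` calling `connection.fetch(..., timeout=0.5)`, so the `mapped_children` query for a map with thousands of children can hit that half-second limit. One possible mitigation, which is my assumption and not something confirmed in this thread, is to map over batches of result locations instead of individual ones, so the server tracks far fewer mapped children. The `chunk` helper below is a hypothetical, pure-Python sketch of that batching step:

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk(items: Sequence[T], size: int) -> List[List[T]]:
    """Split items into consecutive batches of at most `size` elements.

    Hypothetical helper: mapping over the batches instead of the raw list
    turns N mapped children into ceil(N / size) mapped children.
    """
    if size < 1:
        raise ValueError("size must be >= 1")
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


if __name__ == "__main__":
    # Made-up location names, standing in for the logged result locations
    locations = [f"result-{i}.pkl" for i in range(2050)]
    batches = chunk(locations, 100)
    print(len(batches))      # 21 mapped children instead of 2050
    print(len(batches[-1]))  # 50 (the final, partial batch)
```

Because `result_locations` is only known at runtime, the batching would itself have to run as a task inside the flow, and the mapped task would then loop over the locations in its batch.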
Sam Cook
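08/23/2021, 3:25 PM
Alex Furrier
08/23/2021, 3:56 PM
```python
from prefect import task

# RESULT_HANDLER is the AzureResult defined next to the flow;
# query, process and serialize_data_return_results are helpers defined elsewhere
@task(result=RESULT_HANDLER)
def query_data_process_serialize():
    input_data = query()
    proc_data = process(input_data)
    # Persist the processed data and return only the storage locations
    result_locations = serialize_data_return_results(proc_data, RESULT_HANDLER)
    return result_locations
```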
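`serialize_data_return_results` isn't shown in the thread. As a rough, hypothetical sketch of the pattern it appears to follow (persist each record, return only the locations, then have each mapped child read one location back), here is a pure-Python stand-in that uses an in-memory dict in place of Azure Blob Storage and `pickle` in place of Prefect's serializer. `InMemoryResult`, `serialize_records` and `read_location` are illustrative names, not Prefect APIs. The sketch also rejects a `None` location explicitly, so that failure mode surfaces with a clear message instead of a confusing read error.

```python
import pickle
import uuid
from typing import Any, Dict, List, Optional


class InMemoryResult:
    """Hypothetical stand-in for a storage-backed result (e.g. blob storage)."""

    def __init__(self) -> None:
        self._store: Dict[str, bytes] = {}

    def write(self, value: Any) -> str:
        # Serialize the value and store it under a fresh, unique location key
        location = f"{uuid.uuid4()}.pkl"
        self._store[location] = pickle.dumps(value)
        return location

    def read(self, location: str) -> Any:
        # Fetch the stored bytes and deserialize them back into a value
        return pickle.loads(self._store[location])


def serialize_records(records: List[Any], result: InMemoryResult) -> List[str]:
    """Persist each record and return only the list of locations."""
    return [result.write(record) for record in records]


def read_location(location: Optional[str], result: InMemoryResult) -> Any:
    """What each mapped child would do: read one record back by location."""
    if location is None:
        raise ValueError("received None instead of a result location")
    return result.read(location)


if __name__ == "__main__":
    store = InMemoryResult()
    locations = serialize_records([{"id": i} for i in range(3)], store)
    print(read_location(locations[1], store))  # {'id': 1}
```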
Kevin Kho
08/23/2021, 3:57 PM
Alex Furrier
08/23/2021, 3:58 PM
… `query_data_process_serialize` from elsewhere, is there a way to set its result within the flow or in the file the flow lives in?
Kevin Kho
08/23/2021, 3:59 PM
Alex Furrier
08/23/2021, 4:01 PM
Kevin Kho
08/23/2021, 4:03 PM
Alex Furrier
08/23/2021, 4:05 PM
Kevin Kho
08/23/2021, 4:09 PM
Alex Furrier
08/23/2021, 4:43 PM
Kevin Kho
08/23/2021, 4:47 PM
Alex Furrier
08/23/2021, 5:02 PM
… `map` within a task?
Kevin Kho
08/23/2021, 5:05 PM
… `map` in a task. Yes, that's right: I was referring to the server database, and it might help to increase the resources allocated to it.
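If the fan-out over result locations has to happen inside a single task rather than through a flow-level `map`, a plain `concurrent.futures` thread pool is one option. Following Sam's observation that running more worker threads than allocated CPUs caused trouble, the sketch below caps the pool size at the number of CPUs the process may use. This is a hypothetical, Prefect-independent sketch: `usable_cpus` and `process_all` are illustrative names. `os.sched_getaffinity` only exists on some platforms (e.g. Linux), so the code falls back to `os.cpu_count()`; neither call reflects a Kubernetes CPU limit enforced through cgroup quotas, so on k8s you may still want to pass `max_workers` explicitly.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def usable_cpus() -> int:
    """Best-effort count of CPUs this process may run on."""
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


def process_all(
    items: List[T],
    fn: Callable[[T], R],
    max_workers: Optional[int] = None,
) -> List[R]:
    """Apply fn to every item on a thread pool, preserving input order.

    The pool size defaults to the usable CPU count so the number of
    worker threads does not exceed the CPU allocation.
    """
    workers = max_workers or usable_cpus()
    # Never start more threads than there are items (but at least one)
    workers = max(1, min(workers, len(items) or 1))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map yields results in input order and re-raises a call's
        # exception when that call's result is reached
        return list(pool.map(fn, items))
```

Inside a task this might look like `process_all(locations, read_one)`, where `read_one` is a hypothetical function that reads and processes a single location. For CPU-bound work, `concurrent.futures.ProcessPoolExecutor` can be swapped in, in line with Sam's suggestion, as long as `fn` and the items are picklable.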