Not sure if this is related to my post above, but ...
# prefect-server
a
Not sure if this is related to my post above, but now I'm running into an error where flow results appear to be dropping after mapping across 1000+ items. To reduce memory I'm serializing a task result and loading it from a passed location downstream. I initially read in data records, process them, and serialize them to Azure blob storage before returning a list of the serialized locations. Downstream I process these further, first deserializing them and then processing them. At a certain point (usually about 1100 records in but that could be totally arbitrary) the mapped task fails because the passed result location reference is None for some reason. I've logged the result locations after the first serialization and there are no Nones in it, yet later on the string location is no longer present and breaks the task. Below is the pseudo code:
Copy code
RESULT_HANDLER = AzureResult(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
                             container='prefect')
with Flow(
    'Example Flow',
    executor=LocalDaskExecutor(
        scheduler="threads",
        num_workers=8,
        namespace='prefect'
    ),
) as example_flow:

    result_locations = query_data_process_serialize(query)
    # Logging locations to be sure they are serialized
    log_item(result_locations)

    # This seems to break after ~1000 result locations
    result_locations = deserialize_and_process_more.map(
        record=result_locations,
        result_handler=unmapped(RESULT_HANDLER),
        upstream_tasks=[result_locations)]
    )
I've run this a few times and the same thing always happens. I've copied the logged result locations and indexed that list using failed task children to see if I can deserialize using
RESULT_HANDLER
and that works fine. E.g. if the flow failed on child 1100 when mapping
deserialize_and_process_more
I tried running
RESULT_HANDLER.serializer.deserialize(
RESULT_HANDLER.read(result_locations[1100]).value)
and was able to deserialize the data just fine. Not sure where the disconnect is happening where the result locations become None somehow.
s
Try changing your dask scheduler from
threads
to
processes
. I've found on k8s infra that if your worker thread count exceeds your cpu allocation then the threads can randomly fail in unexpected ways over large maps using local dask. Switchign to processes has helped alleviate this, in my experience
đź‘Ť 2
a
@Sam Cook Thanks for the tip I'll try that.
I think the root issue is that something is causing the graphQL endpoint to crash. I switched to using Dask processes and during a flow run graphql crashed. That in turn seemed to cause some tasks to fail with these logs:
Copy code
Failed to retrieve task state with error: ClientError([{'message': 'Unable to complete operation. An internal API error occurred.', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Unable to complete operation. An internal API error occurred.'}}}])
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 1728, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 565, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'message': 'Unable to complete operation. An internal API error occurred.', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Unable to complete operation. An internal API error occurred.'}}}]
07:58:25
INFO
CloudTaskRunner
Trying to investigate what's going wrong. There is a log in the graphql that looks like it might be related to the mapped task breaking:
Copy code
GraphQL request:2:3
1 | {
2 |   mapped_children(task_run_id: "1af93162-0606-4418-8dcd-8715c36fa3ae") {
  |   ^
3 |     max_end_time
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 628, in await_result
    return await result
  File "/prefect-server/src/prefect_server/graphql/extensions.py", line 52, in resolve
    result = await result
  File "/prefect-server/src/prefect_server/graphql/runs.py", line 78, in resolve_mapped_children
    records = await connection.fetch(query, task_run_id, timeout=0.5)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 443, in fetch
    return await self._execute(query, args, 0, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1446, in _execute
    query, args, limit, timeout, return_status=return_status)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1454, in __execute
    return await self._do_execute(query, executor, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1479, in _do_execute
    result = await executor(stmt, timeout)
  File "asyncpg/protocol/protocol.pyx", line 196, in bind_execute
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 674, in await_completed
    return await completed
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 659, in await_result
    return_type, field_nodes, info, path, await result
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 733, in complete_value
    raise result
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 628, in await_result
    return await result
  File "/prefect-server/src/prefect_server/graphql/extensions.py", line 52, in resolve
    result = await result
  File "/prefect-server/src/prefect_server/graphql/runs.py", line 78, in resolve_mapped_children
    records = await connection.fetch(query, task_run_id, timeout=0.5)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 443, in fetch
    return await self._execute(query, args, 0, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1446, in _execute
    query, args, limit, timeout, return_status=return_status)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1454, in __execute
    return await self._do_execute(query, executor, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1479, in _do_execute
    result = await executor(stmt, timeout)
  File "asyncpg/protocol/protocol.pyx", line 196, in bind_execute
graphql.error.graphql_error.GraphQLError: 

GraphQL request:2:3
1 | {
2 |   mapped_children(task_run_id: "1af93162-0606-4418-8dcd-8715c36fa3ae") {
  |   ^
3 |     max_end_time
@Kevin Kho @Sam Cook Any ideas what might be happening here? I think it might have something to do with using a result handler to serialize/deserialize data within tasks to save memory. I tried running these flows without a dask/local dask executor and the mapped tasks lasted longer but still had the same issue causing an erroring out. Currently my team is blocked based on this issue and I'm having difficulty determining where to look next.
s
Can you change your results storage options to something more persistent? Like AzureResult maybe? I've had issues where pods can lose state with the default if the flow get restarted at the wrong time because of connection issues. https://docs.prefect.io/api/latest/engine/results.html#azureresult
a
So would I change that result at the task level? Going from the example code something like
Copy code
@task(result=RESULT_HANDLER)
def query_data_process_serialize():
    input_data = query()
    proc_data = process()
    result_locations = serialize_data_return_results(proc_data, RESULT_HANDLER)
    return result_locations
k
I am wondering if the AzureResult you are using here creates a connection or client, and there are too many concurrent ones open such that they hang. Maybe you can try decreasing the number of workers? Does this work for a lower number of mapped tasks?
a
If I'm importing that task
query_data_process_serialize
from elsewhere is there a way to set its result within the flow or file the flow exists in
@Kevin Kho That could make sense. It does work for smaller number of mapped tasks. Usually errors out somewhere between 1100-1600 tasks
k
Oh actually you said you took out the DaskExecutor and it failed also. Is there just one mapped task in this Flow?
a
There is only one mapped task. Running with local executor instead of dask/local dask made the erroring out take longer (i.e. higher number of mapped tasks succeeded before breaking)
k
Would you know if your memory is running out?
a
It doesn't appear to be running out. I've set the job resource requests fairly high and examining the various pods for the job, graphql, etc don't show a 137 exit code (if there are pod restarts) or OOMKilled/Evicted
k
I’ll try seeing if there is a way to explicitly close that AzureResult client.
Doesn’t look like they have a close method for the Azure client and they say on Github they can support concurrent load (but they don’t give limits). Will ask the team for more ideas.
a
Would it make a difference if I instantiated the result handler within the task rather than pass reference to a global result handler like the example code? Or perhaps use a context manager when serializing/deserializing with the result handler?
k
Chatted with the team and the errors look like something on the API is hitting some kind of limit. It seems like your database with server might be getting overloaded. Does not look Azure client related. You may have to break up this flow for your database.
a
So it's the mapped tasks that would need to be broken up into batches? Seems like an issue I keep running into haha. I may need to look into that more, but it seems like a problem without an obvious solution since I would either need to map over some batches (and mapping is causing these issues to begin with) or iterate over the batches and within each batch map over smaller amounts of data (which I don't think is possible to use
map
within a task?).
Would it potentially alleviate issues by upping the resources assigned to our postgres database? I'm assuming the database issues you are referring to are related to the prefect server database
k
Lol I know! Yes you can’t use
map
in a task. Yes that’s right I was referring to the server database and it might help to up the resources for that.