# prefect-server
Alex Furrier
I'm running a mapped task that has many child tasks (~2800), and about 95% of them are successful, but at some point I get this error:
Copy code
Heartbeat process died with exit code 1
and the few child tasks left incomplete are unsuccessful. Any ideas on how to debug why the parent mapped task is dying?
Kevin Kho
Hey @Alex Furrier, the heartbeat is a lightweight process, so if it dies, I think it's likely due to a lack of resources. Do you think that could be a factor?
Alex Furrier
That's definitely possible. I'm executing this on K8s so probably hit some type of resource limit that killed things.
Kevin Kho
Do you know if those failures are resource related?
Alex Furrier
I don't know what exactly failed. The child tasks seemed to be executing successfully, but at some point they started erroring out with this connection error:
Copy code
requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Max retries exceeded with url: /graphql/graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f90cdf864f0>: Failed to establish a new connection: [Errno 111] Connection refused'))
The task eventually fails with
Finished task run for task with final state: 'ClientFailed'
I'm guessing something in our K8s setup failed, which may have caused the heartbeat failure or the task failures? I'm just not sure where to look for what broke.
Not sure what's going on. The K8s job that runs the flow doesn't appear to die, but at a certain point the child tasks can't connect to the Apollo endpoint. It refuses the connection for some reason:
Failed to set task state with error: ConnectionError(ProtocolError('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
Kevin Kho
Hey, I checked in with the team and this is indeed a symptom of not having enough resources.
Alex Furrier
Thanks @Kevin Kho. Still trying to dig into where the resource error would be. It seems like something happens with the mapped task where the connection between the task and the GraphQL endpoint is refused, and eventually it hits the max retries in requests, which causes a final error. This is the traceback for the first part:
Copy code
Failed to retrieve task state with error: ClientError([{'message': 'request to http://prefect-graphql.prefect:4201/graphql/ failed, reason: connect ECONNREFUSED 10.0.83.43:4201', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to http://prefect-graphql.prefect:4201/graphql/ failed, reason: connect ECONNREFUSED 10.0.83.43:4201', 'type': 'system', 'errno': 'ECONNREFUSED', 'code': 'ECONNREFUSED'}}}])
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 1399, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': 'request to http://prefect-graphql.prefect:4201/graphql/ failed, reason: connect ECONNREFUSED 10.0.83.43:4201', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to http://prefect-graphql.prefect:4201/graphql/ failed, reason: connect ECONNREFUSED 10.0.83.43:4201', 'type': 'system', 'errno': 'ECONNREFUSED', 'code': 'ECONNREFUSED'}}}]
and this is the traceback for the second part, once it's hit max retries:
Copy code
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py", line 726, in urlopen
    retries = retries.increment(
  File "/opt/conda/lib/python3.8/site-packages/urllib3/util/retry.py", line 446, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Max retries exceeded with url: /graphql/graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fa4041c3f10>: Failed to establish a new connection: [Errno 111] Connection refused'))
So the root issue seems to be whatever is causing that connection refusal between the mapped task and the GraphQL endpoint. Any ideas on where to look, or where increasing resources might alleviate that issue?
Still working on this issue. My best guess is that it's being caused by a Dask worker dying due to OOM errors. It doesn't make sense to me why there's such a large memory issue; it may be some type of memory leak or something similar. Trying to debug where exactly the memory issue is coming from.
Kevin Kho
Hey, I’ll ping someone on the team who might be able to help you better than me here
nicholas
Hi @Alex Furrier - since it sounds like this is happening at a pretty consistent point in your graph, I wonder if it has to do with how you're passing/returning data in your mapping; are you passing data directly or are you writing results somewhere for downstream consumers?
Alex Furrier
I think that is the issue. The memory is blowing up because it is holding the result of each child task in memory. So far, for the mapped task, I've been trying to use LocalResult() to serialize to disk and release memory, but that didn't appear to work.
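Roughly, the mapped child task was set up like this (a minimal sketch; the task body and the result directory are placeholders):
Copy code
from prefect import task
from prefect.engine.results import LocalResult

# Checkpoint each child task's return value to local disk, hoping the
# in-memory value can be dropped once the task run reaches Success.
@task(result=LocalResult(dir="/tmp/prefect-results"), checkpoint=True)
def process_item(item):
    # ... the real per-item work goes here ...
    return item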
nicholas
Have you tried returning nothing at all and seeing if execution proceeds as expected?
Alex Furrier
The other thing I'm not quite getting (and this may just be due to my inexperience with Dask) is that the OOM seems to hit when the memory consumed across all workers reaches the memory limit specified in the DaskExecutor(). For this flow I've set that to 12GB with:
Copy code
import prefect
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from dask_kubernetes import KubeCluster, make_pod_spec

with Flow(
    name="my OOM Flow",
    storage=Docker(
        base_image="container-registry.io/oom-flow:dev",
        registry_url="container-registry.io",
        image_name="my-oom-flow",
        image_tag="latest",
    ),
    executor=DaskExecutor(
        cluster_class=lambda: KubeCluster(
            make_pod_spec(
                image=prefect.context.image,
                memory_limit='12G',
                memory_request='4G',
            ),
            namespace='prefect',
        ),
        adapt_kwargs={"minimum": 2, "maximum": 25},
    ),
    run_config=KubernetesRun(),
    # result=PrefectResult()
) as my_oom_flow:
    ...  # mapped tasks defined here
When this executes I can check the Dask dashboard to view resource consumption. There it shows memory consumption summed across all workers above the 12GB limit, but looking at specific workers, none are even close to the 12GB worker limit.
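To sanity-check that the 12G in make_pod_spec is a per-worker limit rather than a cluster-wide one, something like this (a rough sketch; the scheduler address is a placeholder) should print each worker's configured limit:
Copy code
from dask.distributed import Client

# Connect to the running Dask scheduler (address is a placeholder).
client = Client("tcp://dask-scheduler.prefect:8786")

# scheduler_info() reports each worker's configured memory limit in bytes;
# with the pod spec above this should be ~12 GB per worker, not 12 GB total.
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["memory_limit"] / 1e9, "GB")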
Could this be a similar issue to what is discussed here: https://prefect-community.slack.com/archives/CL09KU1K7/p1623226992151500
nicholas
I think it's entirely possible. Have you tried not writing anything to disk and returning only external data references (like a bucket) for downstream consumers?
Alex Furrier
So I reran the flow returning nothing from the child tasks of the mapped part that's causing issues. Nothing changed and it OOM-ed in the same way as before.
Would the bucket suggestion be the same as implementing a cloud result for tasks, like AzureResult or S3Result?
nicholas
Yeah, basically I'm thinking it might be helpful to tighten the scope of memory usage by passing only references to data and not the data itself; the same could definitely be useful for the parent mapped task (depending on how you're passing that to begin with).
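As a rough sketch of what I mean (the container name and blob naming scheme are placeholders, and this writes with the Azure SDK directly rather than through a Prefect result):
Copy code
import json
import os

from azure.storage.blob import BlobServiceClient
from prefect import task

@task
def process_and_store(record):
    # ... do the real per-record work here ...
    payload = json.dumps(record)

    # Upload the output and return only the blob name, so the large payload
    # is never handed back to Dask or to the parent mapped task.
    service = BlobServiceClient.from_connection_string(
        os.environ["AZURE_STORAGE_CONNECTION_STRING"]
    )
    blob_name = f"oom-flow/{record['id']}.json"  # hypothetical naming scheme
    service.get_blob_client(container="results", blob=blob_name).upload_blob(
        payload, overwrite=True
    )
    return blob_name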
Alex Furrier
@nicholas I updated the flow to use AzureResult for all tasks, but it didn't seem to change much. If anything it may have made it worse, as the serialization and deserialization to Azure Blob consumed more memory that wasn't released. It seems like the main issue is a Dask one. When running a mapped task with a large number of child tasks, the memory consumed by a child task is never released after it reaches a Success state. I may be wrong, but what I think should be happening is: a task reaches a Success state, serializes its result, and releases the memory for that child task; once all child tasks have completed, the serialized results are aggregated by the parent task. Is that correct?