mondras
03/19/2024, 6:20 PM
distributed.core - Event loop was unresponsive in Nanny for 14.37s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Marvin
03/19/2024, 6:21 PM
mondras
03/19/2024, 6:32 PM
Marvin
03/19/2024, 6:33 PM
mondras
03/19/2024, 6:39 PM
Marvin
03/19/2024, 6:40 PM
DaskExecutor, one improvement could be exposing Dask scheduler information, including the dashboard link, in your Prefect logs. You can set distributed.dashboard.link in your Dask configuration, which provides template functionality for this purpose. Here's a discussion from GitHub that might be helpful in setting up logging for your Dask Executor and ECS.
4. CloudProvider for Dashboard Link: A discussion on GitHub suggests that to get the right dashboard link in a setup similar to yours, you would need dask-cloudprovider to expose the dashboard’s public IP.
5. Agent Configuration: Agents orchestrate flow runs and need to be configured correctly. This involves provisioning and monitoring the infrastructure required for your flow runs. Here's a detailed explanation on how FlowRunners, Executors, and Agents work together, which might help you configure them correctly.
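To make the dashboard-link template mentioned above a bit more concrete, here is a rough sketch of how such a template expands. It assumes the fields scheme, host, and port, which are the ones used in Dask's default value for distributed.dashboard.link. render_dashboard_link is a hypothetical helper written only for illustration (it is not a Dask function), and the host, port, and PUBLIC_URL values are made up.

```python
from typing import Dict, Optional

# Assumed to match Dask's default template for distributed.dashboard.link.
DEFAULT_TEMPLATE = "{scheme}://{host}:{port}/status"


def render_dashboard_link(
    template: str,
    host: str,
    port: int,
    scheme: str = "http",
    extra: Optional[Dict[str, str]] = None,
) -> str:
    """Hypothetical helper: fill a dashboard-link template with connection details.

    `extra` stands in for additional values (for example environment
    variables) that a custom template might reference.
    """
    fields = dict(extra or {})
    fields.update(scheme=scheme, host=host, port=port)
    return template.format(**fields)


# Default-style template with a private address.
private_link = render_dashboard_link(DEFAULT_TEMPLATE, "10.0.0.5", 8787)

# Custom template that routes through a made-up public proxy URL.
public_link = render_dashboard_link(
    "{PUBLIC_URL}/proxy/{port}/status",
    "10.0.0.5",
    8787,
    extra={"PUBLIC_URL": "https://example.com"},
)
```

In a real deployment the template itself would live in your Dask configuration (Dask also reads configuration from environment variables such as DASK_DISTRIBUTED__DASHBOARD__LINK), so treat the helper above purely as an explanation of what the template does.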
Please review the linked resources for more in-depth guidance on configuring your ECS tasks and Agents to work effectively with Dask. If you need more detailed help specific to parts of your configuration, please let me know, and I can help look for more information.
mondras
03/20/2024, 2:48 AM
KeyError: 77567 in relation to
    self._h2_state.end_stream(stream_id)
  File "/Users/data-snowflake/.venv/lib/python3.11/site-packages/h2/connection.py", line 883, in end_stream
    frames = self.streams[stream_id].end_stream()
Marvin
03/20/2024, 2:49 AM
1. Occasional http2 connection errors (KeyError): A bug report, PrefectHQ/prefect#11660, details an error where flows crash with a connection-related exception originating from h2, which seems to occur occasionally during longer flow runs.
2. Client sometimes fails with httpx.LocalProtocolError: Another related issue report, PrefectHQ/prefect#7442, discusses scenarios where the agent is not able to retrieve jobs because of an exception related to the connection state being closed unexpectedly.
Based on the information you provided, especially the traceback part where self._h2_state.end_stream(stream_id) in connection.py causes a KeyError, it seems like the connection might have been closed or encountered an error, but an attempt was still made to process the end of a stream with an invalid stream ID.
To resolve this issue, a few potential steps can be taken:
- Ensure that all dependencies are up to date. It could be that a bug in the h2 library has been patched in a more recent version.
- Review the Prefect and related dependencies' changelogs and open issues to see if there are known issues with long-running flows or connections being closed prematurely.
- You could also implement error handling in your workflow to catch such exceptions and manage retries or cleanup appropriately.
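As a minimal sketch of the last suggestion, the snippet below retries a call when it raises one of a chosen set of exceptions. call_with_retries and flaky_request are hypothetical names invented for this example; the KeyError only simulates the failure from your traceback and does not involve h2 or httpx.

```python
import time


def call_with_retries(fn, retry_on=(KeyError,), attempts=3, base_delay=0.0):
    """Call fn(), retrying with exponential backoff on the given exceptions.

    Exceptions not listed in retry_on propagate immediately; the last
    retryable exception is re-raised once all attempts are used up.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            # Back off: base_delay, 2 * base_delay, 4 * base_delay, ...
            time.sleep(base_delay * 2 ** (attempt - 1))


# Simulated flaky operation: fails twice, then succeeds.
calls = {"n": 0}


def flaky_request():
    calls["n"] += 1
    if calls["n"] < 3:
        raise KeyError(77567)  # stand-in for the intermittent connection error
    return "ok"


result = call_with_retries(flaky_request, attempts=5)
```

Within Prefect itself, the task and flow decorators also accept retries and retry_delay_seconds, which may be a simpler option than a hand-written wrapper if retrying the whole task or flow is acceptable.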
If these suggestions do not resolve your issue, consider creating a new issue on the Prefect GitHub repository if one doesn't already exist for your specific case, since async I/O problems like this one are complex to diagnose. Be sure to provide details of your environment setup, such as the versions of Prefect, Dask, httpx, and h2, and how you're deploying Prefect (e.g., with Docker), as this could help maintainers diagnose the problem.
mondras
03/20/2024, 10:51 PM
Marvin
03/20/2024, 10:51 PM
Running state at any one time. These limits are set using task tags and apply across all tasks with the tag. You can read more about task concurrency limits here.
In terms of subflows, while there isn't a specific mention of a limit to the number of tasks that can be included in a subflow, you will want to be mindful of system resources and maintain good practices in workflow design to avoid overloading your execution environment, which can lead to performance issues or failures.
Design-wise, you can choose to use subflows when you have complex workflows where it makes sense to encapsulate a group of tasks into their own logical unit. Subflows can help you manage dependencies and task execution ordering in a more organized way, especially when dealing with large numbers of tasks.
If you're encountering performance issues or limitations with generating many tasks within subflows, you may need to look into optimizing your task design or scaling your execution environment (e.g., by using a Dask cluster) to better handle a high number of tasks.
mondras
03/20/2024, 11:02 PM
Marvin
03/20/2024, 11:02 PM
asyncio.gather to concurrently execute them. Here's a code example from the Prefect recipes on GitHub:
python
import asyncio
from prefect import flow

@flow
async def subflow(x):
    print(f"Running subflow with {x=}")
    return x

@flow
async def parent_flow():
    return await asyncio.gather(*[subflow(x) for x in range(3)])

if __name__ == "__main__":
    assert asyncio.run(parent_flow())
This example demonstrates how to define the parent flow (parent_flow) as an async function and use asyncio.gather to await the concurrent completion of all the subflow runs created by the list comprehension. Each subflow receives a different parameter (in this case, the numbers 0 to 2), and the subflows run concurrently on the event loop.
You can find the entire code snippet and more details on how to run asynchronous subflows on the Prefect recipes GitHub page.
Additionally, Prefect's documentation on composing flows indicates that a subflow run is created when a flow function is called inside the execution of another flow. While this excerpt doesn't directly address asynchronous subflows, it confirms that subflows are considered first-class citizens within Prefect, meaning they can be run just like top-level flows.
In summary, you can certainly use an asynchronous approach to run subflows concurrently, similar to how you use .map for tasks. Keep in mind that your flow functions would need to be prepared for asynchronous operation (i.e., defined with async def).
mondras
03/21/2024, 12:04 AM
Marvin
03/21/2024, 12:04 AM
KeyError exception was raised with the key 63681. This typically occurs when you attempt to access a dictionary with a key that does not exist within that dictionary.
In the context of Prefect, this could be related to a situation where either:
1. Your code, or
2. A Prefect internal operation
is trying to access some information indexed by a specific key (in this case, 63681), and that key is not in the expected data structure, which is likely a Python dictionary.
This can happen for various reasons depending on what your flow and tasks are doing. It might be an unexpected state of data, an issue with the system managing the runs (like a Dask issue, for example), or something else entirely.
Without more information on what task or operation within your flow was running when this error occurred, it's challenging to give a precise cause. However, such issues are often related to code logic errors, wherein certain assumptions about available data are violated—perhaps due to an unexpected data input, a race condition, or a problem during intermediate data storage or transfer.
To troubleshoot this further, examine the details of the KeyError, specifically what operation was being performed when this key was looked up and what dataset or context it pertains to. Checking the surrounding logs or traceback may provide more insight into the cause of the issue. Additionally, considering any recent changes or the specific conditions under which this error occurs might help pinpoint the problem.
mondras
03/21/2024, 4:22 AM
import asyncio
from prefect import flow, task

@task
def run_task(field: str):
    return field

@flow
async def subflow(x, sep):
    async with sep:
        y = [x for x in range(1000)]
        run_task.map(y)

@flow
async def parent_flow():
    sep = asyncio.Semaphore(16)
    return await asyncio.gather(*[subflow(x, sep) for x in range(100)])

if __name__ == "__main__":
    assert asyncio.run(parent_flow())

this is creating 100 subflows all in the state running; what is meant by running here? i would think that only 16 are running and the others are pending but that's not what this example shows
Marvin
03/21/2024, 4:22 AM
parent_flow, and each subflow is being launched in the context of an asyncio.gather. This suggests that all 100 subflows are initiated concurrently.
The sep argument (presumably short for semaphore) is used within the subflow to apply a concurrency limit: each subflow enters the asynchronous context manager created by async with sep. This semaphore is intended to limit the number of concurrent executions within the subflow.
However, the semaphore here isn't reducing the number of subflows shown as running, because it is acquired inside the subflow body. By the time async with sep executes, the subflow function has already been called and the subflow run has already started, so a subflow that is still waiting for the semaphore looks the same as one that holds it. In addition, run_task.map(y) is a Prefect method that maps a task over the iterable y and schedules the mapped tasks to run, potentially in parallel, but it isn't awaited here and doesn't work directly with asyncio constructs like a semaphore. The semaphore is therefore likely held only while the mapped tasks are being submitted, not while they execute, and all subflows are initiated and marked as 'running' because, from Prefect's perspective, they've all started executing.
Prefect considers a subflow 'running' once the subflow function has been called. The state doesn't necessarily reflect whether any given mapped task within the subflow has started, is waiting, or has completed; it only indicates that the subflow has begun execution.
If you are trying to limit the actual execution of subflows to 16 at a time, the semaphore has to be acquired before the subflow is called, for example in the parent flow around each await of subflow, so that a slot stays occupied for the whole subflow run. Entering the semaphore's context manager inside the subflow does not change the reported state, because the run has already started by then.
Also, keep in mind that asyncio.Semaphore controls the concurrency of async functions/tasks in an asyncio event loop. Regular Prefect task mapping does not integrate with asyncio concurrency constructs directly.
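The difference between acquiring a semaphore inside a coroutine and acquiring it before the coroutine is called can be sketched with plain asyncio. This is an illustration under stated assumptions, not Prefect code: the started counter is a stand-in for a run being marked as running, asyncio.sleep stands in for real work, and the names run_batch, child_inner, child_plain, guarded, and peak_started are all hypothetical.

```python
import asyncio

LIMIT = 4   # semaphore size (16 in the question above)
N = 12      # number of children (100 in the question above)


async def run_batch(acquire_outside: bool) -> int:
    """Return the peak number of children that were 'started' at once."""
    sem = asyncio.Semaphore(LIMIT)
    started = 0  # stand-in for "run is marked as running"
    peak = 0

    async def child_inner():
        # Semaphore acquired inside the child: it has already 'started'.
        nonlocal started, peak
        started += 1
        peak = max(peak, started)
        try:
            async with sem:
                await asyncio.sleep(0.01)  # stand-in for real work
        finally:
            started -= 1

    async def child_plain():
        nonlocal started, peak
        started += 1
        peak = max(peak, started)
        try:
            await asyncio.sleep(0.01)  # stand-in for real work
        finally:
            started -= 1

    async def guarded():
        # Semaphore acquired by the caller, before the child starts at all.
        async with sem:
            await child_plain()

    make = guarded if acquire_outside else child_inner
    await asyncio.gather(*(make() for _ in range(N)))
    return peak


def peak_started(acquire_outside: bool) -> int:
    return asyncio.run(run_batch(acquire_outside))


if __name__ == "__main__":
    print("acquired inside the child: ", peak_started(False))
    print("acquired before the child: ", peak_started(True))
```

With the semaphore inside the child, every child counts as started even though only LIMIT of them are doing work; with the semaphore taken by the caller, at most LIMIT children are started at any time. Applying the second pattern to subflows would mean wrapping each await of the subflow in the parent flow, which is an adaptation you would need to verify against your Prefect version.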
To limit concurrency of subflows or tasks in Prefect, you can use Prefect concurrency primitives like task concurrency limits, which use task tags, or potentially modify your flow execution strategy to control the number of subflows running at a given time through Prefect-native mechanisms rather than trying to mix in asyncio.Semaphore.
mondras
03/21/2024, 4:26 AM
Marvin
03/21/2024, 4:27 AM
asyncio.Semaphore.
To implement these concurrency patterns in your flows, you may want to use the aforementioned mechanisms from Prefect, tailored to suit the specific requirements and constraints of your workflow.