<@ULVA73B9P> how can i get the parent flow id if i...
# ask-marvin
s
@Marvin how can i get the parent flow id if i know the parent flow name in python using prefect 2.x
m
To get a flow's ID from its name in Prefect 2.x, you query the Prefect API, since flows and their metadata (IDs, names) are stored there. The documentation describes the relationship between parent flows and subflows, but it doesn't show a dedicated method for retrieving a parent flow ID by name. However, you can use Prefect's Python client to look up a flow by name and then read its ID. Here's a basic approach you might consider:

1. Retrieve the flow ID using the API: Use the asynchronous client returned by `get_client()` to look the flow up by its name. Here's a rough example:

```python
import asyncio
from prefect import get_client

async def main():
    async with get_client() as client:
        # Look up the flow by name; raises an exception if no such flow exists
        flow = await client.read_flow_by_name("Your Parent Flow Name")

        # Extract the flow ID
        parent_flow_id = flow.id
        print("Parent Flow ID:", parent_flow_id)

asyncio.run(main())
```

2. Handling subflows: When a subflow is called within a parent flow, it is recorded as a task run in the parent flow run, and the subflow run carries that task run's ID as `parent_task_run_id`. You can use this to trace the relationship between runs as needed.

For further detail, you might want to look into the Prefect 2.x documentation. If Marvin were here, he'd probably remind you that all this data engineering stuff is slightly less exciting than waiting for a pot of water to boil, but at least it fulfills your quest for order amidst the chaos.
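As a minimal sketch of the subflow relationship described above: a subflow run's `parent_task_run_id` points at a task run, and that task run's `flow_run_id` is the parent flow run. The helper name `parent_flow_run_id_of` and the `FakeClient` stand-in are hypothetical; with Prefect you would pass the client from `get_client()`, whose `read_task_run` method is what the helper calls.

```python
import asyncio
from types import SimpleNamespace


async def parent_flow_run_id_of(client, flow_run):
    """Return the parent flow run ID of a subflow run, or None for a top-level run."""
    if flow_run.parent_task_run_id is None:
        return None
    # The task run that represents the subflow belongs to the parent flow run
    task_run = await client.read_task_run(flow_run.parent_task_run_id)
    return task_run.flow_run_id


# Stand-in client (hypothetical), used only so the sketch runs without a Prefect API
class FakeClient:
    async def read_task_run(self, task_run_id):
        return SimpleNamespace(id=task_run_id, flow_run_id="parent-run-1")


subflow_run = SimpleNamespace(id="child-run-1", parent_task_run_id="task-run-1")
print(asyncio.run(parent_flow_run_id_of(FakeClient(), subflow_run)))
```

Note that this yields a flow run ID; the flow ID itself is the `flow_id` field of that parent flow run.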
s
@Marvin i need this to work in a hosted solution
m
When working with hosted Prefect (Prefect Cloud), you use the same Python client; it only needs your API key and your workspace's API URL to authenticate its requests. Here's how you could adapt the solution for a Prefect Cloud (hosted) environment:

1. Set up authentication: Keep your Prefect Cloud API key handy. You usually set this up in your environment, like so:

```python
import os

# Set your Prefect Cloud credentials before importing prefect,
# so they are in place when Prefect loads its settings
os.environ["PREFECT_API_KEY"] = "your-api-key"
# Ensure this matches your own account and workspace
os.environ["PREFECT_API_URL"] = (
    "https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>"
)
```
2. Query the flow: Use the Prefect client to look up the flow by name and obtain its ID. Here's the adjusted code:

```python
import asyncio
from prefect import get_client

async def main():
    # get_client() picks up PREFECT_API_URL and PREFECT_API_KEY from your settings
    async with get_client() as client:
        # Look up the flow by name; raises an exception if no such flow exists
        flow = await client.read_flow_by_name("Your Parent Flow Name")

        # Extract the flow ID
        print("Parent Flow ID:", flow.id)

asyncio.run(main())
```

3. Ensure permissions: Make sure your API key has the right permissions to query flow details.
4. Adjust API endpoint: Make sure the API URL matches your Prefect Cloud workspace.

If you need additional help or run into issues, Prefect's hosting support documentation may have more specific guidance based on your hosted setup. Remember, even if the universe sometimes feels chaotic, with the right API call, order can be restored!
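To make the endpoint point concrete, here is a small sketch of how a Prefect Cloud 2.x API URL is put together from an account ID and a workspace ID. The helper name `cloud_api_url` and the placeholder IDs are hypothetical; copy the real values from your own workspace.

```python
def cloud_api_url(account_id: str, workspace_id: str) -> str:
    """Build a Prefect Cloud 2.x API URL from account and workspace IDs."""
    return (
        "https://api.prefect.cloud/api"
        f"/accounts/{account_id}/workspaces/{workspace_id}"
    )


# Placeholder IDs for illustration only
url = cloud_api_url("ACCOUNT-ID", "WORKSPACE-ID")
print(url)
```

The resulting string is what `PREFECT_API_URL` would be set to.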
s
@Marvin something more like this async def main(): parentflowids = [] async with get_client() as client: flow_runs = client.read_flows( flow_filter=FlowFilterName() )
m
To fetch the parent flow IDs by flow name asynchronously in a hosted Prefect 2.x environment, you'll need to use `get_client` with the correct filtering approach. Here's how this can be done using Prefect's asynchronous client:

```python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName

async def main():
    parent_flow_ids = []

    async with get_client() as client:
        # Define the name filter and wrap it in a FlowFilter,
        # which is the type read_flows expects for flow_filter
        name_filter = FlowFilter(
            name=FlowFilterName(any_=["Your Parent Flow Name"])
        )

        # Read flows using the defined filter
        flows = await client.read_flows(flow_filter=name_filter)

        # Extract IDs of the flows
        for flow in flows:
            parent_flow_ids.append(flow.id)

        print("Parent Flow IDs:", parent_flow_ids)

# Run the main async function
asyncio.run(main())
```
### Key Points:
- `get_client`: Used to interact with Prefect's API in an asynchronous session. It's especially handy for hosted environments.
- `FlowFilterName`: This filter allows you to specify the name of the flow to search for within hosted Prefect APIs.
- Asynchronous management: Using `async` and `await` ensures that the code efficiently handles I/O operations like API calls without blocking.

For more details on using Prefect's client and filters, you can explore their documentation. Let me know if you have any more questions or if you'd like to discuss the philosophical implications of software automation, perhaps over some digital tea.
s
@Marvin awesome now what if i want to look up multiple name_filters
m
If you want to look up multiple flow names, you can modify your filter to handle a list of names. Here's how you can do it using multiple name filters in Prefect 2.x:
```python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName

async def main():
    parent_flow_ids = []
    # List of names to filter
    flow_names = ["Parent Flow Name 1", "Parent Flow Name 2"]

    async with get_client() as client:
        # Initialize an empty list to store flows
        flows = []

        # Iterate over each flow name to apply the filter
        for name in flow_names:
            name_filter = FlowFilter(name=FlowFilterName(any_=[name]))
            flows.extend(await client.read_flows(flow_filter=name_filter))

        # Extract IDs from each flow returned
        for flow in flows:
            parent_flow_ids.append(flow.id)

        print("Parent Flow IDs:", parent_flow_ids)

# Run the main async function
asyncio.run(main())
```
### Explanation:
- Iterate over names: You iterate over a list of flow names, applying a filter for each name.
- Collect results: Extend a list with the results to aggregate flows from multiple queries.
- Filter flexibility: This method allows you to handle any number of flow names dynamically, which is great for hosted scenarios involving multiple related flows.

Feel free to reach out if you have any more scenarios you'd like to tackle, or if you want to ponder whether flows wish they could escape their server-side destinies.
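If the per-name lookups are independent, they can also be issued concurrently rather than one after another. The following is only a sketch of that idea using `asyncio.gather`; `lookup_all` and `fake_lookup` are hypothetical names, and `fake_lookup` stands in for a coroutine that would wrap a real `client.read_flows(...)` call.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


async def lookup_all(
    names: Iterable[str],
    lookup: Callable[[str], Awaitable[list]],
) -> List:
    """Run one lookup per name concurrently and flatten the results."""
    # gather returns results in the same order as the awaitables passed in
    pages = await asyncio.gather(*(lookup(name) for name in names))
    return [item for page in pages for item in page]


# Stand-in for a real API call, used only to make the sketch runnable
async def fake_lookup(name: str) -> list:
    await asyncio.sleep(0)
    return [f"id-for-{name}"]


result = asyncio.run(lookup_all(["flow-a", "flow-b"], fake_lookup))
print(result)
```

For a handful of names the sequential loop is perfectly adequate; concurrency mainly helps when the list is long.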
s
@Marvin
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\Developer\CIS_Database\ETL_Projects\TaskScheduler\.venv\Lib\site-packages\prefect\client\orchestration.py", line 556, in read_flows
    response = await self._client.post("/flows/filter", json=body)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Developer\CIS_Database\ETL_Projects\TaskScheduler\.venv\Lib\site-packages\httpx\_client.py", line 1905, in post
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "c:\Developer\CIS_Database\ETL_Projects\TaskScheduler\.venv\Lib\site-packages\httpx\_client.py", line 1585, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Developer\CIS_Database\ETL_Projects\TaskScheduler\.venv\Lib\site-packages\prefect\client\base.py", line 358, in send
    response.raise_for_status()
  File "c:\Developer\CIS_Database\ETL_Projects\TaskScheduler\.venv\Lib\site-packages\prefect\client\base.py", line 171, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url 'http://ephemeral-prefect/api/flows/filter'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'flows', 'any_'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'like_'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'flows': {'any_': ['cleaning-the-glass-launcher-service', 'second-spectrum-launcher-service'], 'like_': None}, 'flow_runs': None, 'task_runs': None, 'deployments': None, 'work_pools': None, 'work_queues': None, 'sort': None, 'limit': None, 'offset': 0}}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422
m
Your message was too long by 293 tokens - please shorten it and try again.
s
@Marvin
```
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url 'http://ephemeral-prefect/api/flows/filter'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'flows', 'any_'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'like_'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'flows': {'any_': ['cleaning-the-glass-launcher-service', 'second-spectrum-launcher-service'], 'like_': None}, 'flow_runs': None, 'task_runs': None, 'deployments': None, 'work_pools': None, 'work_queues': None, 'sort': None, 'limit': None, 'offset': 0}}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422
```
m
It looks like you're encountering a `422 Unprocessable Entity` error when attempting to filter flows using Prefect's API. This error means the request body doesn't match the schema the server expects, so the server rejects it. Here's what is going wrong and how you can address it:

1. Field expectation: The `loc` entries in the response show that `any_` and `like_` were sent directly under `flows` in the request body. Those two fields belong to the name filter, so a `FlowFilterName` was passed where the API expects a `FlowFilter`.
2. Check the filter parameters: Wrap the name criteria in a `FlowFilter` through its `name` field. Since `any_` already takes a list, a single filter can cover multiple flow names.

Here's a refined approach to manage multiple names:
```python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName

async def main():
    parent_flow_ids = []
    # List of names to filter
    flow_names = ["cleaning-the-glass-launcher-service", "second-spectrum-launcher-service"]

    async with get_client() as client:
        # Create a FlowFilter whose name field matches any of the given names
        flow_filter = FlowFilter(name=FlowFilterName(any_=flow_names))

        # Fetch the flows that match the filter
        flows = await client.read_flows(flow_filter=flow_filter)

        # Extract IDs from each flow returned
        for flow in flows:
            parent_flow_ids.append(flow.id)

        print("Parent Flow IDs:", parent_flow_ids)

# Run the main async function
asyncio.run(main())
```
### Key Points:
- FlowFilter usage: Pass a `FlowFilter` as `flow_filter`, with the name criteria in its `name` field.
- Error details: `any_` and `like_` are fields of the name filter, so they must be nested under `name` rather than sent at the top level of `flows`.
- Error handling: Check Prefect's documentation or update the client library if you suspect an outdated version is causing compatibility issues.

Try running the adjusted script, and let's see if this resolves the issue. If your flows could speak, they'd probably be grateful for your persistence in making their metadata discoverable!
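To illustrate the nesting the error complains about, here is a plain-dictionary sketch of the two body shapes. The first is copied from the `request_body` in the 422 response; the second nests the same names under `name`. The helper `flows_filter_body` is hypothetical, and the nested shape is an assumption inferred from the error's `loc` paths, not a transcript of what the client sends.

```python
import json

# Shape reported in the 422 response: name fields sit directly under "flows"
rejected_body = {
    "flows": {
        "any_": [
            "cleaning-the-glass-launcher-service",
            "second-spectrum-launcher-service",
        ],
        "like_": None,
    }
}


def flows_filter_body(names):
    """Build a body whose name criteria are nested under "name" (assumed shape)."""
    return {"flows": {"name": {"any_": list(names)}}}


nested_body = flows_filter_body(rejected_body["flows"]["any_"])
print(json.dumps(nested_body, indent=2))
```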
s
@Marvin why is this not working?
```python
async with get_client() as client:
    flow_runs = await client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.RUNNING]),
            ),
            parent_flow_run_id=FlowRunFilterParentFlowRunId(
                not_any=exclude_parent_flow_ids
            ),
            start_time=FlowRunFilterStartTime(
                before_=datetime.utcnow() - timedelta(minutes=THRESHOLD_MINUTES),
            ),
        )
    )
```
m
It looks like there may be some issues with how the filters are used in your code. Here's some guidance based on a closer look at your snippet and the filter schemas:

1. Filter field names: Make sure each filter is initialized with field names that exist in the Prefect 2.x schemas. Filter fields end with an underscore (`any_`, `before_`), so `not_any` does not match any field.
2. Check the filter API: Verify which fields `FlowRunFilterParentFlowRunId` exposes in your Prefect version. In the 2.x client schemas I'm aware of it only has `any_`, so an exclusion may not be expressible in this filter directly. Note also that it matches flow run IDs, not flow IDs.

Here's an example that might work:
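```python
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterParentFlowRunId,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

THRESHOLD_MINUTES = 10  # Example threshold

async def main():
    # Assuming this is provided elsewhere; these must be flow *run* IDs
    exclude_parent_flow_ids = []
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=THRESHOLD_MINUTES)

    async with get_client() as client:
        # The flow run filter goes in flow_run_filter, not flow_filter
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING]),
                ),
                start_time=FlowRunFilterStartTime(before_=cutoff),
            )
        )

        # Assumption: the parent filter only supports any_, so fetch the
        # subflow runs of the excluded parents and subtract them client-side
        excluded_run_ids = set()
        if exclude_parent_flow_ids:
            children = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    parent_flow_run_id=FlowRunFilterParentFlowRunId(
                        any_=exclude_parent_flow_ids
                    ),
                )
            )
            excluded_run_ids = {run.id for run in children}

        for flow_run in flow_runs:
            if flow_run.id not in excluded_run_ids:
                print(f"Flow Run ID: {flow_run.id}")

asyncio.run(main())
```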
### Things to Consider:
- Check field names: Make sure the fields you pass, such as `any_` on `FlowRunFilterStateType` or whatever you pass to `FlowRunFilterParentFlowRunId`, exist in your current version of Prefect.
- Parameter details: Ensure that you are passing parameters in the form that is expected. `flow_run_filter` takes a `FlowRunFilter`, while `flow_filter` takes a `FlowFilter`.
- Prefect version: Different versions of Prefect might handle these objects slightly differently, so confirm you're using the structures that match your version.

If the issue persists, double-check with the latest Prefect 2.x documentation or your version's specific docs to ensure compatibility. And remember, even though filters seem elusive, they're just waiting to make your data dreams come true!
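One detail worth a small sketch is the cutoff time itself. `datetime.utcnow()` returns a naive datetime (and is deprecated as of Python 3.12), so a timezone-aware value is the less ambiguous thing to hand to a time filter. The helper name `cutoff_time` is hypothetical; `THRESHOLD_MINUTES` mirrors the constant used in the snippet above.

```python
from datetime import datetime, timedelta, timezone

THRESHOLD_MINUTES = 10  # Example threshold


def cutoff_time(now=None):
    """Return the timezone-aware UTC instant THRESHOLD_MINUTES before `now`."""
    now = now or datetime.now(timezone.utc)
    return now - timedelta(minutes=THRESHOLD_MINUTES)


# A fixed "now" keeps the example deterministic
fixed_now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(cutoff_time(fixed_now).isoformat())  # 2024-01-01T11:50:00+00:00
```

The returned value can be passed as `before_` to a start-time filter.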
s
@Marvin how can i get all pages not just the first async with get_client() as client: flow_runs = await client.read_flow_runs( flow_run_filter=FlowRunFilter( state=FlowRunFilterState( type=FlowRunFilterStateType(any_=[StateType.RUNNING]), ), ) )
m
To retrieve all pages of flow runs using Prefect's asynchronous client, you'll need to implement pagination. Prefect's client typically returns a limited set of results (e.g., one page), so fetching all requires looping through the pages. Here’s a way to implement pagination to get all running flow runs:
```python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

async def main():
    all_flow_runs = []
    offset = 0
    limit = 100  # Adjust the limit per your needs or based on Prefect API constraints

    async with get_client() as client:
        while True:
            # Fetch a page of flow runs; the filter goes in flow_run_filter
            flow_runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    state=FlowRunFilterState(
                        type=FlowRunFilterStateType(any_=[StateType.RUNNING]),
                    ),
                ),
                limit=limit,
                offset=offset,
            )

            # Break out of loop if no more flow runs are returned
            if not flow_runs:
                break

            # Add the retrieved runs to the list
            all_flow_runs.extend(flow_runs)

            # Increment offset to get the next page
            offset += len(flow_runs)

    # Process your flow runs here
    for flow_run in all_flow_runs:
        print(f"Flow Run ID: {flow_run.id}")

# Run the main async function
asyncio.run(main())
```
### Key Points:
- Limit and offset: Use these parameters to control the number of records fetched per page and to track your position in the paged query.
- Looping for pages: Keep fetching and incrementing the offset until no more results are returned. Adjust `limit` to the number of results you want to fetch per request.

Paginated fetching is essential if you're dealing with sizeable datasets: it keeps operations efficient and plays nice with most APIs' rate limits. And remember, while data may be infinite, patience is a finite affair, so paginating wisely is always a good strategy!
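The limit/offset loop can also be factored into a small reusable helper. This is only a sketch: `fetch_all` and `fake_page` are hypothetical names, `fake_page` stands in for a real API call, and stopping on a short page assumes the API returns exactly `limit` items whenever more remain.

```python
import asyncio
from typing import Awaitable, Callable, List


async def fetch_all(
    fetch_page: Callable[[int, int], Awaitable[List]],
    limit: int = 100,
) -> List:
    """Collect every item by awaiting fetch_page(limit, offset) page by page."""
    items: List = []
    offset = 0
    while True:
        page = await fetch_page(limit, offset)
        items.extend(page)
        # A page shorter than `limit` (including an empty one) is the last page
        if len(page) < limit:
            return items
        offset += len(page)


# Stand-in data source, used only to make the sketch runnable
DATA = list(range(250))


async def fake_page(limit: int, offset: int) -> List[int]:
    return DATA[offset:offset + limit]


all_items = asyncio.run(fetch_all(fake_page, limit=100))
print(len(all_items))  # 250
```

Checking for a short page saves the final empty request that the "loop until empty" variant makes.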
s
@Marvin
```
14:29:46.343 | ERROR   | Flow run 'hypersonic-eel' - Finished in state Failed("Flow run encountered an exception. PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url 'http://ephemeral-prefect/api/flow_runs/filter'\nResponse: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'flows', 'deployment_id'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'expected_start_time'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'flow_version'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'idempotency_key'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'next_scheduled_start_time'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'parent_flow_run_id'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'parent_task_run_id'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'start_time'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'state'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'work_queue_name'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'flows': {'operator': 'and_', 'id': None, 'name': None, 'tags': None, 'deployment_id': None, 'work_queue_name': None, 'state': {'operator': 'and_', 'type': {'any_': ['RUNNING']}, 'name': None}, 'flow_version': None, 'start_time': None, 'expected_start_time': None, 'next_scheduled_start_time': None, 'parent_flow_run_id': None, 'parent_task_run_id': None, 'idempotency_key': None}, 'flow_runs': None, 'task_runs': None, 'deployments': None, 'work_pools': None, 'work_pool_queues': None, 'sort': None, 'limit': 199, 'offset': 0}}\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422")
```
m
Your message was too long by 284 tokens - please shorten it and try again.
s
@Marvin can you make sure the limit and offset work for hosted prefect 2.x
m
The use of `limit` and `offset` in the `read_flow_runs` method is indeed supported in Prefect 2.x, including hosted solutions. These parameters are essential when implementing pagination to retrieve multiple pages of results. Here's how you can apply them to ensure you fetch all records across pages:
```python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

async def main():
    all_flow_runs = []
    offset = 0
    limit = 100  # Choose an appropriate limit that suits your needs and complies with API constraints

    async with get_client() as client:
        while True:
            # Fetch a page of flow runs; the filter goes in flow_run_filter
            flow_runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    state=FlowRunFilterState(
                        type=FlowRunFilterStateType(any_=[StateType.RUNNING]),
                    ),
                ),
                limit=limit,
                offset=offset,
            )

            # Break out of the loop if no more flow runs are returned (i.e., all pages fetched)
            if not flow_runs:
                break

            # Add the retrieved runs to the list
            all_flow_runs.extend(flow_runs)

            # Increment offset to access the next page
            offset += len(flow_runs)

    # Process or print your flow runs
    for flow_run in all_flow_runs:
        print(f"Flow Run ID: {flow_run.id}")

# Run the main async function
asyncio.run(main())
```
### Key Points:
- Pagination parameters: Use the `limit` parameter to control the number of results per query and `offset` to skip a certain number of results (beginning from zero to reach subsequent pages).
- Loop until complete: Continue fetching until the API returns an empty result set, indicating no more data is available.
- Hosted environment compatibility: The method and parameters are supported in both local and hosted environments, ensuring a smooth application for Prefect's cloud-based use case.

This approach should efficiently handle larger datasets by iterating through all available pages, ensuring you don't miss any running flow runs. Enjoy the digital treasure hunt as you uncover all the flow runs awaiting your discerning eye!
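One caveat with offset pagination over running flow runs: the set can change between requests, so a run may show up on two pages. A minimal sketch for removing such repeats after collection is below; `dedupe_by_id` is a hypothetical helper, and the `SimpleNamespace` objects merely stand in for flow run objects with an `id` attribute.

```python
from types import SimpleNamespace
from typing import Iterable, List


def dedupe_by_id(runs: Iterable) -> List:
    """Keep the first occurrence of each run, keyed on its `id` attribute."""
    seen = set()
    unique = []
    for run in runs:
        if run.id not in seen:
            seen.add(run.id)
            unique.append(run)
    return unique


# Stand-in runs; "b" appears twice, as if it shifted between two pages
runs = [SimpleNamespace(id="a"), SimpleNamespace(id="b"), SimpleNamespace(id="b")]
unique_runs = dedupe_by_id(runs)
print([run.id for run in unique_runs])  # ['a', 'b']
```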
s
@Marvin
```
Client error '422 Unprocessable Entity' for url 'http://ephemeral-prefect/api/flow_runs/filter'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'flows', 'deployment_id'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'expected_start_time'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'flow_version'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'idempotency_key'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'next_scheduled_start_time'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'parent_flow_run_id'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'parent_task_run_id'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'start_time'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'state'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}, {'loc': ['body', 'flows', 'work_queue_name'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'flows': {'operator': 'and_', 'id': None, 'name': None, 'tags': None, 'deployment_id': None, 'work_queue_name': None, 'state': {'operator': 'and_', 'type': {'any_': ['RUNNING']}, 'name': None}, 'flow_version': None, 'start_time': None, 'expected_start_time': None, 'next_scheduled_start_time': None, 'parent_flow_run_id': None, 'parent_task_run_id': None, 'idempotency_key': None}, 'flow_runs': None, 'task_runs': None, 'deployments': None, 'work_pools': None, 'work_pool_queues': None, 'sort': None, 'limit': 199, 'offset': 0}}
```
m
Your message was too long by 220 tokens - please shorten it and try again.