# ask-marvin
j
@Marvin I would like a quick tutorial and sample code. I have created a service account with "Runner" privileges and have an API key from that. I want to write a python program that connects to prefect cloud and runs a particular flow in a particular deployment using that API key. Assume the deployment is running and ready to accept this request. Show me the basic code outline and a concise explanation of what the code is doing.
m
thought for 105.2 seconds
Perfect! Now I can provide you with accurate, up-to-date code for Prefect 3.x. Here's a concise example of how to run a deployment programmatically:
```python
from prefect import get_client
from prefect.client.cloud import get_cloud_client
import asyncio

async def run_my_deployment():
    # Connect to Prefect Cloud using your API key
    # The API key should be set in your environment as PREFECT_API_KEY
    client = await get_cloud_client()
    
    # Create a flow run from your deployment
    # Replace these with your actual deployment details
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id="deployment-id",  # Your deployment ID
        name="My Programmatic Run",     # Optional: custom name for this run
        parameters={}                   # Optional: parameters for your flow
    )
    
    print(f"Created flow run {flow_run.id}")
    return flow_run

# Run the async function
if __name__ == "__main__":
    asyncio.run(run_my_deployment())
```
Here's what you need to do to use this code:

1. Set up your API key:
```bash
   export PREFECT_API_KEY='your-api-key-here'
```
Or in Python before running the code:
```python
   import os
   os.environ["PREFECT_API_KEY"] = "your-api-key-here"
```
2. Get your deployment ID: You can find this in the Prefect Cloud UI or by using the CLI command `prefect deployment ls`
3. Add any parameters your flow expects in the `parameters` dict

The code does the following:
1. Connects to Prefect Cloud using your API key
2. Creates a new flow run from your specified deployment
3. Returns the flow run object, which contains the ID and other metadata

Important notes:
- The code is async because Prefect's client API is primarily async-based
- The flow run will be created and scheduled according to your deployment's configuration
- You don't need to worry about workers or agents - they will pick up the run automatically
- The service account with "Runner" privileges you created has the correct permissions for this operation

If you need to wait for the flow run to complete, you can add this functionality:
```python
async def run_and_wait_for_deployment():
    client = await get_cloud_client()
    
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id="deployment-id",
        name="My Programmatic Run"
    )
    
    # Wait for the flow run to complete
    flow_run = await client.wait_for_flow_run(
        flow_run.id,
        timeout=300  # 5 minutes timeout
    )
    
    print(f"Flow run completed with state: {flow_run.state.type}")
    return flow_run
```
This extended version will wait for the flow run to complete and show its final state. The `timeout` parameter prevents infinite waiting if something goes wrong.
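If your flow takes arguments, the keys in the `parameters` dict must match the flow function's parameter names. A minimal sketch (the parameter names below are made up for illustration):
```python
from prefect import get_client
import asyncio

async def trigger_with_parameters(deployment_id: str):
    # Hypothetical parameters; the keys must match the arguments of the
    # flow function behind your deployment.
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment_id,
            parameters={"start_date": "2024-01-01", "limit": 100},
        )
        print(f"Created flow run {flow_run.id}")
        return flow_run

# asyncio.run(trigger_with_parameters("your-deployment-id"))
```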
j
@Marvin Can the code get the deployment id from the deployment's name from when it was created? I don't want to have to run "prefect deployment ls" and hard-code the result in case it changes.
m
thought for 84.9 seconds
Here's the updated code that looks up a deployment by name before running it:
```python
from prefect import get_client
from prefect.client.cloud import get_cloud_client
import asyncio

async def run_deployment_by_name(flow_name: str, deployment_name: str):
    # Connect to Prefect Cloud using your API key
    client = await get_cloud_client()
    
    # Find the deployment by name
    deployment = await client.read_deployment_by_name(
        flow_name=flow_name,
        deployment_name=deployment_name
    )
    
    # Create a flow run using the found deployment
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
        parameters={}  # Add any parameters your flow needs
    )
    
    print(f"Created flow run {flow_run.id} for deployment {deployment_name}")
    return flow_run

# Example usage with waiting for completion
async def run_and_wait_by_name(flow_name: str, deployment_name: str):
    client = await get_cloud_client()
    
    # Find the deployment
    deployment = await client.read_deployment_by_name(
        flow_name=flow_name,
        deployment_name=deployment_name
    )
    
    # Create and run the flow
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}"
    )
    
    # Wait for completion
    flow_run = await client.wait_for_flow_run(
        flow_run.id,
        timeout=300  # 5 minutes timeout
    )
    
    print(f"Flow run completed with state: {flow_run.state.type}")
    return flow_run

# Run the async function
if __name__ == "__main__":
    # Example: if your flow is named "my-flow" and deployment is "prod-deployment"
    asyncio.run(run_deployment_by_name("my-flow", "prod-deployment"))
    
    # Or with waiting:
    # asyncio.run(run_and_wait_by_name("my-flow", "prod-deployment"))
```
To use this code:

1. Set your API key as before:
```python
   import os
   os.environ["PREFECT_API_KEY"] = "your-api-key-here"
```
2. Call the function with your flow and deployment names:
```python
   asyncio.run(run_deployment_by_name(
       flow_name="your-flow-name",
       deployment_name="your-deployment-name"
   ))
```
The flow name and deployment name are the ones you used when creating the deployment. For example, if you created your deployment with:
```python
flow.from_source(...).deploy(
    name="prod-deployment",
    work_pool_name="your-pool"
)
```
Then:
- `flow_name` would be the name of your flow (the name of the function decorated with `@flow`)
- `deployment_name` would be "prod-deployment"

This approach is more maintainable because:
1. It doesn't require hardcoding IDs
2. It's more readable (names are more meaningful than IDs)
3. It's more resilient to deployment updates or recreations
4. It matches how you typically think about and refer to your deployments

If the deployment isn't found, the code will raise an exception that you can catch and handle appropriately.
j
@Marvin I am trying to use the above code and successfully got a client with get_cloud_client, but I'm getting this error when trying to get the deployment
```
AttributeError: 'CloudClient' object has no attribute 'read_deployment_by_name'
```
m
thought for 42.4 seconds
I apologize for the confusion in my previous response. You're right - we need to use a different approach. Here's the corrected code that will work:
```python
from prefect import get_client
from prefect.client.cloud import get_cloud_client
import asyncio

async def run_deployment_by_name(flow_name: str, deployment_name: str):
    # Connect to Prefect Cloud using your API key
    client = await get_cloud_client()
    
    # Find the deployment by name using the correct method
    deployments = await client.read_deployments(
        flows={flow_name},
        deployment_names={deployment_name}
    )
    
    if not deployments:
        raise ValueError(f"No deployment found with name '{deployment_name}' for flow '{flow_name}'")
    
    deployment = deployments[0]
    
    # Create a flow run using the found deployment
    flow_run = await client.create_flow_run(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
        parameters={}  # Add any parameters your flow needs
    )
    
    print(f"Created flow run {flow_run.id} for deployment {deployment_name}")
    return flow_run

# Example usage with waiting for completion
async def run_and_wait_by_name(flow_name: str, deployment_name: str):
    client = await get_cloud_client()
    
    # Find the deployment
    deployments = await client.read_deployments(
        flows={flow_name},
        deployment_names={deployment_name}
    )
    
    if not deployments:
        raise ValueError(f"No deployment found with name '{deployment_name}' for flow '{flow_name}'")
    
    deployment = deployments[0]
    
    # Create and run the flow
    flow_run = await client.create_flow_run(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}"
    )
    
    # Wait for completion
    flow_run = await client.wait_for_flow_run(
        flow_run.id,
        timeout=300  # 5 minutes timeout
    )
    
    print(f"Flow run completed with state: {flow_run.state.type}")
    return flow_run

# Run the async function
if __name__ == "__main__":
    # Example: if your flow is named "my-flow" and deployment is "prod-deployment"
    asyncio.run(run_deployment_by_name("my-flow", "prod-deployment"))
    
    # Or with waiting:
    # asyncio.run(run_and_wait_by_name("my-flow", "prod-deployment"))
```
The key changes are:
1. Using `read_deployments()` instead of the non-existent `read_deployment_by_name()`
2. Using `create_flow_run()` instead of `create_flow_run_from_deployment()`
3. The `flows` and `deployment_names` parameters take sets of strings

The `read_deployments()` method will return a list of deployments that match your criteria. Since we're using both the flow name and deployment name, it should return exactly one deployment (or an empty list if not found). This code should now work correctly with the CloudClient. Remember to still set your API key in the environment:
```python
import os
os.environ["PREFECT_API_KEY"] = "your-api-key-here"
```
I apologize for the confusion in my earlier response. The API has evolved, and this is the current correct way to accomplish what you want.
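For reference, on recent Prefect releases the orchestration client's `read_deployments` is usually driven by filter objects rather than bare sets of names; a sketch under that assumption (filter class names as found in `prefect.client.schemas.filters` - double-check them against your installed version):
```python
from prefect import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter, DeploymentFilterName, FlowFilter, FlowFilterName,
)
import asyncio

async def find_deployment(flow_name: str, deployment_name: str):
    # Filter-based lookup on the orchestration client; returns a list.
    async with get_client() as client:
        deployments = await client.read_deployments(
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(any_=[deployment_name])
            ),
        )
    if not deployments:
        raise ValueError(f"No deployment '{deployment_name}' for flow '{flow_name}'")
    return deployments[0]

# asyncio.run(find_deployment("my-flow", "prod-deployment"))
```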
j
@Marvin In the debugger I see:
```
(Pdb) dir (client)
['__aenter__', '__aexit__', '__annotations__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_client', 'account_base_url', 'account_id', 'api_healthcheck', 'check_ip_allowlist_access', 'get', 'read_account_ip_allowlist', 'read_account_settings', 'read_current_workspace', 'read_worker_metadata', 'read_workspaces', 'request', 'update_account_ip_allowlist', 'update_account_settings', 'workspace_base_url', 'workspace_id']
```
There is no "deployments" function listed
m
thought for 10.7 seconds
Ah, I see the issue now. The `CloudClient` we're getting from `get_cloud_client()` is specifically for cloud administration tasks. For working with deployments, we need to use the orchestration client instead. Here's the corrected code:
```python
from prefect import get_client
import asyncio

async def run_deployment_by_name(flow_name: str, deployment_name: str):
    # Get the Prefect client (this gets the orchestration client)
    client = await get_client()
    
    # Find the deployment by name using the correct method
    deployments = await client.read_deployments(
        flows={flow_name},
        deployment_names={deployment_name}
    )
    
    if not deployments:
        raise ValueError(f"No deployment found with name '{deployment_name}' for flow '{flow_name}'")
    
    deployment = deployments[0]
    
    # Create a flow run using the found deployment
    flow_run = await client.create_flow_run(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
        parameters={}  # Add any parameters your flow needs
    )
    
    print(f"Created flow run {flow_run.id} for deployment {deployment_name}")
    return flow_run

# Example usage with waiting for completion
async def run_and_wait_by_name(flow_name: str, deployment_name: str):
    client = await get_client()
    
    # Find the deployment
    deployments = await client.read_deployments(
        flows={flow_name},
        deployment_names={deployment_name}
    )
    
    if not deployments:
        raise ValueError(f"No deployment found with name '{deployment_name}' for flow '{flow_name}'")
    
    deployment = deployments[0]
    
    # Create and run the flow
    flow_run = await client.create_flow_run(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}"
    )
    
    # Wait for completion
    flow_run = await client.wait_for_flow_run(
        flow_run.id,
        timeout=300  # 5 minutes timeout
    )
    
    print(f"Flow run completed with state: {flow_run.state.type}")
    return flow_run

# Run the async function
if __name__ == "__main__":
    # Example: if your flow is named "my-flow" and deployment is "prod-deployment"
    asyncio.run(run_deployment_by_name("my-flow", "prod-deployment"))
```
The key change is that we're now using `get_client()` instead of `get_cloud_client()`. The `get_client()` function returns the orchestration client, which has all the deployment-related methods we need. Your API key will still be used for authentication, so you still need to set it in your environment:
```python
import os
os.environ["PREFECT_API_KEY"] = "your-api-key-here"
```
This should now work correctly. The orchestration client has all the methods we need for working with deployments and flow runs. I apologize for the confusion with the different client types earlier!
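One detail worth noting: `get_client()` is normally used as an async context manager so the underlying HTTP client is opened and closed cleanly; roughly:
```python
from prefect import get_client
import asyncio

async def main():
    # Entering the context manager sets up the underlying httpx client.
    async with get_client() as client:
        response = await client.hello()  # simple connectivity check
        print(response)

asyncio.run(main())
```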
j
@Marvin I have a basic function working as follows:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    os.environ["PREFECT_API_KEY"] = RUNNER_API_KEY
    client = get_client()
    deployment = await client.read_deployment_by_name(
        f"{flow}/{deployment}"
    )
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id = deployment.id,
        parameters={},
    )
    print(f"Created {flow_run.name}")
```
If I run my python script from a shell where I have run `prefect cloud login`, it works just fine. If I run `prefect cloud logout` from my shell, the script is unable to log in. I am getting the error
```
ValueError: No Prefect API URL provided. Please set PREFECT_API_URL to the address of a running Prefect server.
```
How do I log in programmatically?
m
thought for 121.5 seconds
Here's how to modify your code to work without requiring `prefect cloud login`:
```python
import os
from prefect import get_client
import asyncio

# Your API key
RUNNER_API_KEY = "your-api-key-here"

# Get these from your Prefect Cloud URL:
# <https://app.prefect.cloud/account/{ACCOUNT_ID}/workspace/{WORKSPACE_ID}/>
ACCOUNT_ID = "your-account-id"
WORKSPACE_ID = "your-workspace-id"

async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow"
):
    # Set both the API key and API URL
    os.environ["PREFECT_API_KEY"] = RUNNER_API_KEY
    os.environ["PREFECT_API_URL"] = f"<https://api.prefect.cloud/api/accounts/{ACCOUNT_ID}/workspaces/{WORKSPACE_ID}>"
    
    client = get_client()
    deployment = await client.read_deployment_by_name(
        f"{flow}/{deployment}"
    )
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        parameters={},
    )
    print(f"Created {flow_run.name}")

# Run the async function
if __name__ == "__main__":
    asyncio.run(run_deployment_by_name())
```
To get your `ACCOUNT_ID` and `WORKSPACE_ID`:
1. Log into Prefect Cloud in your browser
2. Look at the URL in your browser - it will look like `<https://app.prefect.cloud/account/[ACCOUNT-ID]/workspace/[WORKSPACE-ID]/>`
3. Copy those values into your code

For example, if your URL is `<https://app.prefect.cloud/account/1234567/workspace/89abcdef-ghij-klmn-opqr-stuvwxyz1234/>`, then:
- `ACCOUNT_ID = "1234567"`
- `WORKSPACE_ID = "89abcdef-ghij-klmn-opqr-stuvwxyz1234"`

You can also store these values in environment variables or a configuration file instead of hardcoding them in your script:
```python
import os
from prefect import get_client
import asyncio

async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow"
):
    # Get values from environment variables
    api_key = os.environ["PREFECT_API_KEY"]
    account_id = os.environ["PREFECT_CLOUD_ACCOUNT_ID"]
    workspace_id = os.environ["PREFECT_CLOUD_WORKSPACE_ID"]
    
    # Set the API URL
    os.environ["PREFECT_API_URL"] = f"<https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}>"
    
    client = get_client()
    deployment = await client.read_deployment_by_name(
        f"{flow}/{deployment}"
    )
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        parameters={},
    )
    print(f"Created {flow_run.name}")
```
Then you can set these environment variables before running your script:
```bash
export PREFECT_API_KEY="your-api-key"
export PREFECT_CLOUD_ACCOUNT_ID="your-account-id"
export PREFECT_CLOUD_WORKSPACE_ID="your-workspace-id"
python your_script.py
```
This approach will work without requiring `prefect cloud login` and is more suitable for automated scripts or CI/CD environments.
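If you'd rather not manage these values in the script at all, they can usually be stored in a Prefect profile from the shell instead (a sketch; exact behavior may vary by version):
```bash
# Persist the workspace URL and key in the active Prefect profile
prefect config set PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>"
prefect config set PREFECT_API_KEY="your-api-key-here"

# Confirm what the client will use
prefect config view
```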
j
@Marvin I have the following implementation that is getting my account and workspace id:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    os.environ["PREFECT_API_KEY"] = RUNNER_API_KEY

    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    os.environ["PREFECT_API_URL"] = api_url
  
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id = deployment.id,
            parameters={},
        )
        
    print(f"Created {flow_run.name}")
```
But, the call to `get_client` is still failing with
```
ValueError: No Prefect API URL provided. Please set PREFECT_API_URL to the address of a running Prefect server.
```
Is there another way to set the API URL besides with `os.environ`?
m
thought for 37.1 seconds
Great! Here's how you can modify your code to set the API URL directly when getting the client:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    # Get cloud client and workspace info
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    # Create client with API URL directly
    from prefect.client.orchestration import PrefectClient
    
    async with PrefectClient(
        api=api_url,
        api_key=RUNNER_API_KEY
    ) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id = deployment.id,
            parameters={},
        )
        
    print(f"Created {flow_run.name}")
```
The key changes are:
1. Removed the environment variable setting
2. Imported `PrefectClient` directly
3. Created the client with explicit `api` and `api_key` parameters

Also, I notice you had angle brackets in your API URL string (`<https://...>`). Those should be removed - the URL should be just:
```python
api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
```
This approach should work better as it doesn't rely on environment variables and explicitly configures the client with the API URL. Let me know if you're still seeing any issues!
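Another option, if you'd rather avoid both `os.environ` and constructing `PrefectClient` by hand, is Prefect's settings context manager; a sketch, assuming `temporary_settings` is available in your version of `prefect.settings`:
```python
from prefect import get_client
from prefect.settings import PREFECT_API_KEY, PREFECT_API_URL, temporary_settings
import asyncio

async def run_with_temporary_settings(api_url: str, api_key: str, deployment_id: str):
    # These overrides apply only inside the `with` block.
    with temporary_settings(updates={PREFECT_API_URL: api_url, PREFECT_API_KEY: api_key}):
        async with get_client() as client:
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id=deployment_id
            )
            print(f"Created flow run {flow_run.id}")
            return flow_run

# asyncio.run(run_with_temporary_settings(
#     "https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>",
#     "your-api-key-here",
#     "your-deployment-id",
# ))
```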
j
@Marvin Can you help me understand why this isn't working? Maybe I'm not setting the URL correctly?
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.com/api/{workspace_base_url}/>"
    
    client =  PrefectClient( api=api_url, api_key=RUNNER_API_KEY)
    result = await client.hello()
    print("hello!", result)
    return
```
The error is:
```
httpcore.ConnectError: [Errno -2] Name or service not known
```
m
thought for 8.2 seconds
Ah, I see two issues in your code! First, the URL has angle brackets and an extra slash, and second, you're using `prefect.com` instead of `prefect.cloud`. Here's the corrected version:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    # Remove the angle brackets and use prefect.cloud instead of prefect.com
    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    client = PrefectClient(api=api_url, api_key=RUNNER_API_KEY)
    result = await client.hello()
    print("hello!", result)
    return
```
The changes are:
1. Removed `<` and `>` from the URL
2. Changed `prefect.com` to `prefect.cloud`
3. Removed the trailing slash after `workspace_base_url`

Let's also verify what `workspace_base_url` looks like to make sure it's formatted correctly. Could you add this debug print before creating the client?
```python
    print(f"Debug - workspace_base_url: {workspace_base_url}")
    print(f"Debug - final api_url: {api_url}")
```
This will help us see exactly what URL is being constructed. The final URL should look something like `<https://api.prefect.cloud/api/accounts/YOUR_ACCOUNT_ID/workspaces/YOUR_WORKSPACE_ID>`. If you're still seeing issues after these changes, please share what those debug prints show!
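A quick way to confirm the constructed URL and key are usable is the client's `api_healthcheck` method; as I understand it, it returns `None` on success and the exception it encountered otherwise (a sketch):
```python
from prefect.client.orchestration import PrefectClient
import asyncio

async def check_connection(api_url: str, api_key: str):
    async with PrefectClient(api=api_url, api_key=api_key) as client:
        # None means the API was reachable; otherwise the exception is returned.
        error = await client.api_healthcheck()
        print("Connected" if error is None else f"Could not reach API: {error}")

# asyncio.run(check_connection(
#     "https://api.prefect.cloud/api/accounts/YOUR_ACCOUNT_ID/workspaces/YOUR_WORKSPACE_ID",
#     "your-api-key-here",
# ))
```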
j
@Marvin OK, my workspace URL is formatted correctly and my function now looks like this:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    client =  PrefectClient( api=api_url, api_key=RUNNER_API_KEY)
    deployment = await client.read_deployment_by_name(
        f"{flow}/{deployment}"
    )
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id = deployment.id,
        parameters={},
    )
    print(f"Created {flow_run.name}")
```
I have successfully retrieved the deployment id, but when I try to create a flow run I get the following error:
```
raise PydanticUserError(self._error_message, code=self._code)
pydantic.errors.PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`.
```
m
thought for 70.8 seconds
I see the issue. The problem is likely related to the client initialization. Let's modify your code to use an async context manager and ensure we're using the client correctly:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    # Remove the angle brackets from the URL
    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    # Use async context manager to properly initialize the client
    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        
        # Create flow run with minimal parameters
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            name=f"API-triggered run of {deployment.name}"
        )
        
        print(f"Created flow run {flow_run.name} with ID: {flow_run.id}")
        return flow_run

# Run the async function
if __name__ == "__main__":
    import asyncio
    flow_run = asyncio.run(run_deployment_by_name())
```
Key changes:
1. Removed angle brackets from the URL
2. Used an `async with` context manager for the client
3. Simplified the flow run creation parameters
4. Added proper asyncio handling for running the function

If you're still seeing the Pydantic error, let's try an alternative approach using the lower-level API:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        
        # Use the lower-level API
        endpoint = f"/deployments/{deployment.id}/create_flow_run"
        flow_run = await client.post(
            endpoint,
            json={
                "name": f"API-triggered run of {deployment.name}",
                "parameters": {},
            }
        )
        
        print(f"Created flow run with ID: {flow_run['id']}")
        return flow_run

if __name__ == "__main__":
    import asyncio
    flow_run = asyncio.run(run_deployment_by_name())
```
This second approach uses the REST API directly, which might avoid the Pydantic model initialization issue. Try both versions and let me know which one works better for you! Also, make sure you have the latest version of Prefect installed, as this could be a version-related issue:
```bash
pip install --upgrade prefect
```
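If the low-level route is ever needed, one way that sidesteps the Prefect client entirely is to call the same `create_flow_run` REST endpoint with plain `httpx`; a sketch, assuming the workspace-scoped API URL built above and Bearer auth with the service-account key:
```python
import asyncio
import httpx

API_URL = "https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>"
API_KEY = "your-api-key-here"

async def create_flow_run_via_rest(deployment_id: str) -> dict:
    headers = {"Authorization": f"Bearer {API_KEY}"}
    async with httpx.AsyncClient(base_url=API_URL, headers=headers) as http:
        resp = await http.post(
            f"/deployments/{deployment_id}/create_flow_run",
            json={"parameters": {}},
        )
        resp.raise_for_status()
        return resp.json()

# asyncio.run(create_flow_run_via_rest("your-deployment-id"))
```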
j
@Marvin The error is now
```
AttributeError: 'PrefectClient' object has no attribute 'post'
```
m
thought for 189.9 seconds
Based on the research, let me provide you with the correct way to create a flow run. Here's the updated code:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        
        # Create flow run using request method
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id
        )
        
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run

if __name__ == "__main__":
    import asyncio
    flow_run = asyncio.run(run_deployment_by_name())
```
The key changes are:
1. Removed the angle brackets from the URL
2. Simplified the `create_flow_run_from_deployment` call to just use the deployment ID
3. Using the correct method name `create_flow_run_from_deployment` instead of trying to use a lower-level API

If you're still seeing issues, could you try printing the client's available methods to see what we're working with?
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        # Debug: print available methods
        print("Available methods:", [method for method in dir(client) if not method.startswith('_')])
        
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        print("Deployment found:", deployment)
        
        # Try to create the flow run
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id
        )
        
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run
```
This will help us understand exactly what methods are available on the client object and help diagnose any issues.
j
@Marvin I am still getting errors. Here are the outputs, with identifiers snipped out
```
Available methods: ['api_healthcheck', 'api_url', 'api_version', 'apply_slas_for_deployment', 'client_version', 'create_artifact', 'create_automation', 'create_block_document', 'create_block_schema', 'create_block_type', 'create_concurrency_limit', 'create_deployment', 'create_deployment_branch', 'create_deployment_schedules', 'create_flow', 'create_flow_from_name', 'create_flow_run', 'create_flow_run_from_deployment', 'create_flow_run_input', 'create_global_concurrency_limit', 'create_logs', 'create_task_run', 'create_variable', 'create_work_pool', 'create_work_queue', 'decrement_v1_concurrency_slots', 'delete_artifact', 'delete_automation', 'delete_block_document', 'delete_block_type', 'delete_concurrency_limit_by_tag', 'delete_deployment', 'delete_deployment_schedule', 'delete_flow', 'delete_flow_run', 'delete_flow_run_input', 'delete_global_concurrency_limit_by_name', 'delete_resource_owned_automations', 'delete_task_run', 'delete_variable_by_name', 'delete_work_pool', 'delete_work_queue_by_id', 'filter_flow_run_input', 'find_automation', 'get_most_recent_block_schema_for_block_type', 'get_runs_in_work_queue', 'get_scheduled_flow_runs_for_deployments', 'get_scheduled_flow_runs_for_work_pool', 'hello', 'increment_concurrency_slots', 'increment_concurrency_slots_with_lease', 'increment_v1_concurrency_slots', 'logger', 'loop', 'manage_lifespan', 'match_work_queues', 'pause_automation', 'pause_deployment', 'raise_for_api_version_mismatch', 'read_artifacts', 'read_automation', 'read_automations', 'read_automations_by_name', 'read_block_document', 'read_block_document_by_name', 'read_block_documents', 'read_block_documents_by_type', 'read_block_schema_by_checksum', 'read_block_schemas', 'read_block_type_by_slug', 'read_block_types', 'read_concurrency_limit_by_tag', 'read_concurrency_limits', 'read_deployment', 'read_deployment_by_name', 'read_deployment_schedules', 'read_deployments', 'read_flow', 'read_flow_by_name', 'read_flow_run', 'read_flow_run_input', 'read_flow_run_states', 'read_flow_runs', 'read_flows', 'read_global_concurrency_limit_by_name', 'read_global_concurrency_limits', 'read_latest_artifacts', 'read_logs', 'read_resource_related_automations', 'read_task_run', 'read_task_run_states', 'read_task_runs', 'read_variable_by_name', 'read_variables', 'read_work_pool', 'read_work_pools', 'read_work_queue', 'read_work_queue_by_name', 'read_work_queue_status', 'read_work_queues', 'read_worker_metadata', 'read_workers_for_work_pool', 'release_concurrency_slots', 'release_concurrency_slots_with_lease', 'renew_concurrency_lease', 'request', 'reset_concurrency_limit_by_tag', 'resume_automation', 'resume_deployment', 'resume_flow_run', 'send_worker_heartbeat', 'server_type', 'set_deployment_paused_state', 'set_flow_run_name', 'set_flow_run_state', 'set_task_run_name', 'set_task_run_state', 'update_artifact', 'update_automation', 'update_block_document', 'update_block_type', 'update_deployment', 'update_deployment_schedule', 'update_flow_run', 'update_flow_run_labels', 'update_global_concurrency_limit', 'update_variable', 'update_work_pool', 'update_work_queue', 'upsert_global_concurrency_limit_by_name']
Deployment found: id=UUID('snip') name='very-simple-flow' version='0275ba89' version_id=UUID('snip') version_info=VersionInfo(type='vcs:git', version='0275ba89', url='snip', branch='snip', message='documentation - doc strings, docs, doc tech debt', commit_sha='0275ba89a504e421c8bb0480b52d27fbd4038671', repository='snip') branch=None base=None root=None description=None flow_id=UUID('snip') concurrency_limit=None global_concurrency_limit=None concurrency_options=None paused=False schedules=[] job_variables={} parameters={} pull_steps=None tags=[] labels={'prefect.flow.id': 'snip'} work_queue_name=None last_polled=DateTime(2025, 7, 30, 20, 49, 53, 112713, tzinfo=Timezone('UTC')) parameter_openapi_schema={'type': 'object', 'title': 'Parameters', 'properties': {}} path='.' entrypoint='prototype_back_end.py:very_simple_flow' storage_document_id=None infrastructure_document_id=None created_by=CreatedBy(id=UUID('snip'), type='USER', display_value='iota2-xlapak') updated_by=UpdatedBy(id=UUID('snip'), type='USER', display_value='iota2-xlapak') work_queue_id=None enforce_parameter_schema=True work_pool_name=None status=DeploymentStatus.READY
Traceback (most recent call last):
  File "/remote_dataloader/jds-pipeline/jds_pipeline/harbor_backend/marvin_front_end.py", line 40, in <module>
    flow_run = asyncio.run(run_deployment_by_name())
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/remote_dataloader/jds-pipeline/jds_pipeline/harbor_backend/marvin_front_end.py", line 31, in run_deployment_by_name
    flow_run = await client.create_flow_run_from_deployment(
  File "/home/echodyne.int/janet/remote_dataloader/.venv/lib/python3.10/site-packages/prefect/client/orchestration/_deployments/client.py", line 1328, in create_flow_run_from_deployment
    state=to_state_create(state),
  File "/remote_dataloader/.venv/lib/python3.10/site-packages/prefect/states.py", line 65, in to_state_create
    return StateCreate(
  File "/remote_dataloader/.venv/lib/python3.10/site-packages/pydantic/main.py", line 253, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
  File "/remote_dataloader/.venv/lib/python3.10/site-packages/pydantic/_internal/_mock_val_ser.py", line 100, in __getattr__
    raise PydanticUserError(self._error_message, code=self._code)
pydantic.errors.PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`.

For further information visit <https://errors.pydantic.dev/2.11/u/class-not-fully-defined>
```
m
thought for 16.8 seconds
Ah, now I see the issue more clearly. This appears to be a Pydantic initialization issue with the state creation. Let's try to specify the state explicitly when creating the flow run.
```python
from prefect.states import State, StateType

async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        print("Deployment found:", deployment)
        
        # Create flow run using create_flow_run instead
        flow_run = await client.create_flow_run(
            deployment_id=deployment.id,
            state=State(
                type=StateType.SCHEDULED,
                name="Scheduled"
            )
        )
        
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run

if __name__ == "__main__":
    import asyncio
    flow_run = asyncio.run(run_deployment_by_name())
```
If that still gives us issues, let's try an even simpler approach using just `create_flow_run`:
```python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str= "very-simple-flow"
):
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
    
    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        print("Deployment found:", deployment)
        
        # Create flow run with minimal parameters
        flow_run = await client.create_flow_run(
            deployment_id=deployment.id,
            name=f"API-triggered run of {deployment.name}"
        )
        
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run

if __name__ == "__main__":
    import asyncio
    flow_run = asyncio.run(run_deployment_by_name())
```
Let's also check your Prefect version, as this might be version-related:
```python
import prefect
print(prefect.__version__)
```
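Since the mismatch could be on either side, it can also help to print both versions together (a small sketch):
```python
import prefect
import pydantic

# Both libraries expose their version at the top level.
print("prefect:", prefect.__version__)
print("pydantic:", pydantic.VERSION)
```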
Could you try one of these versions and let me know what version of Prefect you're running? The error suggests you're using Pydantic 2.11, which might be causing compatibility issues with your current Prefect version. If you're still seeing issues, we might need to: 1. Either upgrade/downgrade Prefect to match your Pydantic version 2. Or use a different approach to trigger the flow run Could you share your Prefect version so we can better diagnose the issue?