Janet Carson
07/30/2025, 5:26 PMMarvin
07/30/2025, 5:26 PMMarvin
07/30/2025, 5:28 PMpython
from prefect import get_client
from prefect.client.cloud import get_cloud_client
import asyncio
async def run_my_deployment():
    """Create a flow run from a deployment and return it.

    The API key is expected in the ``PREFECT_API_KEY`` environment variable.

    NOTE(review): the ``dir(client)`` output posted later in this thread
    shows that the ``CloudClient`` returned by ``get_cloud_client()`` has no
    deployment or flow-run methods; the thread later switches to the
    orchestration client (``get_client()`` / ``PrefectClient``).
    """
    # Connect to Prefect Cloud using your API key
    # The API key should be set in your environment as PREFECT_API_KEY
    client = await get_cloud_client()

    # Create a flow run from your deployment
    # Replace these with your actual deployment details
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id="deployment-id",  # Your deployment ID
        name="My Programmatic Run",  # Optional: custom name for this run
        parameters={},  # Optional: parameters for your flow
    )
    print(f"Created flow run {flow_run.id}")
    return flow_run


# Run the async function
if __name__ == "__main__":
    asyncio.run(run_my_deployment())
Here's what you need to do to use this code:
1. Set up your API key:
bash
export PREFECT_API_KEY='your-api-key-here'
Or in Python before running the code:
python
import os
os.environ["PREFECT_API_KEY"] = "your-api-key-here"
2. Get your deployment ID: You can find this in the Prefect Cloud UI or by using the CLI command prefect deployment ls
3. Add any parameters your flow expects in the `parameters` dict
The code does the following:
1. Connects to Prefect Cloud using your API key
2. Creates a new flow run from your specified deployment
3. Returns the flow run object, which contains the ID and other metadata
Important notes:
- The code is async because Prefect's client API is primarily async-based
- The flow run will be created and scheduled according to your deployment's configuration
- You don't need to worry about workers or agents - they will pick up the run automatically
- The service account with "Runner" privileges you created has the correct permissions for this operation
If you need to wait for the flow run to complete, you can add this functionality:
python
async def run_and_wait_for_deployment():
    """Create a flow run from a deployment, wait for it, and return it.

    NOTE(review): ``wait_for_flow_run`` does not appear in the list of
    ``PrefectClient`` methods posted later in this thread, and the
    ``CloudClient`` listing has no flow-run methods either -- verify both
    calls against the installed Prefect version before relying on this.
    """
    client = await get_cloud_client()
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id="deployment-id",
        name="My Programmatic Run",
    )

    # Wait for the flow run to complete
    flow_run = await client.wait_for_flow_run(
        flow_run.id,
        timeout=300,  # 5 minutes timeout
    )
    print(f"Flow run completed with state: {flow_run.state.type}")
    return flow_run
This extended version will wait for the flow run to complete and show its final state. The timeout parameter prevents infinite waiting if something goes wrong.Janet Carson
07/30/2025, 5:38 PMMarvin
07/30/2025, 5:40 PMMarvin
07/30/2025, 5:41 PMpython
from prefect import get_client
from prefect.client.cloud import get_cloud_client
import asyncio
async def run_deployment_by_name(flow_name: str, deployment_name: str):
    """Look up a deployment by flow/deployment name and create a flow run.

    Args:
        flow_name: Name of the flow that owns the deployment.
        deployment_name: Name of the deployment to run.

    Returns:
        The flow run object returned by the client.

    NOTE(review): the next message in this thread reports
    ``AttributeError: 'CloudClient' object has no attribute
    'read_deployment_by_name'`` for this snippet.
    """
    # Connect to Prefect Cloud using your API key
    client = await get_cloud_client()

    # Find the deployment by name
    deployment = await client.read_deployment_by_name(
        flow_name=flow_name,
        deployment_name=deployment_name,
    )

    # Create a flow run using the found deployment
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
        parameters={},  # Add any parameters your flow needs
    )
    print(f"Created flow run {flow_run.id} for deployment {deployment_name}")
    return flow_run
# Example usage with waiting for completion
async def run_and_wait_by_name(flow_name: str, deployment_name: str):
    """Run a deployment found by name and wait for the flow run to finish.

    Args:
        flow_name: Name of the flow that owns the deployment.
        deployment_name: Name of the deployment to run.

    Returns:
        The flow run object returned by ``wait_for_flow_run``.

    NOTE(review): the next message in this thread reports that
    ``CloudClient`` has no ``read_deployment_by_name``; ``wait_for_flow_run``
    is also missing from the ``PrefectClient`` method list posted later.
    """
    client = await get_cloud_client()

    # Find the deployment
    deployment = await client.read_deployment_by_name(
        flow_name=flow_name,
        deployment_name=deployment_name,
    )

    # Create and run the flow
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
    )

    # Wait for completion
    flow_run = await client.wait_for_flow_run(
        flow_run.id,
        timeout=300,  # 5 minutes timeout
    )
    print(f"Flow run completed with state: {flow_run.state.type}")
    return flow_run


# Run the async function
if __name__ == "__main__":
    # Example: if your flow is named "my-flow" and deployment is "prod-deployment"
    asyncio.run(run_deployment_by_name("my-flow", "prod-deployment"))
    # Or with waiting:
    # asyncio.run(run_and_wait_by_name("my-flow", "prod-deployment"))
To use this code:
1. Set your API key as before:
python
import os
os.environ["PREFECT_API_KEY"] = "your-api-key-here"
2. Call the function with your flow and deployment names:
python
asyncio.run(run_deployment_by_name(
flow_name="your-flow-name",
deployment_name="your-deployment-name"
))
The flow name and deployment name are the ones you used when creating the deployment. For example, if you created your deployment with:
python
flow.from_source(...).deploy(
name="prod-deployment",
work_pool_name="your-pool"
)
Then:
- `flow_name` would be the name of your flow (the name of the function decorated with `@flow`)
- `deployment_name` would be "prod-deployment"
This approach is more maintainable because:
1. It doesn't require hardcoding IDs
2. It's more readable (names are more meaningful than IDs)
3. It's more resilient to deployment updates or recreations
4. It matches how you typically think about and refer to your deployments
If the deployment isn't found, the code will raise an exception that you can catch and handle appropriately.Janet Carson
07/30/2025, 6:28 PMAttributeError: 'CloudClient' object has no attribute 'read_deployment_by_name'
Marvin
07/30/2025, 6:28 PMMarvin
07/30/2025, 6:29 PMpython
from prefect import get_client
from prefect.client.cloud import get_cloud_client
import asyncio
async def run_deployment_by_name(flow_name: str, deployment_name: str):
    """Find a deployment via ``read_deployments`` and create a flow run.

    Args:
        flow_name: Name of the flow that owns the deployment.
        deployment_name: Name of the deployment to run.

    Returns:
        The flow run object returned by the client.

    Raises:
        ValueError: If no matching deployment is returned.

    NOTE(review): the ``dir(client)`` output posted right after this
    message shows ``CloudClient`` has neither ``read_deployments`` nor
    ``create_flow_run``.
    """
    # Connect to Prefect Cloud using your API key
    client = await get_cloud_client()

    # Find the deployment by name using the correct method
    deployments = await client.read_deployments(
        flows={flow_name},
        deployment_names={deployment_name},
    )
    if not deployments:
        raise ValueError(f"No deployment found with name '{deployment_name}' for flow '{flow_name}'")
    deployment = deployments[0]

    # Create a flow run using the found deployment
    flow_run = await client.create_flow_run(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
        parameters={},  # Add any parameters your flow needs
    )
    print(f"Created flow run {flow_run.id} for deployment {deployment_name}")
    return flow_run
# Example usage with waiting for completion
async def run_and_wait_by_name(flow_name: str, deployment_name: str):
    """Find a deployment, create a flow run, and wait for it to finish.

    Args:
        flow_name: Name of the flow that owns the deployment.
        deployment_name: Name of the deployment to run.

    Returns:
        The flow run object returned by ``wait_for_flow_run``.

    Raises:
        ValueError: If no matching deployment is returned.

    NOTE(review): the ``dir(client)`` output posted right after this
    message shows ``CloudClient`` has none of the methods used here.
    """
    client = await get_cloud_client()

    # Find the deployment
    deployments = await client.read_deployments(
        flows={flow_name},
        deployment_names={deployment_name},
    )
    if not deployments:
        raise ValueError(f"No deployment found with name '{deployment_name}' for flow '{flow_name}'")
    deployment = deployments[0]

    # Create and run the flow
    flow_run = await client.create_flow_run(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
    )

    # Wait for completion
    flow_run = await client.wait_for_flow_run(
        flow_run.id,
        timeout=300,  # 5 minutes timeout
    )
    print(f"Flow run completed with state: {flow_run.state.type}")
    return flow_run


# Run the async function
if __name__ == "__main__":
    # Example: if your flow is named "my-flow" and deployment is "prod-deployment"
    asyncio.run(run_deployment_by_name("my-flow", "prod-deployment"))
    # Or with waiting:
    # asyncio.run(run_and_wait_by_name("my-flow", "prod-deployment"))
The key changes are:
1. Using `read_deployments()` instead of the non-existent `read_deployment_by_name()`
2. Using `create_flow_run()` instead of `create_flow_run_from_deployment()`
3. The `flows` and `deployment_names` parameters take sets of strings
The `read_deployments()` method will return a list of deployments that match your criteria. Since we're using both the flow name and deployment name, it should return exactly one deployment (or an empty list if not found).
This code should now work correctly with the CloudClient. Remember to still set your API key in the environment:
python
import os
os.environ["PREFECT_API_KEY"] = "your-api-key-here"
I apologize for the confusion in my earlier response. The API has evolved, and this is the current correct way to accomplish what you want.Janet Carson
07/30/2025, 6:31 PM(Pdb) dir (client)
['__aenter__', '__aexit__', '__annotations__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_client', 'account_base_url', 'account_id', 'api_healthcheck', 'check_ip_allowlist_access', 'get', 'read_account_ip_allowlist', 'read_account_settings', 'read_current_workspace', 'read_worker_metadata', 'read_workspaces', 'request', 'update_account_ip_allowlist', 'update_account_settings', 'workspace_base_url', 'workspace_id']
There is no "deployments" function listedMarvin
07/30/2025, 6:31 PMMarvin
07/30/2025, 6:31 PM
The `CloudClient` we're getting from `get_cloud_client()` is specifically for cloud administration tasks. For working with deployments, we need to use the orchestration client instead. Here's the corrected code:
python
from prefect import get_client
import asyncio
async def run_deployment_by_name(flow_name: str, deployment_name: str):
    """Find a deployment with the orchestration client and create a flow run.

    Args:
        flow_name: Name of the flow that owns the deployment.
        deployment_name: Name of the deployment to run.

    Returns:
        The flow run object returned by the client.

    Raises:
        ValueError: If no matching deployment is returned.

    NOTE(review): the code Janet reports as working later in this thread
    calls ``get_client()`` without ``await`` and uses
    ``read_deployment_by_name(f"{flow}/{deployment}")`` followed by
    ``create_flow_run_from_deployment`` -- verify the calls below against
    the installed Prefect version.
    """
    # Get the Prefect client (this gets the orchestration client)
    client = await get_client()

    # Find the deployment by name using the correct method
    deployments = await client.read_deployments(
        flows={flow_name},
        deployment_names={deployment_name},
    )
    if not deployments:
        raise ValueError(f"No deployment found with name '{deployment_name}' for flow '{flow_name}'")
    deployment = deployments[0]

    # Create a flow run using the found deployment
    flow_run = await client.create_flow_run(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
        parameters={},  # Add any parameters your flow needs
    )
    print(f"Created flow run {flow_run.id} for deployment {deployment_name}")
    return flow_run
# Example usage with waiting for completion
async def run_and_wait_by_name(flow_name: str, deployment_name: str):
    """Find a deployment, create a flow run, and wait for it to finish.

    Args:
        flow_name: Name of the flow that owns the deployment.
        deployment_name: Name of the deployment to run.

    Returns:
        The flow run object returned by ``wait_for_flow_run``.

    Raises:
        ValueError: If no matching deployment is returned.

    NOTE(review): ``wait_for_flow_run`` is not in the ``PrefectClient``
    method list posted later in this thread, and Janet's working code calls
    ``get_client()`` without ``await`` -- verify before use.
    """
    client = await get_client()

    # Find the deployment
    deployments = await client.read_deployments(
        flows={flow_name},
        deployment_names={deployment_name},
    )
    if not deployments:
        raise ValueError(f"No deployment found with name '{deployment_name}' for flow '{flow_name}'")
    deployment = deployments[0]

    # Create and run the flow
    flow_run = await client.create_flow_run(
        deployment_id=deployment.id,
        name=f"Programmatic run of {deployment_name}",
    )

    # Wait for completion
    flow_run = await client.wait_for_flow_run(
        flow_run.id,
        timeout=300,  # 5 minutes timeout
    )
    print(f"Flow run completed with state: {flow_run.state.type}")
    return flow_run


# Run the async function
if __name__ == "__main__":
    # Example: if your flow is named "my-flow" and deployment is "prod-deployment"
    asyncio.run(run_deployment_by_name("my-flow", "prod-deployment"))
The key change is that we're now using `get_client()` instead of `get_cloud_client()`. The `get_client()` function returns the orchestration client which has all the deployment-related methods we need.
Your API key will still be used for authentication, so you still need to set it in your environment:
python
import os
os.environ["PREFECT_API_KEY"] = "your-api-key-here"
This should now work correctly. The orchestration client has all the methods we need for working with deployments and flow runs. I apologize for the confusion with the different client types earlier!Janet Carson
07/30/2025, 6:39 PMasync def run_deployment_by_name(
flow: str = "very-simple-flow",
deployment: str= "very-simple-flow"
):
os.environ["PREFECT_API_KEY"] = RUNNER_API_KEY
client = get_client()
deployment = await client.read_deployment_by_name(
f"{flow}/{deployment}"
)
flow_run = await client.create_flow_run_from_deployment(
deployment_id = deployment.id,
parameters={},
)
print(f"Created {flow_run.name}")
If I run my python script from a shell where I have run prefect cloud login
it works just fine.
If I run prefect cloud logout
from my shell, the script is unable to log in. I am getting the error
ValueError: No Prefect API URL provided. Please set PREFECT_API_URL to the address of a running Prefect server.
How do I log in programmatically?Marvin
07/30/2025, 6:39 PMMarvin
07/30/2025, 6:41 PMpython
import os
from prefect import get_client
import asyncio
# Your API key
RUNNER_API_KEY = "your-api-key-here"
# Get these from your Prefect Cloud URL:
# <https://app.prefect.cloud/account/{ACCOUNT_ID}/workspace/{WORKSPACE_ID}/>
ACCOUNT_ID = "your-account-id"
WORKSPACE_ID = "your-workspace-id"
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Create a flow run for ``flow/deployment`` without ``prefect cloud login``.

    Sets ``PREFECT_API_KEY`` and ``PREFECT_API_URL`` in ``os.environ`` from
    the module-level ``RUNNER_API_KEY``, ``ACCOUNT_ID`` and ``WORKSPACE_ID``
    before building the client.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.

    NOTE(review): Janet's follow-up in this thread reports that
    ``get_client()`` still raised "No Prefect API URL provided" after
    setting ``os.environ`` at runtime; the thread then passes the URL
    directly to ``PrefectClient``.
    """
    # Set both the API key and API URL
    os.environ["PREFECT_API_KEY"] = RUNNER_API_KEY
    # The URL literal must not carry chat-export angle brackets.
    os.environ["PREFECT_API_URL"] = f"https://api.prefect.cloud/api/accounts/{ACCOUNT_ID}/workspaces/{WORKSPACE_ID}"

    client = get_client()
    deployment = await client.read_deployment_by_name(
        f"{flow}/{deployment}"
    )
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        parameters={},
    )
    print(f"Created {flow_run.name}")


# Run the async function
if __name__ == "__main__":
    asyncio.run(run_deployment_by_name())
To get your `ACCOUNT_ID` and `WORKSPACE_ID`:
1. Log into Prefect Cloud in your browser
2. Look at the URL in your browser - it will look like:
<https://app.prefect.cloud/account/[ACCOUNT-ID]/workspace/[WORKSPACE-ID]/>
3. Copy those values into your code
For example, if your URL is:
<https://app.prefect.cloud/account/1234567/workspace/89abcdef-ghij-klmn-opqr-stuvwxyz1234/>
Then:
- ACCOUNT_ID = "1234567"
- WORKSPACE_ID = "89abcdef-ghij-klmn-opqr-stuvwxyz1234"
You can also store these values in environment variables or a configuration file instead of hardcoding them in your script:
python
import os
from prefect import get_client
import asyncio
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Create a flow run for ``flow/deployment`` using environment settings.

    Reads ``PREFECT_API_KEY``, ``PREFECT_CLOUD_ACCOUNT_ID`` and
    ``PREFECT_CLOUD_WORKSPACE_ID`` from the environment and derives
    ``PREFECT_API_URL`` from the latter two.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.

    Raises:
        KeyError: If any of the three environment variables is missing.
    """
    # Get values from environment variables
    # (api_key is not used below; the lookup raises KeyError if it is unset)
    api_key = os.environ["PREFECT_API_KEY"]
    account_id = os.environ["PREFECT_CLOUD_ACCOUNT_ID"]
    workspace_id = os.environ["PREFECT_CLOUD_WORKSPACE_ID"]

    # Set the API URL (no chat-export angle brackets inside the literal)
    os.environ["PREFECT_API_URL"] = f"https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}"

    client = get_client()
    deployment = await client.read_deployment_by_name(
        f"{flow}/{deployment}"
    )
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        parameters={},
    )
    print(f"Created {flow_run.name}")
Then you can set these environment variables before running your script:
bash
export PREFECT_API_KEY="your-api-key"
export PREFECT_CLOUD_ACCOUNT_ID="your-account-id"
export PREFECT_CLOUD_WORKSPACE_ID="your-workspace-id"
python your_script.py
This approach will work without requiring prefect cloud login
and is more suitable for automated scripts or CI/CD environments.Janet Carson
07/30/2025, 7:20 PMasync def run_deployment_by_name(
flow: str = "very-simple-flow",
deployment: str= "very-simple-flow"
):
os.environ["PREFECT_API_KEY"] = RUNNER_API_KEY
cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
info = await cloud_client.get("/me/workspaces")
cloud_client.account_id = info[0]['account_id']
cloud_client.workspace_id = info[0]['workspace_id']
workspace_base_url = cloud_client.workspace_base_url
api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
os.environ["PREFECT_API_URL"] = api_url
async with get_client() as client:
deployment = await client.read_deployment_by_name(
f"{flow}/{deployment}"
)
flow_run = await client.create_flow_run_from_deployment(
deployment_id = deployment.id,
parameters={},
)
print(f"Created {flow_run.name}")
But, the call to get_client
is still failing with
ValueError: No Prefect API URL provided. Please set PREFECT_API_URL to the address of a running Prefect server.
Is there another way to set the api url besides with os.environ?Marvin
07/30/2025, 7:20 PMMarvin
07/30/2025, 7:20 PMpython
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Create a flow run using a ``PrefectClient`` built from an explicit URL.

    Looks up the first workspace visible to ``RUNNER_API_KEY`` through the
    cloud client, builds the workspace API URL from it, and passes that URL
    and the key straight to ``PrefectClient`` instead of using environment
    variables.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.
    """
    # Get cloud client and workspace info
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url
    # Angle brackets removed from the literal, as the accompanying text says.
    api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"

    # Create client with API URL directly
    from prefect.client.orchestration import PrefectClient

    async with PrefectClient(
        api=api_url,
        api_key=RUNNER_API_KEY,
    ) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            parameters={},
        )
        print(f"Created {flow_run.name}")
The key changes are:
1. Removed the environment variable setting
2. Imported `PrefectClient` directly
3. Created the client with explicit `api` and `api_key` parameters
Also, I notice you had angle brackets in your API URL string (`<https://...>`). Those should be removed - the URL should be just:
python
api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"
This approach should work better as it doesn't rely on environment variables and explicitly configures the client with the API URL. Let me know if you're still seeing any issues!Janet Carson
07/30/2025, 7:51 PMasync def run_deployment_by_name(
flow: str = "very-simple-flow",
deployment: str= "very-simple-flow"
):
cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
info = await cloud_client.get("/me/workspaces")
cloud_client.account_id = info[0]['account_id']
cloud_client.workspace_id = info[0]['workspace_id']
workspace_base_url = cloud_client.workspace_base_url
api_url = f"<https://api.prefect.com/api/{workspace_base_url}/>"
client = PrefectClient( api=api_url, api_key=RUNNER_API_KEY)
result = await client.hello()
print("hello!", result)
return
The error is:
httpcore.ConnectError: [Errno -2] Name or service not known
Marvin
07/30/2025, 7:51 PMMarvin
07/30/2025, 7:51 PM
prefect.com instead of prefect.cloud. Here's the corrected version:
python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Connectivity check: build the workspace API URL and call ``hello()``.

    Looks up the first workspace visible to ``RUNNER_API_KEY``, builds the
    ``api.prefect.cloud`` URL for it, and prints the result of
    ``PrefectClient.hello()``. ``flow`` and ``deployment`` are accepted but
    not used by this version.
    """
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    # Remove the angle brackets and use prefect.cloud instead of prefect.com
    api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"

    client = PrefectClient(api=api_url, api_key=RUNNER_API_KEY)
    result = await client.hello()
    print("hello!", result)
    return
The changes are:
1. Removed `<` and `>` from the URL
2. Changed `prefect.com` to `prefect.cloud`
3. Removed the trailing slash after `workspace_base_url`
Let's also verify what `workspace_base_url` looks like to make sure it's formatted correctly. Could you add this debug print before creating the client?
python
print(f"Debug - workspace_base_url: {workspace_base_url}")
print(f"Debug - final api_url: {api_url}")
This will help us see exactly what URL is being constructed. The final URL should look something like:
https://api.prefect.cloud/api/accounts/YOUR_ACCOUNT_ID/workspaces/YOUR_WORKSPACE_ID
If you're still seeing issues after these changes, please share what those debug prints show!Janet Carson
07/30/2025, 7:56 PMasync def run_deployment_by_name(
flow: str = "very-simple-flow",
deployment: str= "very-simple-flow"
):
cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
info = await cloud_client.get("/me/workspaces")
cloud_client.account_id = info[0]['account_id']
cloud_client.workspace_id = info[0]['workspace_id']
workspace_base_url = cloud_client.workspace_base_url
api_url = f"<https://api.prefect.cloud/api/{workspace_base_url}>"
client = PrefectClient( api=api_url, api_key=RUNNER_API_KEY)
deployment = await client.read_deployment_by_name(
f"{flow}/{deployment}"
)
flow_run = await client.create_flow_run_from_deployment(
deployment_id = deployment.id,
parameters={},
)
print(f"Created {flow_run.name}")
I have successfully retrieved the deployment id, but when I try to create a flow run I get the following error:
raise PydanticUserError(self._error_message, code=self._code)
pydantic.errors.PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`.
Marvin
07/30/2025, 7:58 PMMarvin
07/30/2025, 7:59 PMpython
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Create a flow run for ``flow/deployment`` and return it.

    Uses ``PrefectClient`` as an async context manager with an explicit
    workspace API URL derived from the first workspace visible to
    ``RUNNER_API_KEY``.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.

    Returns:
        The flow run returned by ``create_flow_run_from_deployment``.
    """
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url

    # Remove the angle brackets from the URL
    api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"

    # Use async context manager to properly initialize the client
    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        # Create flow run with minimal parameters
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            name=f"API-triggered run of {deployment.name}",
        )
        print(f"Created flow run {flow_run.name} with ID: {flow_run.id}")
        return flow_run


# Run the async function
if __name__ == "__main__":
    import asyncio

    flow_run = asyncio.run(run_deployment_by_name())
Key changes:
1. Removed angle brackets from the URL
2. Used an `async with` context manager for the client
3. Simplified the flow run creation parameters
4. Added proper asyncio handling for running the function
If you're still seeing the Pydantic error, let's try an alternative approach using the lower-level API:
python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Create a flow run by POSTing to the deployment's REST endpoint.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.

    Returns:
        Whatever ``client.post`` returns; indexed with ``['id']`` below.

    NOTE(review): the next message in this thread reports
    ``AttributeError: 'PrefectClient' object has no attribute 'post'`` for
    this snippet.
    """
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url
    api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"

    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        # Use the lower-level API
        endpoint = f"/deployments/{deployment.id}/create_flow_run"
        flow_run = await client.post(
            endpoint,
            json={
                "name": f"API-triggered run of {deployment.name}",
                "parameters": {},
            },
        )
        print(f"Created flow run with ID: {flow_run['id']}")
        return flow_run


if __name__ == "__main__":
    import asyncio

    flow_run = asyncio.run(run_deployment_by_name())
This second approach uses the REST API directly, which might avoid the Pydantic model initialization issue. Try both versions and let me know which one works better for you!
Also, make sure you have the latest version of Prefect installed, as this could be a version-related issue:
bash
pip install --upgrade prefect
Janet Carson
07/30/2025, 8:03 PMAttributeError: 'PrefectClient' object has no attribute 'post'
Marvin
07/30/2025, 8:03 PMMarvin
07/30/2025, 8:06 PMpython
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Create a flow run for ``flow/deployment`` passing only the deployment ID.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.

    Returns:
        The flow run returned by ``create_flow_run_from_deployment``.
    """
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url
    api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"

    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        # Create flow run using request method
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id
        )
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run


if __name__ == "__main__":
    import asyncio

    flow_run = asyncio.run(run_deployment_by_name())
The key changes are:
1. Removed the angle brackets from the URL
2. Simplified the `create_flow_run_from_deployment` call to just use the deployment ID
3. Using the correct method name `create_flow_run_from_deployment` instead of trying to use a lower-level API
If you're still seeing issues, could you try printing the client's available methods to see what we're working with?
python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Debug variant: print the client's public attributes, then create a run.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.

    Returns:
        The flow run returned by ``create_flow_run_from_deployment``.
    """
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url
    api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"

    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        # Debug: print available methods
        print("Available methods:", [method for method in dir(client) if not method.startswith('_')])

        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        print("Deployment found:", deployment)

        # Try to create the flow run
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id
        )
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run
This will help us understand exactly what methods are available on the client object and help diagnose any issues.Janet Carson
07/30/2025, 8:54 PMAvailable methods: ['api_healthcheck', 'api_url', 'api_version', 'apply_slas_for_deployment', 'client_version', 'create_artifact', 'create_automation', 'create_block_document', 'create_block_schema', 'create_block_type', 'create_concurrency_limit', 'create_deployment', 'create_deployment_branch', 'create_deployment_schedules', 'create_flow', 'create_flow_from_name', 'create_flow_run', 'create_flow_run_from_deployment', 'create_flow_run_input', 'create_global_concurrency_limit', 'create_logs', 'create_task_run', 'create_variable', 'create_work_pool', 'create_work_queue', 'decrement_v1_concurrency_slots', 'delete_artifact', 'delete_automation', 'delete_block_document', 'delete_block_type', 'delete_concurrency_limit_by_tag', 'delete_deployment', 'delete_deployment_schedule', 'delete_flow', 'delete_flow_run', 'delete_flow_run_input', 'delete_global_concurrency_limit_by_name', 'delete_resource_owned_automations', 'delete_task_run', 'delete_variable_by_name', 'delete_work_pool', 'delete_work_queue_by_id', 'filter_flow_run_input', 'find_automation', 'get_most_recent_block_schema_for_block_type', 'get_runs_in_work_queue', 'get_scheduled_flow_runs_for_deployments', 'get_scheduled_flow_runs_for_work_pool', 'hello', 'increment_concurrency_slots', 'increment_concurrency_slots_with_lease', 'increment_v1_concurrency_slots', 'logger', 'loop', 'manage_lifespan', 'match_work_queues', 'pause_automation', 'pause_deployment', 'raise_for_api_version_mismatch', 'read_artifacts', 'read_automation', 'read_automations', 'read_automations_by_name', 'read_block_document', 'read_block_document_by_name', 'read_block_documents', 'read_block_documents_by_type', 'read_block_schema_by_checksum', 'read_block_schemas', 'read_block_type_by_slug', 'read_block_types', 'read_concurrency_limit_by_tag', 'read_concurrency_limits', 'read_deployment', 'read_deployment_by_name', 'read_deployment_schedules', 'read_deployments', 'read_flow', 'read_flow_by_name', 'read_flow_run', 
'read_flow_run_input', 'read_flow_run_states', 'read_flow_runs', 'read_flows', 'read_global_concurrency_limit_by_name', 'read_global_concurrency_limits', 'read_latest_artifacts', 'read_logs', 'read_resource_related_automations', 'read_task_run', 'read_task_run_states', 'read_task_runs', 'read_variable_by_name', 'read_variables', 'read_work_pool', 'read_work_pools', 'read_work_queue', 'read_work_queue_by_name', 'read_work_queue_status', 'read_work_queues', 'read_worker_metadata', 'read_workers_for_work_pool', 'release_concurrency_slots', 'release_concurrency_slots_with_lease', 'renew_concurrency_lease', 'request', 'reset_concurrency_limit_by_tag', 'resume_automation', 'resume_deployment', 'resume_flow_run', 'send_worker_heartbeat', 'server_type', 'set_deployment_paused_state', 'set_flow_run_name', 'set_flow_run_state', 'set_task_run_name', 'set_task_run_state', 'update_artifact', 'update_automation', 'update_block_document', 'update_block_type', 'update_deployment', 'update_deployment_schedule', 'update_flow_run', 'update_flow_run_labels', 'update_global_concurrency_limit', 'update_variable', 'update_work_pool', 'update_work_queue', 'upsert_global_concurrency_limit_by_name']
Deployment found: id=UUID('snip') name='very-simple-flow' version='0275ba89' version_id=UUID('snip') version_info=VersionInfo(type='vcs:git', version='0275ba89', url='snip', branch='snip', message='documentation - doc strings, docs, doc tech debt', commit_sha='0275ba89a504e421c8bb0480b52d27fbd4038671', repository='snip') branch=None base=None root=None description=None flow_id=UUID('snip') concurrency_limit=None global_concurrency_limit=None concurrency_options=None paused=False schedules=[] job_variables={} parameters={} pull_steps=None tags=[] labels={'prefect.flow.id': 'snip'} work_queue_name=None last_polled=DateTime(2025, 7, 30, 20, 49, 53, 112713, tzinfo=Timezone('UTC')) parameter_openapi_schema={'type': 'object', 'title': 'Parameters', 'properties': {}} path='.' entrypoint='prototype_back_end.py:very_simple_flow' storage_document_id=None infrastructure_document_id=None created_by=CreatedBy(id=UUID('snip'), type='USER', display_value='iota2-xlapak') updated_by=UpdatedBy(id=UUID('snip'), type='USER', display_value='iota2-xlapak') work_queue_id=None enforce_parameter_schema=True work_pool_name=None status=DeploymentStatus.READY
Traceback (most recent call last):
File "/remote_dataloader/jds-pipeline/jds_pipeline/harbor_backend/marvin_front_end.py", line 40, in <module>
flow_run = asyncio.run(run_deployment_by_name())
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/remote_dataloader/jds-pipeline/jds_pipeline/harbor_backend/marvin_front_end.py", line 31, in run_deployment_by_name
flow_run = await client.create_flow_run_from_deployment(
File "/home/echodyne.int/janet/remote_dataloader/.venv/lib/python3.10/site-packages/prefect/client/orchestration/_deployments/client.py", line 1328, in create_flow_run_from_deployment
state=to_state_create(state),
File "/remote_dataloader/.venv/lib/python3.10/site-packages/prefect/states.py", line 65, in to_state_create
return StateCreate(
File "/remote_dataloader/.venv/lib/python3.10/site-packages/pydantic/main.py", line 253, in __init__
validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
File "/remote_dataloader/.venv/lib/python3.10/site-packages/pydantic/_internal/_mock_val_ser.py", line 100, in __getattr__
raise PydanticUserError(self._error_message, code=self._code)
pydantic.errors.PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`.
For further information visit <https://errors.pydantic.dev/2.11/u/class-not-fully-defined>
Marvin
07/30/2025, 8:54 PMMarvin
07/30/2025, 8:55 PMpython
from prefect.states import State, StateType
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Create a flow run via ``create_flow_run`` with an explicit state.

    Proposed as a workaround for the ``StateCreate`` Pydantic error raised
    inside ``create_flow_run_from_deployment`` earlier in this thread.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.

    Returns:
        The flow run returned by ``create_flow_run``.

    NOTE(review): whether ``create_flow_run`` accepts ``deployment_id`` is
    not shown anywhere in this thread -- confirm against the Prefect client
    reference before use.
    """
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url
    api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"

    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        print("Deployment found:", deployment)

        # Create flow run using create_flow_run instead
        flow_run = await client.create_flow_run(
            deployment_id=deployment.id,
            state=State(
                type=StateType.SCHEDULED,
                name="Scheduled",
            ),
        )
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run


if __name__ == "__main__":
    import asyncio

    flow_run = asyncio.run(run_deployment_by_name())
If that still gives us issues, let's try an even simpler approach using just `create_flow_run`:
python
async def run_deployment_by_name(
    flow: str = "very-simple-flow",
    deployment: str = "very-simple-flow",
):
    """Create a flow run via ``create_flow_run`` with only an ID and a name.

    Args:
        flow: Flow name used as the first half of the ``flow/deployment`` key.
        deployment: Deployment name used as the second half of that key.

    Returns:
        The flow run returned by ``create_flow_run``.

    NOTE(review): whether ``create_flow_run`` accepts ``deployment_id`` is
    not shown anywhere in this thread -- confirm against the Prefect client
    reference before use.
    """
    cloud_client = get_cloud_client(api_key=RUNNER_API_KEY)
    info = await cloud_client.get("/me/workspaces")
    cloud_client.account_id = info[0]['account_id']
    cloud_client.workspace_id = info[0]['workspace_id']
    workspace_base_url = cloud_client.workspace_base_url
    api_url = f"https://api.prefect.cloud/api/{workspace_base_url}"

    async with PrefectClient(api=api_url, api_key=RUNNER_API_KEY) as client:
        deployment = await client.read_deployment_by_name(
            f"{flow}/{deployment}"
        )
        print("Deployment found:", deployment)

        # Create flow run with minimal parameters
        flow_run = await client.create_flow_run(
            deployment_id=deployment.id,
            name=f"API-triggered run of {deployment.name}",
        )
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run


if __name__ == "__main__":
    import asyncio

    flow_run = asyncio.run(run_deployment_by_name())
Let's also check your Prefect version, as this might be version-related:
python
import prefect
print(prefect.__version__)
Could you try one of these versions and let me know what version of Prefect you're running? The error suggests you're using Pydantic 2.11, which might be causing compatibility issues with your current Prefect version.
If you're still seeing issues, we might need to:
1. Either upgrade/downgrade Prefect to match your Pydantic version
2. Or use a different approach to trigger the flow run
Could you share your Prefect version so we can better diagnose the issue?