Ritesh
07/17/2025, 5:17 AM

Marvin
07/17/2025, 5:18 AM
get_client() takes an optional sync_client parameter which defaults to False. When set to True, it returns a SyncPrefectClient instead of the standard async PrefectClient.
All client operations that would normally require await in the async version can be called directly on the sync client. The sync client makes the necessary synchronous HTTP requests under the hood.
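For example, a minimal sketch of sync usage (the deployment name is a placeholder):
python
from prefect import get_client

client = get_client(sync_client=True)  # returns a SyncPrefectClient
# No await needed; each call blocks until the HTTP request completes
deployment = client.read_deployment_by_name("my-flow/my-deployment")  # placeholder name
print(deployment.id)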
Is there a specific operation you're trying to perform with the client? I can provide more targeted examples based on your needs.

Ritesh
07/17/2025, 5:47 AM

Marvin
07/17/2025, 5:48 AM
python
from time import sleep

from prefect import flow, get_client

@flow
def orchestrate_deployments():
    client = get_client(sync_client=True)
    # Deploy the first flow; flow.deploy() returns the deployment's UUID
    first_deployment_id = first_flow.from_source(
        source="https://github.com/your/repo",
        entrypoint="path/to/flow.py:flow_name"
    ).deploy(...)
    # Create a run for the first deployment
    first_run = client.create_flow_run_from_deployment(deployment_id=first_deployment_id)
    # Poll until the run reaches a terminal state
    first_run = client.read_flow_run(first_run.id)
    while not first_run.state.is_final():
        sleep(5)
        first_run = client.read_flow_run(first_run.id)
    # Only proceed if the first flow succeeded
    if first_run.state.is_completed():
        # Deploy and run second flow
        second_deployment_id = second_flow.from_source(...)
        ...
2. The issue with multiple deployments of the first flow might be happening because:
- The flow might be retriggering itself due to some configuration
- There might be a loop in the logic that's not visible without seeing the code
- The flow might be failing and retrying (even though you mentioned retries=0)
3. For the pending state issue (see the CLI commands below):
- Check if the work pool and worker are properly configured
- Verify that the worker is actually running and polling for work
- Check if there are any infrastructure or permission issues preventing the worker from picking up the runs
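A quick way to verify the worker side from the CLI (the pool name is a placeholder):
bash
# List work pools and their status
prefect work-pool ls
# Inspect a specific pool
prefect work-pool inspect "default-agent-pool"
# Start a worker that polls the pool
prefect worker start --pool "default-agent-pool"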
Could you share:
1. A simplified version of your orchestration script?
2. How you're running this script as a Prefect flow?
3. The work pool type you're using?
This would help me provide a more specific solution to your issue.

Ritesh
07/17/2025, 6:40 AM

Marvin
07/17/2025, 6:41 AM

Ritesh
07/17/2025, 6:43 AM

Marvin
07/17/2025, 6:43 AM
When you run deployment.py directly with Python, it executes the synchronous code as expected. However, when you run it as a Prefect flow (through run.py), you're essentially creating a flow that creates deployments, which can lead to unexpected behavior.
Let me help you modify your approach. Could you share:
1. How are you deploying the deployment-manager flow in run.py? Are you using something like:
python
deployment_manager.from_source(...).deploy()
or some other method?
2. What work pool type are you using? (e.g., process, kubernetes, etc.)
In the meantime, here's a suggested modification that should work more reliably:
python
# deployment.py
from time import sleep

from prefect import flow, get_client

@flow(name="deployment-manager", retries=0)
def deployment_manager():
    # Use the sync client
    client = get_client(sync_client=True)
    # Deploy flow1; flow.deploy() returns the deployment's UUID
    print("Deploying flow1...")
    flow1_deployment_id = flow1.from_source(
        source="https://github.com/your/repo",
        entrypoint="code.py:flow1"
    ).deploy(
        name="flow1-deployment",
        work_pool_name="your-work-pool"
    )
    print(f"Created deployment: {flow1_deployment_id}")
    # Run flow1 and wait
    print("Starting flow1...")
    flow1_run = client.create_flow_run_from_deployment(
        deployment_id=flow1_deployment_id
    )
    print(f"Waiting for flow1 run {flow1_run.id}...")
    # Poll until the run reaches a terminal state (Completed, Failed, Crashed, ...)
    flow1_run = client.read_flow_run(flow1_run.id)
    while not flow1_run.state.is_final():
        sleep(5)
        flow1_run = client.read_flow_run(flow1_run.id)
    print(f"Flow1 finished with state: {flow1_run.state.name}")
    if not flow1_run.state.is_completed():
        raise Exception(f"Flow1 failed with state: {flow1_run.state.name}")
    # Similar pattern for flow2
    print("Deploying flow2...")
    # ... rest of the code
And for run.py, instead of deploying the deployment-manager flow, you might want to run it directly:
python
# run.py
from deployment import deployment_manager

if __name__ == "__main__":
    deployment_manager()
If you really need to deploy the deployment-manager as a flow itself, you'll need to handle it differently. Could you share your current run.py code so I can help modify it specifically?
Some additional suggestions:
1. Add logging statements (as shown above) to track exactly where the process is getting stuck
2. You might want to check if there are any existing deployments before creating new ones:
python
from prefect.exceptions import ObjectNotFound

# Deployment names are qualified as "<flow-name>/<deployment-name>"
try:
    existing_deployment = client.read_deployment_by_name("flow1/flow1-deployment")
    # Either use the existing deployment or delete it first
except ObjectNotFound:
    existing_deployment = None
3. Make sure your workers are running and able to pick up the work:
python
# You can check work pool and worker status
work_pools = client.read_work_pools()
workers = client.read_workers_for_work_pool(work_pool_name="your-work-pool")
Could you share:
1. Your current run.py code
2. The work pool type you're using
3. Whether you see any error messages in the logs
4. Whether the workers are running and healthy
This will help me provide a more specific solution to your issue.

Ritesh
07/17/2025, 6:46 AM
import os
import sys
from prefect.client.schemas.schedules import CronSchedule
os.environ['PREFECT_API_URL'] = "{api}"
def create_deployment():
    from deployment import main
    deployment_id = main.from_source(
        source="{git_link}",
        entrypoint="deployment.py:main"
    ).deploy(
        name="deployment-manager",
        work_pool_name="default-agent-pool",
        schedule=CronSchedule(cron="18 11 * * *"),
        tags=["deployment", "manager", "scheduled"],
        ignore_warnings=True
    )

if __name__ == "__main__":
    create_deployment()
Marvin
07/17/2025, 6:47 AM
bash
prefect work-pool create "process-pool" --type process
Then update your deployment code to use this work pool instead of the agent pool.
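For example, the only change in your create_deployment() would be the work pool name (a sketch based on your earlier script):
python
from prefect.client.schemas.schedules import CronSchedule
from deployment import main

main.from_source(
    source="{git_link}",
    entrypoint="deployment.py:main"
).deploy(
    name="deployment-manager",
    work_pool_name="process-pool",  # point at the new process pool
    schedule=CronSchedule(cron="18 11 * * *"),
    tags=["deployment", "manager", "scheduled"],
    ignore_warnings=True
)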
Would you like me to explain any part of these changes in more detail? Also, could you confirm:
1. Are you seeing any specific error messages in your logs?
2. Are you able to see the flow runs in the UI when they're created?
3. Is your worker/agent running and healthy?

Ritesh
07/17/2025, 7:14 AM

Marvin
07/17/2025, 7:14 AM

Ritesh
07/17/2025, 7:14 AM

Marvin
07/17/2025, 7:15 AM
1. First, install the prefect-github collection:
bash
pip install prefect-github
2. Then modify your code:
python
from prefect_github import GitHubCredentials, GitHubRepository
from prefect.client.schemas.schedules import CronSchedule

# For private repositories, save credentials and a repository block
# (field names follow the prefect-github collection)
credentials = GitHubCredentials(token="your-token")  # only needed for private repos
credentials.save("my-github-credentials")
github_repo = GitHubRepository(
    repository_url="https://github.com/your-org/your-repo",
    reference="main",  # or your branch name
    credentials=credentials
)
github_repo.save("my-github")

def create_deployment():
    from deployment import main
    # For a public repository:
    deployment_id = main.from_source(
        source="https://github.com/your-org/your-repo",
        entrypoint="deployment.py:main"
    ).deploy(
        name="deployment-manager",
        work_pool_name="default-agent-pool",
        schedule=CronSchedule(cron="18 11 * * *"),
        tags=["deployment", "manager", "scheduled"],
        ignore_warnings=True
    )
    # OR for a private repository, load the saved block:
    deployment_id = main.from_source(
        source=GitHubRepository.load("my-github"),
        entrypoint="deployment.py:main"
    ).deploy(
        name="deployment-manager",
        work_pool_name="default-agent-pool",
        schedule=CronSchedule(cron="18 11 * * *"),
        tags=["deployment", "manager", "scheduled"],
        ignore_warnings=True
    )
The error you're seeing is because the deployment is trying to use old-style configuration parameters that are no longer supported in Prefect 3.x. By using one of the methods above, you should resolve the "unexpected additional keys" error.
Would you like me to explain any part of this in more detail? Also, could you share which type of repository you're using (public or private) so I can provide more specific guidance?

Ritesh
07/17/2025, 7:17 AM

Marvin
07/17/2025, 7:18 AM
yaml
# deployment.yaml (prefect.yaml format; key names follow the prefect.yaml
# schema and may need adjusting to your Prefect version)
# The pull section clones your repository at runtime:
pull:
  - prefect.deployments.steps.git_clone:
      repository: "https://github.com/your-org/your-repo"
      branch: "main"  # or your branch name
      # For a private repository, supply a token from an environment variable:
      # access_token: "{{ $GITHUB_ACCESS_TOKEN }}"

deployments:
  - name: "deployment-manager"
    entrypoint: "deployment.py:main"  # your flow function
    schedules:
      - cron: "18 11 * * *"
    tags:
      - "deployment"
      - "manager"
      - "scheduled"
    work_pool:
      name: "default-agent-pool"
To use this YAML file, you have several options:
1. For public repositories, deploy using:
bash
prefect deploy --prefect-file deployment.yaml
2. For private repositories, first set your GitHub token:
bash
# Set your GitHub token as an environment variable
export GITHUB_ACCESS_TOKEN="your-token"
# Then deploy
prefect deploy --prefect-file deployment.yaml
3. Alternative for private repos using a credentials block:
First, create a GitHubCredentials block:
python
# In a Python script or notebook
from prefect_github import GitHubCredentials

GitHubCredentials(token="your-token").save("my-github-credentials")
Then reference the block from the git_clone pull step:
yaml
# deployment.yaml
pull:
  - prefect.deployments.steps.git_clone:
      repository: "https://github.com/your-org/your-repo"
      branch: "main"
      credentials: "{{ prefect.blocks.github-credentials.my-github-credentials }}"

deployments:
  - name: "deployment-manager"
    entrypoint: "deployment.py:main"
    schedules:
      - cron: "18 11 * * *"
    tags:
      - "deployment"
      - "manager"
      - "scheduled"
    work_pool:
      name: "default-agent-pool"
Some important notes:
1. Run prefect deploy from your project root (or point at the file with --prefect-file)
2. The entrypoint should point to your flow function
3. The work pool name should match an existing work pool
4. For private repositories, never commit the access token directly in the YAML file
You can also include additional configuration in the YAML:
yaml
deployments:
  - name: "deployment-manager"
    entrypoint: "deployment.py:main"
    description: "Deployment manager flow"  # Optional description
    version: null  # Optional version tag
    schedules:
      - cron: "18 11 * * *"
    tags:
      - "deployment"
      - "manager"
      - "scheduled"
    parameters: {}  # Add any flow parameters here
    enforce_parameter_schema: true
    work_pool:
      name: "default-agent-pool"
      job_variables: {}  # Optional infrastructure overrides
To deploy using this YAML:
1. Save the YAML file in your project directory
2. Run the deployment command:
bash
prefect deploy --prefect-file deployment.yaml
3. Verify the deployment:
bash
prefect deployment ls
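Once registered, you can also trigger a test run from the CLI (the qualified name here assumes your flow function is named main):
bash
prefect deployment run "main/deployment-manager"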
Would you like me to explain any part of the YAML configuration in more detail? Also, are you working with a public or private repository? This would help me provide more specific guidance.

Ritesh
07/17/2025, 7:59 AM

Marvin
07/17/2025, 8:00 AM
python
from prefect import flow
from prefect.deployments import run_deployment

# flow1 and flow2 are assumed to be imported from your own modules

@flow(name="deployment_creator")
def create_and_run_deployments():
    # 1. Create first flow deployment (deploy() returns the deployment's UUID)
    flow1.from_source(
        source="https://github.com/your-org/your-repo",
        entrypoint="flows.py:flow1"
    ).deploy(
        name="flow1-deployment",
        work_pool_name="your-work-pool",
        tags=["flow1"]
    )
    # 2. Run the deployment and wait for completion
    # (name format is "<flow-name>/<deployment-name>"; timeout=None waits
    # indefinitely, timeout=0 would return without waiting)
    flow1_run = run_deployment(
        name="flow1/flow1-deployment",
        timeout=None,
        as_subflow=False  # Run as independent flow, not as subflow
    )
    # 3. Only proceed if flow1 succeeded
    if not flow1_run.state.is_completed():
        raise Exception(f"Flow1 failed with state: {flow1_run.state.name}")
    # 4. Create and run second flow deployment
    flow2.from_source(
        source="https://github.com/your-org/your-repo",
        entrypoint="flows.py:flow2"
    ).deploy(
        name="flow2-deployment",
        work_pool_name="your-work-pool",
        tags=["flow2"]
    )
    flow2_run = run_deployment(
        name="flow2/flow2-deployment",
        timeout=None,
        as_subflow=False
    )
    return "All deployments completed"

# For scheduled execution of the deployment creator
@flow(name="scheduled_deployment_manager")
def scheduled_deployment_manager():
    return create_and_run_deployments()
Key points to prevent unexpected behavior:
1. Use as_subflow=False: This ensures the deployments run independently and don't inherit the parent flow's context.
2. Wait for Completion: run_deployment waits for the run to finish by default (timeout=None); timeout=0 would return immediately without waiting.
3. Error Handling: Check the state of each flow run before proceeding.
4. Synchronous Client: Use the sync client when you need finer-grained control, such as creating runs directly.
If you need to schedule this deployment creator:
yaml
# deployment.yaml
pull:
  - prefect.deployments.steps.git_clone:
      repository: "https://github.com/your-org/your-repo"
      branch: "main"

deployments:
  - name: "deployment-manager"
    entrypoint: "your_file.py:scheduled_deployment_manager"
    schedules:
      - cron: "0 0 * * *"  # Example schedule
    work_pool:
      name: "your-work-pool"
Or programmatically:
python
from prefect.client.schemas.schedules import CronSchedule

scheduled_deployment_manager.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="your_file.py:scheduled_deployment_manager"
).deploy(
    name="deployment-manager",
    work_pool_name="your-work-pool",
    schedule=CronSchedule(cron="0 0 * * *"),
)
To verify the behavior, you can:
1. Monitor the flow runs in the UI
2. Check the logs for each deployment
3. Use the client to query deployment states:
python
client = get_client(sync_client=True)
deployments = client.read_deployments()
for deployment in deployments:
    print(f"Deployment {deployment.name}: {deployment.status}")
Would you like me to explain any part of this in more detail? Or do you have a specific use case that needs additional customization?

Ritesh
07/17/2025, 8:01 AM

Marvin
07/17/2025, 8:01 AM
as_subflow defaults to True. This is important because it means that by default, when you run a deployment from within a flow, it will run as a subflow of the parent flow.
This default behavior (as_subflow=True) means:
1. The deployment will run as a child of the current flow
2. The parent flow will wait for the subflow to complete
3. The subflow's state will affect the parent flow's state
4. The runs will be linked in the UI as parent/child
If you're experiencing unexpected behavior with deployments running multiple times or not in the expected order, setting as_subflow=False might help because:
1. The deployment will run independently
2. It won't inherit context from the parent flow
3. It will be treated as a separate flow run
Here's an example showing the difference:
python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator_flow():
    # This will run as a subflow (default behavior); note that
    # run_deployment expects "<flow-name>/<deployment-name>"
    run1 = run_deployment(
        name="flow1/flow1-deployment"  # as_subflow defaults to True
    )
    # This will run as an independent flow
    run2 = run_deployment(
        name="flow2/flow2-deployment",
        as_subflow=False
    )
If you're creating deployments that need to run independently of your orchestrator flow, you should explicitly set as_subflow=False. This is particularly important when:
1. You want the deployments to run independently
2. You're experiencing issues with recursive deployments
3. You need separate logging or monitoring for each flow
4. You want to avoid parent/child flow relationships
Would you like me to provide more examples or explain any specific scenario you're dealing with?

Ritesh
07/17/2025, 8:03 AM
flow_from_source = self.flows[index].from_source(
    source=source_dir,
    entrypoint=self.entrypoints[index]
)
# Create deployment
deployment = flow_from_source.to_deployment(
    name=deployment_name,
    work_pool_name=work_pool_name,
    job_variables=self.job_variables[index]
)
how do i define as_subflow parameter here

Marvin
07/17/2025, 8:03 AM
When creating deployments with from_source and to_deployment, the as_subflow parameter isn't set at this stage - it's a parameter used when actually running the deployment. If you want to control the as_subflow behavior, you'll need to handle it when you run the deployment.
Here's how you can modify your code to include this control:
python
# First create the deployment as you're doing
flow_from_source = self.flows[index].from_source(
    source=source_dir,
    entrypoint=self.entrypoints[index]
)
# Create deployment
deployment = flow_from_source.to_deployment(
    name=deployment_name,
    work_pool_name=work_pool_name,
    job_variables=self.job_variables[index]
)
# Deploy it
deployment_id = deployment.apply()
# Then when you want to run it, you can control as_subflow
from prefect.deployments import run_deployment

# Run the deployment with as_subflow=False
# (run_deployment expects "<flow-name>/<deployment-name>")
flow_run = run_deployment(
    name=f"{deployment.flow_name}/{deployment_name}",
    as_subflow=False,  # This will run it independently
    timeout=None  # Optional: wait indefinitely for completion
)
If you're running multiple deployments sequentially, you might want to structure it like this:
python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def create_and_run_deployments():
    deployments = []
    # Create all deployments first
    for index in range(len(self.flows)):
        flow_from_source = self.flows[index].from_source(
            source=source_dir,
            entrypoint=self.entrypoints[index]
        )
        deployment = flow_from_source.to_deployment(
            name=f"deployment-{index}",
            work_pool_name=work_pool_name,
            job_variables=self.job_variables[index]
        )
        deployment.apply()
        deployments.append(deployment)
    # Then run them sequentially
    for deployment in deployments:
        flow_run = run_deployment(
            name=f"{deployment.flow_name}/{deployment.name}",
            as_subflow=False,  # Run independently
            timeout=None  # Wait for completion
        )
        # Optional: Check if the run completed successfully
        if not flow_run.state.is_completed():
            raise Exception(f"Deployment {deployment.name} failed with state: {flow_run.state.name}")
Key points:
1. to_deployment() creates the deployment configuration
2. apply() registers the deployment with Prefect
3. run_deployment() is where you control the as_subflow behavior
If you're experiencing issues with deployments running multiple times or not in the expected order, setting as_subflow=False when running the deployment should help resolve those issues.
Would you like me to explain any part of this in more detail or help with a specific scenario you're dealing with?

Ritesh
07/17/2025, 8:04 AM

Marvin
07/17/2025, 8:06 AM
1. Setting parent_task_run_id=None ensures the flow runs independently
2. Using wait_for_flow_run helps ensure proper sequencing
3. Checking the final state helps with error handling
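A minimal sketch of those three points with the sync client (simple polling stands in for a wait helper, since the sync client has no wait_for_flow_run method; the deployment ID is a placeholder):
python
from time import sleep

from prefect import flow, get_client

@flow
def orchestrator():
    client = get_client(sync_client=True)
    # 1. Run independently of any parent task
    flow_run = client.create_flow_run_from_deployment(
        deployment_id="your-deployment-id",  # placeholder
        parent_task_run_id=None,
    )
    # 2. Wait for the run to reach a terminal state before continuing
    flow_run = client.read_flow_run(flow_run.id)
    while not flow_run.state.is_final():
        sleep(5)
        flow_run = client.read_flow_run(flow_run.id)
    # 3. Check the final state for error handling
    if not flow_run.state.is_completed():
        raise Exception(f"Run failed with state: {flow_run.state.name}")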
Would you like me to explain any part of this in more detail or help with a specific scenario you're dealing with?

Ritesh
07/17/2025, 8:09 AM

Marvin
07/17/2025, 8:11 AM
python
from prefect import flow

@flow
def subflow(x):
    return x * 2

@flow
def parent_flow():
    # This creates a subflow
    result = subflow(5)
    return result
### Benefits of Subflows:
1. Code Organization: Break down complex workflows
2. Reusability: Use the same flow in different parent flows
3. Visibility: Clear parent-child relationship in the UI
4. Data Sharing: Easy data passing between flows
### Potential Issues with Subflows:
1. Resource Constraints:
python
@flow
def resource_intensive_subflow():
    # Uses same resources as parent
    large_computation()

@flow
def parent_flow():
    # These run in the same process
    for _ in range(10):
        resource_intensive_subflow()  # Might cause memory/CPU issues
2. Error Propagation:
python
@flow
def subflow():
    raise Exception("Subflow error")

@flow
def parent_flow():
    try:
        subflow()  # Parent flow will fail if subflow fails
    except Exception as e:
        # Need explicit handling
        pass
3. Complexity with Deep Nesting:
python
@flow
def deep_subflow():
    return "deep"

@flow
def middle_subflow():
    return deep_subflow()

@flow
def parent_flow():
    return middle_subflow()  # Can become hard to debug
### Best Practices:
1. Use Independent Flows when you need:
python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator_flow():
    # Run as independent flow, not subflow
    run_deployment(
        name="other-flow/deployment",
        as_subflow=False
    )
2. Use Subflows when you need:
python
@flow
def data_processing_subflow(data):
    return process(data)

@flow
def main_flow():
    data = load_data()
    # Use subflow for logical grouping
    processed = data_processing_subflow(data)
3. Handle Resources Carefully:
python
@flow
def parent_flow():
    # Subflows called directly run one at a time; flows have no .submit()
    # method, so run them sequentially (or make them async for concurrency)
    results = []
    for item in items:
        result = subflow(item)  # Careful with memory
        results.append(result)
### When to Use What:
Use Subflows When:
- You need tight coupling between flows
- You want to share memory/context
- The flows are part of the same logical process
- You need simple data passing between flows
Use Independent Flows When:
- You need isolation
- Flows should run in separate processes
- You want independent scaling
- You need different resource allocations
- You're orchestrating separate deployments
### Example of Safe Subflow Usage:
python
from prefect import flow, task
from typing import List

@task
def check_resources():
    # Verify available memory/CPU
    pass

@flow
def safe_subflow(data: List):
    check_resources()
    return process_chunk(data)

@flow
def parent_flow(all_data: List):
    # Break into manageable chunks
    chunks = split_data(all_data)
    results = []
    for chunk in chunks:
        # Process in controlled manner
        result = safe_subflow(chunk)
        results.append(result)
    return combine_results(results)
Would you like me to elaborate on any of these aspects or provide more specific examples for your use case?

Ritesh
07/17/2025, 8:14 AM

Marvin
07/17/2025, 8:15 AM
python
from prefect import flow
from prefect.deployments import run_deployment

# Instead of this (subflow that can't use a different work pool):
@flow
def subflow():
    return "result"

@flow
def parent_flow():
    result = subflow()  # Will run in same work pool as parent

# Do this instead (independent flows that can use different work pools):
@flow
def flow_to_run_separately():
    return "result"

@flow
def orchestrator_flow():
    # This will run in a different work pool
    result = run_deployment(
        name="flow-to-run-separately/deployment-name",
        as_subflow=False,  # Important: run as independent flow
        work_queue_name="different-work-queue"  # Optional: specify work queue
    )

# Deploy the flows to different work pools
flow_to_run_separately.from_source(
    source="your_source",
    entrypoint="flows.py:flow_to_run_separately"
).deploy(
    name="deployment-name",
    work_pool_name="different-work-pool"  # This flow uses a different work pool
)

orchestrator_flow.from_source(
    source="your_source",
    entrypoint="flows.py:orchestrator_flow"
).deploy(
    name="orchestrator-deployment",
    work_pool_name="main-work-pool"  # This flow uses the main work pool
)
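If those work pools don't exist yet, you can create them first (pool names match the example above):
bash
prefect work-pool create "main-work-pool" --type process
prefect work-pool create "different-work-pool" --type process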
If you're using the client directly:
python
from time import sleep

from prefect import flow, get_client

@flow
def orchestrator_flow():
    client = get_client(sync_client=True)
    # Create a flow run from a deployment in a different work pool
    flow_run = client.create_flow_run_from_deployment(
        deployment_id="deployment-id",
        parent_task_run_id=None,  # Run independently
    )
    # Wait for completion if needed (simple polling; the sync client
    # has no built-in wait helper)
    flow_run = client.read_flow_run(flow_run.id)
    while not flow_run.state.is_final():
        sleep(5)
        flow_run = client.read_flow_run(flow_run.id)
Key points:
1. Subflows always run in the same work pool as their parent flow
2. To use a different work pool, you need to:
   - Create a separate deployment for the flow
   - Run it independently (not as a subflow)
   - Use run_deployment with as_subflow=False, or use the client
3. This gives you more flexibility but requires proper deployment management
Would you like me to provide more specific examples or explain any part of this in more detail?

Ritesh
07/17/2025, 8:28 AM

Marvin
07/17/2025, 8:29 AM
### 1. Create Deployments Outside of Flows (Recommended):
python
# deployment_setup.py
def setup_deployments():
    # Create all deployments here, outside of any flow
    flow1.from_source(
        source="your_repo",
        entrypoint="flows.py:flow1"
    ).deploy(
        name="flow1-deployment",
        work_pool_name="your-pool"
    )
    flow2.from_source(
        source="your_repo",
        entrypoint="flows.py:flow2"
    ).deploy(
        name="flow2-deployment",
        work_pool_name="your-pool"
    )

if __name__ == "__main__":
    setup_deployments()
### 2. If You Must Create Deployments Within a Flow:
python
from prefect import flow, get_client
from prefect.exceptions import ObjectNotFound

@flow
def deployment_manager():
    client = get_client(sync_client=True)
    # Check if the deployment already exists ("<flow-name>/<deployment-name>")
    try:
        deployment_id = client.read_deployment_by_name("flow1/flow1-deployment").id
    except ObjectNotFound:
        # Only create if it doesn't exist; deploy() returns the deployment's UUID
        deployment_id = flow1.from_source(
            source="your_repo",
            entrypoint="flows.py:flow1"
        ).deploy(
            name="flow1-deployment",
            work_pool_name="your-pool"
            # No schedule passed, so runs are only created explicitly
        )
    # Run the deployment
    flow_run = client.create_flow_run_from_deployment(deployment_id=deployment_id)
### 3. Use Event-Based Triggers Instead:
python
from prefect.events import DeploymentEventTrigger

# Run flow2 automatically whenever a flow1 run completes
# (the match_related fields are an assumption; adjust to your resource names)
flow2.from_source(
    source="your_repo",
    entrypoint="flows.py:flow2"
).deploy(
    name="flow2-deployment",
    work_pool_name="your-pool",
    triggers=[
        DeploymentEventTrigger(
            expect={"prefect.flow-run.Completed"},
            match_related={"prefect.resource.name": "flow1"}
        )
    ]
)
### Best Practices to Avoid Issues:
1. Separate Creation from Execution:
python
# Create deployments in a separate script
# run.py
def setup():
    create_all_deployments()

def run_orchestration():
    run_existing_deployments()

if __name__ == "__main__":
    setup()  # Run once
    run_orchestration()  # Run as needed
2. Use Idempotent Creation:
python
from prefect.exceptions import ObjectNotFound

@flow
def safe_deployment_manager():
    client = get_client(sync_client=True)

    def get_or_create_deployment(name, flow_obj, **kwargs):
        # Names are qualified as "<flow-name>/<deployment-name>"
        try:
            return client.read_deployment_by_name(f"{flow_obj.name}/{name}")
        except ObjectNotFound:
            return flow_obj.from_source(...).deploy(
                name=name,
                **kwargs
            )
3. Control Flow Run Creation:
python
from time import sleep

@flow
def orchestrate_flows():
    client = get_client(sync_client=True)
    # Create flow runs with explicit control
    flow_run = client.create_flow_run_from_deployment(
        deployment_id=deployment_id,
        parent_task_run_id=None  # Run independently
    )
    # Wait for completion before proceeding (simple polling)
    flow_run = client.read_flow_run(flow_run.id)
    while not flow_run.state.is_final():
        sleep(5)
        flow_run = client.read_flow_run(flow_run.id)
4. Use Version Control:
python
@flow
def versioned_deployment_manager():
    # `version` is a placeholder for your own version string
    deployment_id = flow1.from_source(
        source="your_repo",
        entrypoint="flows.py:flow1"
    ).deploy(
        name=f"flow1-deployment-{version}",  # Version your deployments
        work_pool_name="your-pool",
        version=version  # Track versions
    )
Key Recommendations:

Ritesh
07/17/2025, 9:06 AM

Marvin
07/17/2025, 9:08 AM
1. Checking the Run Count Inside the Flow:
python
from prefect import flow, get_client
from prefect.artifacts import create_markdown_artifact
from prefect.client.schemas.filters import FlowFilter, FlowFilterName

@flow
def limited_flow():
    client = get_client(sync_client=True)
    # Get the count of previous runs for this flow
    # (default flow name is the function name with underscores as dashes)
    flow_runs = client.read_flow_runs(
        flow_filter=FlowFilter(name=FlowFilterName(any_=["limited-flow"]))
    )
    max_runs = 5  # Set your desired limit
    current_runs = len(flow_runs)
    if current_runs >= max_runs:
        message = f"Flow has reached maximum runs ({max_runs})"
        create_markdown_artifact(
            key="max-runs-reached",  # keys allow lowercase letters, numbers, dashes
            markdown=message,
            description="Maximum run limit reached"
        )
        raise Exception(message)
    # Your flow logic here
    return "Flow executed successfully"
2. Using Global Concurrency Limits:
python
from prefect import flow, get_client

@flow
def flow_with_concurrency():
    client = get_client(sync_client=True)
    # Create a tag-based concurrency limit
    # (note: tag concurrency limits apply to task runs carrying the tag)
    client.create_concurrency_limit(
        tag="my-flow-limit",
        concurrency_limit=5
    )
    # Your flow logic here
3. Using Work Queue Limits:
python
from prefect import flow, get_client

@flow
def flow_with_queue_limit():
    client = get_client(sync_client=True)
    # Set a work queue concurrency limit (the update call takes the queue's ID)
    queue = client.read_work_queue_by_name("your-work-queue")
    client.update_work_queue(
        id=queue.id,
        concurrency_limit=5
    )
4. Using a Persistent Counter:
python
from prefect import flow
from prefect.blocks.system import JSON

@flow
def flow_with_persistent_counter():
    try:
        counter_block = JSON.load("run-counter")
        count = counter_block.value.get("count", 0)
    except ValueError:  # block document not found
        counter_block = JSON(value={"count": 0})
        count = 0
    max_runs = 5
    if count >= max_runs:
        raise Exception(f"Maximum runs ({max_runs}) reached")
    # Increment counter (overwrite=True is needed to update an existing block)
    counter_block.value["count"] = count + 1
    counter_block.save("run-counter", overwrite=True)
    # Your flow logic here
5. Using Deployment Configuration:
python
# Note: Deployment.build_from_flow was a Prefect 2.x API and was removed in
# 3.x, and cron schedules have no end_date. To limit runs to a window you can
# pause or delete the deployment, or check the date inside the flow; a sketch
# of the in-flow check (the cutoff date is an example):
from datetime import datetime, timezone

from prefect import flow

CUTOFF = datetime(2025, 7, 22, tzinfo=timezone.utc)

@flow
def windowed_flow():
    if datetime.now(timezone.utc) >= CUTOFF:
        raise Exception("Scheduling window has ended")
    # Your flow logic here
6. Using Tags and Concurrency Limits:
python
from prefect import flow, get_client, tags

@flow
def flow_with_tag_limit():
    with tags("limited-flow"):
        client = get_client(sync_client=True)
        # Set a concurrency limit for the tag (this limits task runs
        # carrying the tag, not the flow run itself)
        client.create_concurrency_limit(
            tag="limited-flow",
            concurrency_limit=5
        )
        # Your flow logic here
7. Monitoring and Stopping Programmatically:
python
from prefect import flow, get_client
from prefect.client.schemas.filters import (
    FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
)
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

@flow
def monitored_flow():
    client = get_client(sync_client=True)
    # Count completed runs (add a FlowFilter to scope this to one flow)
    completed = client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.COMPLETED])
            )
        )
    )
    if len(completed) >= 5:
        # Cancel any scheduled future runs
        scheduled = client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.SCHEDULED])
                )
            )
        )
        for run in scheduled:
            client.set_flow_run_state(
                flow_run_id=run.id,
                state=Cancelled(message="Maximum run limit reached")
            )
        return
    # Your flow logic here
Best Practices:
1. Document the Limits:
python
@flow(description="This flow is limited to 5 runs")
def limited_flow():
    # Add documentation about limits
    pass
2. Handle Limit Gracefully:
python
@flow
def graceful_limited_flow():
    try:
        # Check limits (limit_reached() is a hypothetical helper)
        if limit_reached():
            # Create a notification or log entry, then exit cleanly
            return "Run limit reached"
        # ... your flow logic here
    except Exception:
        # Handle or re-raise as appropriate
        raise
3. Monitor Limits:
python
from prefect import flow, get_client
from prefect.artifacts import create_markdown_artifact
from prefect.client.schemas.filters import FlowFilter, FlowFilterName

@flow
def monitored_limited_flow():
    client = get_client(sync_client=True)
    # Track run counts for this flow (default flow name is the function
    # name with underscores replaced by dashes)
    runs = client.read_flow_runs(
        flow_filter=FlowFilter(name=FlowFilterName(any_=["monitored-limited-flow"]))
    )
    # Create monitoring artifact
    create_markdown_artifact(
        key="run-count",
        markdown=f"Current run count: {len(runs)}"
    )
Would you like me to explain any of these approaches in more detail or provide more specific examples for your use case?