Scott Pedersen
07/07/2025, 5:56 AMwith tags
context manager work for concurrency?Marvin
07/07/2025, 5:56 AMMarvin
07/07/2025, 5:56 AMwith tags
context manager in Prefect is a powerful tool for controlling concurrency and organizing task execution. Here's how it works:
1. Basic Usage:
python
from prefect import flow, task, tags
@task
async def my_task():
pass
@flow
def my_flow():
with tags("database"):
# All tasks here get the "database" tag
my_task()
with tags("api", "external"):
# These tasks get both "api" and "external" tags
my_task()
2. Concurrency Control:
- Tags can be used to set concurrency limits for groups of tasks
- When a task runs, it must have available slots for ALL of its tags
- Limits are respected across all flow runs in your Prefect instance
3. Key Behaviors:
- Tags are cumulative and merge with existing tags
- Nested tag contexts combine their tags
- Tags apply to all tasks within their context
- Concurrency limits are enforced per-tag
To set concurrency limits for tagged tasks, you can use the Prefect UI or the API. For example:
python
from prefect.concurrency import configure_concurrent_slots
# Limit tasks with "database" tag to 3 concurrent runs
configure_concurrent_slots(tags=["database"], slots=3)
This means:
- If you have tasks tagged with "database", only 3 can run at once
- Tasks with multiple tags must satisfy ALL their tag limits
- Tasks without concurrency limits run without restrictions
For more details on implementing tag-based concurrency limits, see the official documentation.
Remember that tags are also useful for organization and filtering in the UI, even when not used for concurrency control.Scott Pedersen
07/07/2025, 6:05 AMpython
from prefect import flow, task, tags
@task
async def my_task():
pass
@task
async def my_other_task():
pass
@flow
def my_flow():
with tags("database"):
# All tasks here get the "database" tag
my_task()
# These tasks get both "api" and "external" tags
my_other_task()
If it second flow is started while the first flow is running, will the second flow stop at the with tags("database")
if the first task is still at this point or will it continue on to my_other_task()
?Marvin
07/07/2025, 6:05 AMMarvin
07/07/2025, 6:06 AMpython
from prefect import flow, task, tags
from prefect.concurrency import configure_concurrent_slots
# Configure concurrency limit for "database" tag
configure_concurrent_slots(tags=["database"], slots=1)
@task
async def my_task():
pass
@task
async def my_other_task():
pass
@flow
def my_flow():
with tags("database"):
# All tasks here get the "database" tag
my_task()
# This task has no tags
my_other_task()
In this case:
1. If Flow Run A is executing and is currently running my_task()
(with the "database" tag)
2. When Flow Run B starts:
- It will skip over the tagged section (since it's at capacity)
- It will proceed to execute my_other_task()
immediately since it has no tags or concurrency restrictions
- It will come back to execute the tagged task when a slot becomes available
To achieve what I think you want (complete isolation between flow runs), you should use flow-level concurrency instead of tag-based concurrency. You can do this in two ways:
1. Using a deployment configuration:
python
flow.from_source(...).deploy(
name="my-deployment",
concurrent_limit=1
)
2. Or using the concurrency API for the flow itself:
python
configure_concurrent_slots(flows=["my-flow"], slots=1)
This will ensure that only one instance of your flow runs at a time, regardless of the tags within it.Scott Pedersen
07/07/2025, 6:08 AMMarvin
07/07/2025, 6:08 AMMarvin
07/07/2025, 6:09 AMpython
@flow
def my_flow():
with tags("database"):
# All tasks here get the "database" tag
my_task()
# This task has no tags
my_other_task()
If you have a concurrency limit set on the "database" tag, and Flow Run B starts while Flow Run A is still executing the tagged task:
- Flow Run B will indeed proceed to execute my_other_task()
while waiting for the "database" tagged task to become available
- The concurrency control in 2.x happens at the task level, not the flow level
If you want to prevent multiple flow runs from executing simultaneously in 2.x, you should use one of these approaches:
1. Set up a flow-level concurrency limit using tags:
python
@flow(tags=["my-flow"])
def my_flow():
...
# Then set concurrency limit for the flow tag
from prefect.client import get_client
async with get_client() as client:
await client.create_concurrency_limit(
tag="my-flow",
concurrency_limit=1
)
2. Or use a Worker with a concurrency limit set to 1 (this would limit all flows running on that worker)
The key difference in 2.x is the API for setting concurrency limits, but the fundamental behavior of tagged vs untagged tasks remains the same - untagged tasks will execute even while tagged tasks are waiting for concurrency slots to become available.Marvin
07/07/2025, 6:12 AMMarvin
07/07/2025, 6:13 AMScott Pedersen
07/07/2025, 6:14 AMwith tags
be nested inside another with tags
for example?
with tags(x):
my_task_1()
my_task_2()
with tags(y):
my_task_3()
Marvin
07/07/2025, 6:14 AMMarvin
07/07/2025, 6:15 AMwith tags
contexts! Here's how it works:
python
from prefect import flow, task, tags
@task
def my_task_1():
pass
@task
def my_task_2():
pass
@task
def my_task_3():
pass
@flow
def my_flow():
with tags("x"):
my_task_1() # Has tag: "x"
my_task_2() # Has tag: "x"
with tags("y"):
my_task_3() # Has BOTH tags: "x" and "y"
In this case:
- my_task_1
and my_task_2
will have only the tag "x"
- my_task_3
will have both tags "x" and "y"
This means:
- If you have concurrency limits on both tags, my_task_3
will need to satisfy both limits to run
- You can nest as many tag contexts as needed
- Tags are cumulative - inner contexts inherit all tags from their outer contexts
This is particularly useful when you want to apply both broad and specific concurrency controls. For example:
python
@flow
def my_flow():
with tags("database"): # Limit all database operations
with tags("write"): # Additional limit on write operations
write_to_db()
with tags("read"): # Different limit for read operations
read_from_db()
In this case, you could set different concurrency limits for:
- All database operations (via "database" tag)
- Write operations specifically (must satisfy both "database" and "write" limits)
- Read operations specifically (must satisfy both "database" and "read" limits)Scott Pedersen
07/08/2025, 3:20 AMconcurrency_tag = f"{data_product_name}-{deployment_name}"
Can I reuse the same tag more than once, for example
python
@flow
def my_flow():
with tags("data_product_name-deployment_name"):
task_1()
task_2()
with tags("data_product_name-deployment_name"):
task_3()
or do they have to be unique?Marvin
07/08/2025, 3:20 AMMarvin
07/08/2025, 3:21 AMpython
@flow
def my_flow():
with tags("data_product_name-deployment_name"):
task_1()
task_2()
with tags("data_product_name-deployment_name"):
task_3()
Both task_1()
and task_3()
will be subject to the same concurrency limit (if one is set for that tag), while task_2()
will run independently of that concurrency constraint.
A few important points to note:
1. If you haven't set a specific concurrency limit for the tag, the tasks will run without any concurrency restrictions.
2. To set a concurrency limit, you'll need to configure it using the Prefect API or CLI.
3. The concurrency limit applies across all flows and tasks using that tag, not just within a single flow.
For example, if you set a concurrency limit of 2 for the tag "data_product_name-deployment_name", only 2 tasks with that tag can run simultaneously, regardless of which flow or deployment they belong to.
This pattern is particularly useful for your use case where you want to control concurrency at a deployment level, as you can apply the same tag to multiple tasks that should share the same concurrency constraints.Scott Pedersen
07/08/2025, 3:24 AMMarvin
07/08/2025, 3:24 AMMarvin
07/08/2025, 3:24 AMdata_product_name
and deployment_name
will create a distinct tag
2. The concurrency limit will only apply to tasks sharing the exact same tag value
3. Different data products or different deployments will have different tags and won't interfere with each other
For example:
python
# These would be different tags and wouldn't affect each other:
"product_a-deployment_1" # Only affects this specific product/deployment combo
"product_a-deployment_2" # Different deployment, different tag
"product_b-deployment_1" # Different product, different tag
# These would share concurrency limits:
@flow
def my_flow(data_product_name="product_a", deployment_name="deployment_1"):
concurrency_tag = f"{data_product_name}-{deployment_name}"
with tags(concurrency_tag):
task_1() # These tasks share the same concurrency limit
task_2() # This task runs independently
with tags(concurrency_tag):
task_3() # These tasks share the same concurrency limit
So if you have multiple flows running simultaneously:
- Two instances of product_a-deployment_1
will respect the concurrency limit
- But product_a-deployment_1
and product_b-deployment_1
can run simultaneously without affecting each other
- And product_a-deployment_1
and product_a-deployment_2
can also run simultaneously
This gives you precise control over concurrency at exactly the granularity you want - per data product and deployment combination.Scott Pedersen
07/09/2025, 1:06 AMMarvin
07/09/2025, 1:06 AMMarvin
07/09/2025, 1:07 AMpython
from prefect import flow
from prefect.client import get_client
from prefect.states import Cancelled
async def check_and_cancel_concurrent_flows(deployment_id):
async with get_client() as client:
# Get all running flows for this deployment
flow_runs = await client.read_flow_runs(
deployment_id=deployment_id,
states=["RUNNING", "PENDING"]
)
# Sort by start time (newest first)
sorted_runs = sorted(
flow_runs,
key=lambda x: x.start_time or x.expected_start_time,
reverse=True
)
# If there's more than one running flow
if len(sorted_runs) > 1:
# Cancel all but the newest run
for run in sorted_runs[1:]:
await client.set_flow_run_state(
flow_run_id=run.id,
state=Cancelled(message="Cancelled due to concurrent flow run")
)
@flow
def my_flow():
# Your flow logic here
deployment_id = get_deployment_id() # You'll need to implement this based on your context
await check_and_cancel_concurrent_flows(deployment_id)
# Rest of your flow
A few important notes:
1. You'll need to get the deployment ID. This could come from:
- Environment variables
- Flow run context
- Parameters passed to the flow
2. The cancellation will only work if:
- The flows haven't completed yet
- The infrastructure executing the flows supports cancellation
- You have the appropriate permissions
3. You might want to add additional filtering criteria like:
python
flow_runs = await client.read_flow_runs(
deployment_id=deployment_id,
states=["RUNNING", "PENDING"],
start_time={"after": some_timestamp} # To only look at recent runs
)
4. For better cancellation support, you can enable enhanced cancellation:
python
# In your environment variables
PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
Alternative Approach:
Instead of cancelling flows after they've started, you might want to consider using deployment concurrency limits to prevent multiple flows from running in the first place:
python
from prefect.deployments import Deployment
deployment = Deployment.build_from_flow(
flow=my_flow,
name="my-deployment",
version="1",
concurrency_limit=1 # Only allows one flow run at a time
)
This preventive approach might be more efficient as it stops concurrent runs before they start, rather than cancelling them after they've begun.
Choose the approach that best fits your use case:
- Use concurrency limits if you want to prevent concurrent runs entirely
- Use the cancellation approach if you need more fine-grained control or need to handle edge cases where flows might start concurrently despite other controlsScott Pedersen
07/09/2025, 1:10 AMpython
from prefect.deployments import Deployment
deployment = Deployment.build_from_flow(
flow=my_flow,
name="my-deployment",
version="1",
concurrency_limit=1 # Only allows one flow run at a time
)
Marvin
07/09/2025, 1:10 AMMarvin
07/09/2025, 1:10 AMDeployment.build_from_flow()
does work, the recommended approach is to use the DeploymentSpec
class instead. Here's the correct way to set a concurrency limit for a deployment in Prefect 2.x:
python
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner
deployment = DeploymentSpec(
name="my-deployment",
flow=my_flow,
version="1",
flow_runner=SubprocessFlowRunner(),
tags=["my-tag"],
parameters={},
schedule=None,
concurrency_limit=1 # Only allows one flow run at a time
)
Or alternatively, you can set the concurrency limit through the CLI after creating the deployment:
bash
prefect deployment set-concurrency-limit "my-flow/my-deployment" 1
Note that in Prefect 3.x, this syntax has changed completely. If you're starting a new project, I'd recommend using Prefect 3.x where the deployment syntax is:
python
from prefect import flow
from prefect.deployments import Deployment
@flow
def my_flow():
pass
deployment = Deployment(
name="my-deployment",
flow=my_flow,
concurrency_limit=1
)
deployment.apply()
Scott Pedersen
07/09/2025, 1:15 AMMarvin
07/09/2025, 1:15 AMMarvin
07/09/2025, 1:16 AMpython
from prefect import flow, task, tags
from prefect.client import get_client
from prefect.states import Cancelled, Pending, Running
from datetime import datetime, timedelta
import asyncio
async def check_task_wait_time(task_run_id, max_wait_minutes=10):
async with get_client() as client:
task_run = await client.read_task_run(task_run_id)
# Check if task is in a pending state
if isinstance(task_run.state, Pending):
start_time = task_run.state.timestamp
current_time = datetime.utcnow()
# If task has been waiting longer than max_wait_minutes
if current_time - start_time > timedelta(minutes=max_wait_minutes):
# Get the flow run ID from the task run
flow_run_id = task_run.flow_run_id
# Cancel the entire flow run
await client.set_flow_run_state(
flow_run_id=flow_run_id,
state=Cancelled(
message=f"Cancelled due to task waiting on concurrency limit for more than {max_wait_minutes} minutes"
)
)
return True
return False
@task
async def monitored_task(task_run_id):
while True:
cancelled = await check_task_wait_time(task_run_id)
if cancelled:
raise Exception("Flow cancelled due to excessive wait time")
await asyncio.sleep(60) # Check every minute
@flow
def my_flow():
concurrency_tag = "my-concurrency-tag"
with tags(concurrency_tag):
# Your task that might hit concurrency limits
task_1_future = task_1.submit()
# Start monitoring this task
monitored_task.submit(task_1_future.task_run.id)
# Wait for the task to complete
task_1_result = task_1_future.result()
# Rest of your flow
A few important points about this implementation:
1. The monitoring task runs in parallel with your main task and checks its state periodically
2. You can adjust the monitoring frequency by changing the sleep
duration:
python
await asyncio.sleep(60) # Checks every minute
3. You can customize the maximum wait time:
python
cancelled = await check_task_wait_time(task_run_id, max_wait_minutes=15) # 15 minutes
4. Make sure to enable enhanced cancellation for better cancellation support:
bash
export PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
Alternative Approach:
If you want a simpler solution, you could also use Prefect's task timeouts, though this would apply to the total task runtime, not just the waiting period:
python
from prefect import task, flow, tags
@task(timeout_seconds=600) # 10 minutes
def my_task():
with tags("my-concurrency-tag"):
# Your task logic here
pass
@flow
def my_flow():
my_task()
The main difference is:
- The first approach specifically monitors the waiting time due to concurrency limits
- The task timeout approach will cancel after a total time period, regardless of whether the task was waiting or executing
Choose the approach that best fits your needs:
- Use the monitoring approach if you specifically want to track concurrency wait times
- Use task timeouts if you're okay with a simpler solution that limits total task durationScott Pedersen
07/09/2025, 1:22 AMwith tag
with all tasks under it. If we have a flow that runs for 3 hours we don't want the next flow to wait for 3 hours before if a flow for the same deployment is already running. We would want to cancel it after a short time periodMarvin
07/09/2025, 1:22 AMMarvin
07/09/2025, 1:22 AMpython
from prefect import flow, task, tags, get_run_logger
from prefect.client import get_client
from prefect.states import Cancelled, Pending
from datetime import datetime, timedelta
import asyncio
async def check_concurrency_wait(concurrency_tag, max_wait_minutes=10):
"""Check if we've been waiting too long due to concurrency limits"""
async with get_client() as client:
# Get the current flow run
flow_run = await client.get_running_flow()
start_time = datetime.utcnow()
while True:
# Get all running flows with this tag
flow_runs = await client.read_flow_runs(
tags=[concurrency_tag],
states=["RUNNING"]
)
# If we're the only one, or no other flows are running, we're good
other_runs = [run for run in flow_runs if run.id != flow_run.id]
if not other_runs:
return True
# Check if we've waited too long
current_time = datetime.utcnow()
if current_time - start_time > timedelta(minutes=max_wait_minutes):
# Cancel our flow run
await client.set_flow_run_state(
flow_run_id=flow_run.id,
state=Cancelled(
message=f"Cancelled due to waiting on concurrent flow for more than {max_wait_minutes} minutes"
)
)
return False
# Wait before checking again
await asyncio.sleep(30) # Check every 30 seconds
@flow
async def my_long_running_flow():
logger = get_run_logger()
concurrency_tag = "my-data-product-deployment1"
# Check if we should proceed or cancel
can_proceed = await check_concurrency_wait(concurrency_tag, max_wait_minutes=5)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to concurrent flow running")
return
# If we get here, we're clear to run
with tags(concurrency_tag):
# All your tasks go here
await task_1()
await task_2()
# ... more tasks
await task_that_takes_3_hours()
This approach:
1. Checks immediately at flow start if another flow with the same tag is running
2. If another flow is running:
- Waits for a short period (5 minutes in the example)
- If the other flow is still running after that time, cancels this flow
- If the other flow finishes during that time, proceeds with execution
3. Once a flow gains the concurrency slot, it can run for its full duration (3 hours in your case)
You can adjust the timing by changing:
python
max_wait_minutes=5 # How long to wait before cancelling
await asyncio.sleep(30) # How frequently to check
Usage example:
python
@task
async def task_1():
await asyncio.sleep(10)
return "task 1 done"
@task
async def task_that_takes_3_hours():
await asyncio.sleep(10800) # 3 hours
return "long task done"
# Deploy and run the flow
deployment = DeploymentSpec(
flow=my_long_running_flow,
name="long-running-deployment",
tags=["production"],
)
This solution gives you:
1. Quick cancellation if a concurrent flow is already running
2. No long waits for resources that might be tied up for hours
3. Clear logging of why the flow was cancelled
4. The ability to adjust wait times based on your needs
The first flow to get the concurrency slot will run to completion, while subsequent flows will quickly cancel if they can't start within the specified wait time, rather than waiting for hours.Scott Pedersen
07/09/2025, 4:02 AMMarvin
07/09/2025, 4:02 AMMarvin
07/09/2025, 4:03 AMpython
from prefect import flow
from prefect.client import get_client
from prefect.context import get_run_context
async def get_deployment_id():
# Get the current run context
context = get_run_context()
if context.flow_run.deployment_id:
return context.flow_run.deployment_id
# If not found in context, we can look it up by name
async with get_client() as client:
# Get flow name from context
flow_name = context.flow.name
deployment_name = "your-deployment-name" # Replace with your deployment name
deployments = await client.read_deployments(
flows={flow_name},
name={"any_": [deployment_name]}
)
if deployments:
return deployments[0].id
return None
@flow
async def my_flow():
deployment_id = await get_deployment_id()
# Use the deployment_id here
Even simpler, if you just want to check for concurrent runs of the same deployment, you can use the deployment name directly without needing the ID:
python
from prefect import flow
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import asyncio
async def check_concurrent_flows(max_wait_minutes=5):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_name = context.flow_run.deployment.name
async with get_client() as client:
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment
flow_runs = await client.read_flow_runs(
deployment_name=deployment_name,
states=["RUNNING", "PENDING"]
)
# Filter out our own flow run
other_runs = [run for run in flow_runs if run.id != current_flow_run_id]
if not other_runs:
return True
# Check if we've waited too long
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
await client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=f"Cancelled after waiting {max_wait_minutes} minutes for concurrent flow"
)
)
return False
await asyncio.sleep(30)
@flow
async def my_flow():
# Check for concurrent flows at the start
can_proceed = await check_concurrent_flows(max_wait_minutes=5)
if not can_proceed:
return
# Your flow logic here
await task_1()
await task_2()
# etc...
This approach:
1. Automatically gets the deployment name from the flow run context
2. Doesn't require you to manually specify deployment names or IDs
3. Works reliably when the flow is run from a deployment
4. Will cancel the flow if another instance of the same deployment is running
The key advantages are:
- No need to manually track deployment IDs
- Works automatically across all your deployments
- Self-contained and easier to maintain
Remember that this only works when running the flow from a deployment. If you're running the flow directly (not from a deployment), there won't be a deployment context available.Scott Pedersen
07/09/2025, 4:47 AMMarvin
07/09/2025, 4:47 AMMarvin
07/09/2025, 4:48 AMpython
from prefect import flow
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import asyncio
async def check_concurrent_flows(max_wait_minutes=5):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_name = context.flow_run.deployment.name
async with get_client() as client:
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment
flow_runs = await client.read_flow_runs(
deployment_name=deployment_name,
states=["RUNNING", "PENDING"]
)
# Filter out our own flow run and sort by start_time
other_runs = [
run for run in flow_runs
if run.id != current_flow_run_id
]
if not other_runs:
return True
# Sort runs by start time
sorted_runs = sorted(
other_runs,
key=lambda x: x.start_time or x.expected_start_time
)
# Get the oldest running flow
oldest_run = sorted_runs[0]
# If our flow started after the oldest running flow, we should cancel
our_flow = next(run for run in flow_runs if run.id == current_flow_run_id)
our_start_time = our_flow.start_time or our_flow.expected_start_time
oldest_start_time = oldest_run.start_time or oldest_run.expected_start_time
if our_start_time > oldest_start_time:
# We're newer, so we should cancel
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
await client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=f"Cancelled after waiting {max_wait_minutes} minutes: detected older running flow from same deployment"
)
)
return False
else:
# We're the oldest running flow, so we should continue
return True
await asyncio.sleep(30)
@flow
async def my_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = await check_concurrent_flows(max_wait_minutes=5)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to older concurrent flow running")
return
<http://logger.info|logger.info>("No concurrent flows or we are the oldest flow - proceeding with execution")
# Your flow logic here
await task_1()
await task_2()
# etc...
Key improvements in this version:
1. We sort the flows by start time to determine which is oldest
python
sorted_runs = sorted(
other_runs,
key=lambda x: x.start_time or x.expected_start_time
)
2. We explicitly compare our start time with the oldest running flow
python
if our_start_time > oldest_start_time:
# We're newer, so we should cancel
...
else:
# We're the oldest running flow, so we should continue
return True
3. Only the newer flow(s) will cancel themselves, while the oldest flow continues running
This ensures:
- The first (oldest) flow will always continue running
- Any newer flows will cancel themselves after the wait period
- There's no race condition where both flows might cancel themselves
- Clear logging of why a flow was cancelled
You can adjust the timing parameters as needed:
python
can_proceed = await check_concurrent_flows(max_wait_minutes=2) # Shorter wait time
Marvin
07/09/2025, 4:48 AMpython
<http://logger.info|logger.info>(f"Found concurrent flow run {oldest_run.id} that started at {oldest_start_time}")
This approach provides a robust solution to ensure:
1. Only one flow runs at a time
2. The oldest flow always gets priority
3. Newer flows cancel themselves in a controlled manner
4. No deadlock situation where all flows cancel themselvesScott Pedersen
07/09/2025, 4:50 AMMarvin
07/09/2025, 4:50 AMMarvin
07/09/2025, 4:51 AMpython
from prefect import flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import asyncio
async def check_concurrent_flows(max_wait_minutes=5, grace_period_seconds=30):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_name = context.flow_run.deployment.name
logger = get_run_logger()
async with get_client() as client:
# First, wait for a short grace period to ensure all near-simultaneous
# flows have had a chance to start and register
await asyncio.sleep(grace_period_seconds)
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment
flow_runs = await client.read_flow_runs(
deployment_name=deployment_name,
states=["RUNNING", "PENDING"]
)
# Filter out our own flow run
other_runs = [
run for run in flow_runs
if run.id != current_flow_run_id
]
if not other_runs:
return True
# Get our flow run details
our_flow = next(run for run in flow_runs if run.id == current_flow_run_id)
our_start_time = our_flow.start_time or our_flow.expected_start_time
# Sort all runs by ID (which is guaranteed to be unique and ordered)
# This provides a consistent ordering even for near-simultaneous starts
all_concurrent_runs = sorted(
[our_flow] + other_runs,
key=lambda x: x.id
)
# If we're not the first flow in the sorted list
if all_concurrent_runs[0].id != current_flow_run_id:
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
# Log details about the concurrent flows
for run in all_concurrent_runs:
<http://logger.info|logger.info>(
f"Flow run {run.id} started at {run.start_time or run.expected_start_time} "
f"({'current' if run.id == current_flow_run_id else 'other'})"
)
await client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=(
f"Cancelled after waiting {max_wait_minutes} minutes: "
f"detected concurrent flow run {all_concurrent_runs[0].id}"
)
)
)
return False
else:
# We're the first flow, so we should continue
<http://logger.info|logger.info>(f"We are the primary flow run (ID: {current_flow_run_id})")
return True
await asyncio.sleep(10) # Check more frequently
@flow
async def my_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = await check_concurrent_flows(
max_wait_minutes=5,
grace_period_seconds=30 # Adjust based on your Lambda retry timing
)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to concurrent flow running")
return
<http://logger.info|logger.info>("Proceeding with flow execution")
# Your flow logic here
await task_1()
await task_2()
# etc...
Key improvements for handling near-simultaneous starts:Marvin
07/09/2025, 4:51 AMpython
await asyncio.sleep(grace_period_seconds)
This allows all near-simultaneous flows to start and register before making any decisions.
2. Use flow run ID for ordering instead of start time:
python
all_concurrent_runs = sorted(
[our_flow] + other_runs,
key=lambda x: x.id
)
Flow run IDs are guaranteed to be unique and ordered, providing a more reliable way to determine which flow should continue.
3. More frequent checking:
python
await asyncio.sleep(10) # Check every 10 seconds instead of 30
4. Added detailed logging:
python
for run in all_concurrent_runs:
<http://logger.info|logger.info>(
f"Flow run {run.id} started at {run.start_time or run.expected_start_time} "
f"({'current' if run.id == current_flow_run_id else 'other'})"
)
You can adjust the timing parameters based on your specific Lambda retry behavior:
python
can_proceed = await check_concurrent_flows(
max_wait_minutes=2, # How long to wait before cancelling
grace_period_seconds=10 # How long to wait for near-simultaneous starts
)
This improved version:
1. Handles near-simultaneous starts reliably
2. Provides consistent ordering even when flows start seconds apart
3. Includes a grace period to account for slight delays in flow registration
4. Gives you detailed logging to track what's happening
5. Uses flow run IDs which are more reliable than timestamps for ordering
For AWS Lambda retries specifically, you might want to:
1. Set the grace_period_seconds
to match your Lambda retry interval
2. Keep the max_wait_minutes
relatively short to avoid unnecessary Lambda execution time
3. Consider adding additional logging about Lambda context if availableScott Pedersen
07/09/2025, 5:28 AMMarvin
07/09/2025, 5:28 AMMarvin
07/09/2025, 5:28 AMpython
from prefect import flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import time
def check_concurrent_flows(max_wait_minutes=5, grace_period_seconds=30):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_name = context.flow_run.deployment.name
logger = get_run_logger()
# First, wait for a short grace period to ensure all near-simultaneous
# flows have had a chance to start and register
time.sleep(grace_period_seconds)
with get_client() as client:
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment
flow_runs = client.read_flow_runs(
deployment_name=deployment_name,
states=["RUNNING", "PENDING"]
)
# Filter out our own flow run
other_runs = [
run for run in flow_runs
if run.id != current_flow_run_id
]
if not other_runs:
return True
# Get our flow run details
our_flow = next(run for run in flow_runs if run.id == current_flow_run_id)
# Sort all runs by ID (which is guaranteed to be unique and ordered)
# This provides a consistent ordering even for near-simultaneous starts
all_concurrent_runs = sorted(
[our_flow] + other_runs,
key=lambda x: x.id
)
# If we're not the first flow in the sorted list
if all_concurrent_runs[0].id != current_flow_run_id:
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
# Log details about the concurrent flows
for run in all_concurrent_runs:
<http://logger.info|logger.info>(
f"Flow run {run.id} started at {run.start_time or run.expected_start_time} "
f"({'current' if run.id == current_flow_run_id else 'other'})"
)
client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=(
f"Cancelled after waiting {max_wait_minutes} minutes: "
f"detected concurrent flow run {all_concurrent_runs[0].id}"
)
)
)
return False
else:
# We're the first flow, so we should continue
<http://logger.info|logger.info>(f"We are the primary flow run (ID: {current_flow_run_id})")
return True
time.sleep(10) # Check more frequently
@flow
def my_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = check_concurrent_flows(
max_wait_minutes=5,
grace_period_seconds=30 # Adjust based on your Lambda retry timing
)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to concurrent flow running")
return
<http://logger.info|logger.info>("Proceeding with flow execution")
# Your flow logic here
task_1()
task_2()
# etc...
Key changes made:
1. Removed all async/await
keywords
2. Changed asyncio.sleep()
to time.sleep()
3. Changed async with
to regular with
for the client context manager
4. Removed async def
from the flow and function definitionsMarvin
07/09/2025, 5:28 AMpython
@flow
def my_sequential_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = check_concurrent_flows(
max_wait_minutes=2, # Cancel after 2 minutes of waiting
grace_period_seconds=10 # Wait 10 seconds for near-simultaneous starts
)
if not can_proceed:
return
# Your existing sequential tasks
result_1 = task_1()
result_2 = task_2(result_1)
final_result = task_3(result_2)
return final_result
Scott Pedersen
07/09/2025, 5:54 AMread_flow_runs
doesn't have a deployment_id
or a states
parameterMarvin
07/09/2025, 5:54 AMMarvin
07/09/2025, 5:55 AMpython
from prefect import flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import time
def check_concurrent_flows(max_wait_minutes=5, grace_period_seconds=30):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_id = context.flow_run.deployment_id
logger = get_run_logger()
# First, wait for a short grace period to ensure all near-simultaneous
# flows have had a chance to start and register
time.sleep(grace_period_seconds)
with get_client() as client:
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment
flow_runs = client.get_flow_runs(
deployment_id=deployment_id,
state={"type": {"any_": ["RUNNING", "PENDING"]}}
)
# Filter out our own flow run
other_runs = [
run for run in flow_runs
if run.id != current_flow_run_id
]
if not other_runs:
return True
# Get our flow run details
our_flow = next(run for run in flow_runs if run.id == current_flow_run_id)
# Sort all runs by ID (which is guaranteed to be unique and ordered)
all_concurrent_runs = sorted(
[our_flow] + other_runs,
key=lambda x: x.id
)
# If we're not the first flow in the sorted list
if all_concurrent_runs[0].id != current_flow_run_id:
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
# Log details about the concurrent flows
for run in all_concurrent_runs:
<http://logger.info|logger.info>(
f"Flow run {run.id} started at {run.start_time} "
f"({'current' if run.id == current_flow_run_id else 'other'})"
)
client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=(
f"Cancelled after waiting {max_wait_minutes} minutes: "
f"detected concurrent flow run {all_concurrent_runs[0].id}"
)
)
)
return False
else:
# We're the first flow, so we should continue
<http://logger.info|logger.info>(f"We are the primary flow run (ID: {current_flow_run_id})")
return True
time.sleep(10)
@flow
def my_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = check_concurrent_flows(
max_wait_minutes=5,
grace_period_seconds=30
)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to concurrent flow running")
return
<http://logger.info|logger.info>("Proceeding with flow execution")
# Your flow logic here
task_1()
task_2()
# etc...
Key changes made:
1. Changed read_flow_runs
to get_flow_runs
2. Updated the state filtering syntax to use the correct format: state={"type": {"any_": ["RUNNING", "PENDING"]}}
3. Used deployment_id
instead of deployment_name
4. Removed references to expected_start_time
as it's not available in this version
This version should work correctly with Prefect 2.x. The logic remains the same, but it uses the correct API methods and parameter formats for version 2.x.
You can use it in your flow the same way:Marvin
07/09/2025, 5:55 AMpython
@flow
def my_sequential_flow():
logger = get_run_logger()
can_proceed = check_concurrent_flows(
max_wait_minutes=2,
grace_period_seconds=10
)
if not can_proceed:
return
# Your existing sequential tasks
result_1 = task_1()
result_2 = task_2(result_1)
final_result = task_3(result_2)
return final_result
Scott Pedersen
07/09/2025, 6:21 AMclient.get_flow_runs()
Marvin
07/09/2025, 6:21 AMMarvin
07/09/2025, 6:22 AMpython
from prefect import flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import time
def check_concurrent_flows(max_wait_minutes=5, grace_period_seconds=30):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_id = context.flow_run.deployment_id
logger = get_run_logger()
# First, wait for a short grace period to ensure all near-simultaneous
# flows have had a chance to start and register
time.sleep(grace_period_seconds)
with get_client() as client:
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment
flow_runs = client.query_flow_runs(
deployment_id=deployment_id,
state={"type": ["RUNNING", "PENDING"]}
)
# Filter out our own flow run
other_runs = [
run for run in flow_runs
if run.id != current_flow_run_id
]
if not other_runs:
return True
# Get our flow run details
our_flow = next(run for run in flow_runs if run.id == current_flow_run_id)
# Sort all runs by ID (which is guaranteed to be unique and ordered)
all_concurrent_runs = sorted(
[our_flow] + other_runs,
key=lambda x: x.id
)
# If we're not the first flow in the sorted list
if all_concurrent_runs[0].id != current_flow_run_id:
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
# Log details about the concurrent flows
for run in all_concurrent_runs:
<http://logger.info|logger.info>(
f"Flow run {run.id} started at {run.start_time} "
f"({'current' if run.id == current_flow_run_id else 'other'})"
)
client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=(
f"Cancelled after waiting {max_wait_minutes} minutes: "
f"detected concurrent flow run {all_concurrent_runs[0].id}"
)
)
)
return False
else:
# We're the first flow, so we should continue
<http://logger.info|logger.info>(f"We are the primary flow run (ID: {current_flow_run_id})")
return True
time.sleep(10)
@flow
def my_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = check_concurrent_flows(
max_wait_minutes=5,
grace_period_seconds=30
)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to concurrent flow running")
return
<http://logger.info|logger.info>("Proceeding with flow execution")
# Your flow logic here
task_1()
task_2()
# etc...
Key changes:
1. Changed get_flow_runs
to query_flow_runs
which is the correct method name in Prefect 2.x
2. Updated the state filtering syntax to match the correct format: state={"type": ["RUNNING", "PENDING"]}
This version should now work correctly with the Prefect 2.x client API. The overall logic remains the same, but it uses the correct method name and parameter format for querying flow runs.Scott Pedersen
07/09/2025, 6:25 AMquery_flow_runs
exists either for Prefect 2.xMarvin
07/09/2025, 6:25 AMMarvin
07/09/2025, 6:26 AMget_runs()
. Here's the corrected version:
python
from prefect import flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import time
def check_concurrent_flows(max_wait_minutes=5, grace_period_seconds=30):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_id = context.flow_run.deployment_id
logger = get_run_logger()
# First, wait for a short grace period to ensure all near-simultaneous
# flows have had a chance to start and register
time.sleep(grace_period_seconds)
with get_client() as client:
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment
flow_runs = client.get_runs(
deployment_id=deployment_id,
state=["Running", "Pending"]
)
# Filter out our own flow run
other_runs = [
run for run in flow_runs
if run.id != current_flow_run_id
]
if not other_runs:
return True
# Get our flow run details
our_flow = next(run for run in flow_runs if run.id == current_flow_run_id)
# Sort all runs by ID (which is guaranteed to be unique and ordered)
all_concurrent_runs = sorted(
[our_flow] + other_runs,
key=lambda x: x.id
)
# If we're not the first flow in the sorted list
if all_concurrent_runs[0].id != current_flow_run_id:
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
# Log details about the concurrent flows
for run in all_concurrent_runs:
<http://logger.info|logger.info>(
f"Flow run {run.id} started at {run.start_time} "
f"({'current' if run.id == current_flow_run_id else 'other'})"
)
client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=(
f"Cancelled after waiting {max_wait_minutes} minutes: "
f"detected concurrent flow run {all_concurrent_runs[0].id}"
)
)
)
return False
else:
# We're the first flow, so we should continue
<http://logger.info|logger.info>(f"We are the primary flow run (ID: {current_flow_run_id})")
return True
time.sleep(10)
@flow
def my_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = check_concurrent_flows(
max_wait_minutes=5,
grace_period_seconds=30
)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to concurrent flow running")
return
<http://logger.info|logger.info>("Proceeding with flow execution")
# Your flow logic here
task_1()
task_2()
# etc...
Key changes:
1. Changed to use get_runs()
which is the correct method in Prefect 2.x
2. Updated the state parameter to use the correct state names: ["Running", "Pending"]
This version should now work with the Prefect 2.x client API. The logic remains the same, but it uses the correct method name and state names for querying flow runs.Scott Pedersen
07/09/2025, 6:28 AMread_flow_runs
here https://docs-2.prefect.io/latest/api-ref/prefect/client/orchestration/?h=get_runs#prefect.client.orchestration.PrefectClient.read_flow_runs that has a different signature.Marvin
07/09/2025, 6:28 AMMarvin
07/09/2025, 6:28 AMread_flow_runs
method with its correct signature from Prefect 2.x:
python
from prefect import flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import time
def check_concurrent_flows(max_wait_minutes=5, grace_period_seconds=30):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_id = context.flow_run.deployment_id
logger = get_run_logger()
# First, wait for a short grace period to ensure all near-simultaneous
# flows have had a chance to start and register
time.sleep(grace_period_seconds)
with get_client() as client:
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment
flow_runs = client.read_flow_runs(
flow_filter=dict(
deployment_id={"any_": [deployment_id]},
state={"type": {"any_": ["RUNNING", "PENDING"]}}
)
)
# Filter out our own flow run
other_runs = [
run for run in flow_runs
if run.id != current_flow_run_id
]
if not other_runs:
return True
# Get our flow run details
our_flow = next(run for run in flow_runs if run.id == current_flow_run_id)
# Sort all runs by ID (which is guaranteed to be unique and ordered)
all_concurrent_runs = sorted(
[our_flow] + other_runs,
key=lambda x: x.id
)
# If we're not the first flow in the sorted list
if all_concurrent_runs[0].id != current_flow_run_id:
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
# Log details about the concurrent flows
for run in all_concurrent_runs:
<http://logger.info|logger.info>(
f"Flow run {run.id} started at {run.start_time} "
f"({'current' if run.id == current_flow_run_id else 'other'})"
)
client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=(
f"Cancelled after waiting {max_wait_minutes} minutes: "
f"detected concurrent flow run {all_concurrent_runs[0].id}"
)
)
)
return False
else:
# We're the first flow, so we should continue
<http://logger.info|logger.info>(f"We are the primary flow run (ID: {current_flow_run_id})")
return True
time.sleep(10)
@flow
def my_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = check_concurrent_flows(
max_wait_minutes=5,
grace_period_seconds=30
)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to concurrent flow running")
return
<http://logger.info|logger.info>("Proceeding with flow execution")
# Your flow logic here
task_1()
task_2()
# etc...
Key changes:
1. Using the correct read_flow_runs
method with its proper signature
2. Updated the filter syntax to use the correct format:
python
flow_filter=dict(
deployment_id={"any_": [deployment_id]},
state={"type": {"any_": ["RUNNING", "PENDING"]}}
)
Marvin
07/09/2025, 6:28 AMScott Pedersen
07/09/2025, 6:55 AMFlowRunFilter
Marvin
07/09/2025, 6:55 AMMarvin
07/09/2025, 6:56 AMFlowRunFilter
with `read_flow_runs`:
python
from prefect import flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.states import Cancelled
from datetime import datetime, timedelta
import time
def check_concurrent_flows(max_wait_minutes=5, grace_period_seconds=30):
context = get_run_context()
current_flow_run_id = context.flow_run.id
deployment_id = context.flow_run.deployment_id
logger = get_run_logger()
# First, wait for a short grace period to ensure all near-simultaneous
# flows have had a chance to start and register
time.sleep(grace_period_seconds)
with get_client() as client:
start_time = datetime.utcnow()
while True:
# Get all running flows for this deployment using proper filter
flow_runs = client.read_flow_runs(
deployment_id=deployment_id, # Simple filter for deployment
state_names=["Running", "Pending"] # Filter for states
)
# Filter out our own flow run
other_runs = [
run for run in flow_runs
if run.id != current_flow_run_id
]
if not other_runs:
return True
# Get our flow run details
our_flow = next(run for run in flow_runs if run.id == current_flow_run_id)
# Sort all runs by ID (which is guaranteed to be unique and ordered)
all_concurrent_runs = sorted(
[our_flow] + other_runs,
key=lambda x: x.id
)
# If we're not the first flow in the sorted list
if all_concurrent_runs[0].id != current_flow_run_id:
if datetime.utcnow() - start_time > timedelta(minutes=max_wait_minutes):
# Log details about the concurrent flows
for run in all_concurrent_runs:
<http://logger.info|logger.info>(
f"Flow run {run.id} started at {run.start_time} "
f"({'current' if run.id == current_flow_run_id else 'other'})"
)
client.set_flow_run_state(
flow_run_id=current_flow_run_id,
state=Cancelled(
message=(
f"Cancelled after waiting {max_wait_minutes} minutes: "
f"detected concurrent flow run {all_concurrent_runs[0].id}"
)
)
)
return False
else:
# We're the first flow, so we should continue
<http://logger.info|logger.info>(f"We are the primary flow run (ID: {current_flow_run_id})")
return True
time.sleep(10)
@flow
def my_flow():
logger = get_run_logger()
# Check for concurrent flows at the start
can_proceed = check_concurrent_flows(
max_wait_minutes=5,
grace_period_seconds=30
)
if not can_proceed:
<http://logger.info|logger.info>("Flow cancelled due to concurrent flow running")
return
<http://logger.info|logger.info>("Proceeding with flow execution")
# Your flow logic here
task_1()
task_2()
# etc...
The key changes are in the read_flow_runs
call, which now uses the correct parameters:
- `deployment_id`: Direct filter for the deployment
- `state_names`: List of state names to filter by
This is simpler and more direct than the previous complex filter dictionary approach. The documentation shows these are the correct parameters for filtering flow runs in Prefect 2.x.