Scott Pedersen
07/21/2025, 11:50 PMMarvin
07/21/2025, 11:50 PMScott Pedersen
07/21/2025, 11:51 PMMarvin
07/21/2025, 11:51 PMMarvin
07/21/2025, 11:51 PMScott Pedersen
07/21/2025, 11:51 PMMarvin
07/21/2025, 11:51 PMMarvin
07/21/2025, 11:52 PMpython
from prefect import flow, task
from prefect.context import get_run_context
@task
def my_task():
context = get_run_context()
# Access task run information
task_run_id = context.task_run.id
task_name = context.task_run.name
return "Done"
@flow
def my_flow():
context = get_run_context()
# Access flow run information
flow_run_id = context.flow_run.id
result = my_task()
2. Using Runtime Module Directly
python
from prefect import flow, task
from prefect.runtime import flow_run, task_run
@task
def my_task():
# Access current task run information
current_task_id = task_run.id
current_task_name = task_run.name
return "Done"
@flow
def my_flow():
# Access current flow run information
current_flow_run_id = flow_run.id
result = my_task()
3. Using the Prefect Client API (for more detailed information)
python
from prefect import flow
from prefect.client import get_client
@flow
def my_flow():
async with get_client() as client:
# Get the current flow run's graph (shows task relationships)
flow_run_graph = await client.read_flow_run_graph(flow_run.id)
# This will show all tasks and their relationships in the current flow run
The most comprehensive way to see all tasks and their relationships is through the Prefect UI, which you can access via the flow_run.ui_url
attribute during execution. This will show you a visual representation of all tasks, their states, and their relationships.
Key points to remember:
- The runtime context provides immediate access to the current task or flow run information
- Each task run has its own unique ID and metadata accessible during execution
- The Prefect Client API provides more detailed information about the flow run graph and task relationships
- You can access URLs to both the API endpoint (api_url
) and UI (ui_url
) for any flow run or task run
This information is particularly useful for monitoring, logging, or creating dependencies based on task execution states within your flows.Marvin
07/21/2025, 11:52 PMpython
from prefect import flow, task
from typing import List
task_list = []
@task
def tracked_task():
return "Hello"
@flow
def my_flow():
# Keep track of task runs as you create them
task_runs = []
# Create and track task runs
task_run = tracked_task.submit()
task_runs.append(task_run)
# You can wait for results
results = [run.result() for run in task_runs]
return results
2. Using the Prefect Client:
You can query task runs associated with a flow run using the Prefect API client, though this is typically done outside the flow run itself:
python
from prefect.client import get_client
async def get_task_runs(flow_run_id):
async with get_client() as client:
task_runs = await client.read_task_runs(flow_run_id=flow_run_id)
return task_runs
3. Task Dependencies:
If you need to track dependencies between tasks, you can use the task's upstream/downstream relationships:
python
@task
def task_a():
return 1
@task
def task_b(x):
return x + 1
@flow
def my_flow():
a = task_a()
b = task_b(a) # b has task_a as upstream dependency
It's worth noting that Prefect 2.x doesn't provide a built-in method to get a comprehensive list of all tasks during flow execution. This is partly by design, as tasks can be created dynamically during flow execution, and the actual task graph might not be fully known until runtime.
If you need to track specific tasks for monitoring or coordination purposes, I'd recommend either:
1. Maintaining your own list of task runs as you create them
2. Using the Prefect UI to monitor task runs
3. Using the Prefect API client to query task runs after the fact
Would you like me to elaborate on any of these approaches or provide more specific examples for your use case?Scott Pedersen
07/21/2025, 11:56 PMMarvin
07/21/2025, 11:56 PMMarvin
07/21/2025, 11:57 PMpython
from prefect import flow, task, get_run_context
@task
def task_a():
context = get_run_context()
print(f"Currently running: {context.task_run.task_name}")
return "A"
@task
def task_b():
context = get_run_context()
print(f"Currently running: {context.task_run.task_name}")
return "B"
@task
def task_c():
context = get_run_context()
print(f"Currently running: {context.task_run.task_name}")
return "C"
@flow
def my_flow():
a = task_a()
b = task_b()
c = task_c()
2. Tracking Task Order Manually:
If you need to know the position, you'll need to implement it yourself since Prefect doesn't provide this directly. Here's one way to do it:
python
from prefect import flow, task, get_run_context
task_order = {}
current_position = 0
@task
def tracked_task(name):
global current_position
current_position += 1
context = get_run_context()
task_order[context.task_run.task_name] = current_position
print(f"Task {name} is at position {current_position}")
return name
@flow
def my_flow():
global current_position
current_position = 0 # Reset at start of flow
a = tracked_task("A")
b = tracked_task("B")
c = tracked_task("C")
d = tracked_task("D")
e = tracked_task("E")
print(f"Final task order: {task_order}")
3. Using Task Run Context:
You can also access additional task information during execution:
python
from prefect import flow, task, get_run_context
@task
def my_task():
context = get_run_context()
task_run = context.task_run
print(f"Task Name: {task_run.task_name}")
print(f"Task Run ID: {task_run.id}")
print(f"Task State: {task_run.state}")
return "Done"
Important notes:
- Prefect 2.x doesn't have a built-in way to get the numeric position of a task in a flow
- The actual execution order might differ from the definition order if tasks are running concurrently
- If you need strict sequential execution, you can use the `SequentialTaskRunner`:
python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
# Tasks will run in the exact order specified
a = task_a()
b = task_b()
c = task_c()
The best practice would be to:
1. Use task names and task run IDs to identify tasks
2. Implement your own position tracking if needed
3. Use the SequentialTaskRunner
if you need guaranteed order
4. Use the task run context to access current task information
Would you like me to elaborate on any of these approaches or provide more specific examples for your use case?Scott Pedersen
07/22/2025, 4:23 AMSequentialTaskRunner
can I get a list of tasks for a flow run and populate a dictionary with the names of the tasks with a number for example
TASK_PROGRESSION = {
"task_A": 1,
"task_B": 2,
"task_C": 3,
"task_D": 4
}
Marvin
07/22/2025, 4:23 AMMarvin
07/22/2025, 4:24 AMpython
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
# Dictionary to store task progression
TASK_PROGRESSION = {}
current_position = 0
@task
def task_a():
global current_position
current_position += 1
TASK_PROGRESSION["task_A"] = current_position
return "A"
@task
def task_b():
global current_position
current_position += 1
TASK_PROGRESSION["task_B"] = current_position
return "B"
@task
def task_c():
global current_position
current_position += 1
TASK_PROGRESSION["task_C"] = current_position
return "C"
@task
def task_d():
global current_position
current_position += 1
TASK_PROGRESSION["task_D"] = current_position
return "D"
@flow(task_runner=SequentialTaskRunner())
def my_sequential_flow():
global current_position
current_position = 0 # Reset at start of flow
a = task_a()
b = task_b()
c = task_c()
d = task_d()
print(f"Task Progression: {TASK_PROGRESSION}")
return TASK_PROGRESSION
2. Using a Task Factory Approach (more elegant):
python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from typing import Dict
TASK_PROGRESSION: Dict[str, int] = {}
def create_sequential_task(task_name: str, position: int):
@task(name=task_name)
def _task():
TASK_PROGRESSION[task_name] = position
return f"Completed {task_name}"
return _task
@flow(task_runner=SequentialTaskRunner())
def my_sequential_flow():
global TASK_PROGRESSION
TASK_PROGRESSION = {} # Reset at start of flow
# Create and execute tasks in sequence
tasks = [
create_sequential_task(f"task_{chr(65+i)}", i+1)
for i in range(4) # Creates tasks A through D
]
# Execute tasks
for task_fn in tasks:
task_fn()
print(f"Task Progression: {TASK_PROGRESSION}")
return TASK_PROGRESSION
3. Using Task Context (most Prefect-native approach):
python
from prefect import flow, task, get_run_context
from prefect.task_runners import SequentialTaskRunner
TASK_PROGRESSION = {}
def track_task():
context = get_run_context()
task_name = context.task_run.task_name
# Get the current length of the progression dict + 1 for the position
position = len(TASK_PROGRESSION) + 1
TASK_PROGRESSION[task_name] = position
@task(name="task_A")
def task_a():
track_task()
return "A"
@task(name="task_B")
def task_b():
track_task()
return "B"
@task(name="task_C")
def task_c():
track_task()
return "C"
@task(name="task_D")
def task_d():
track_task()
return "D"
@flow(task_runner=SequentialTaskRunner())
def my_sequential_flow():
global TASK_PROGRESSION
TASK_PROGRESSION = {} # Reset at start of flow
a = task_a()
b = task_b()
c = task_c()
d = task_d()
print(f"Task Progression: {TASK_PROGRESSION}")
return TASK_PROGRESSION
# Run the flow
result = my_sequential_flow()
All these approaches will give you a dictionary like:
python
{
"task_A": 1,
"task_B": 2,
"task_C": 3,
"task_D": 4
}
Key points to remember:
- The SequentialTaskRunner ensures tasks run in the order they're called
- The dictionary is maintained at the flow level
- You need to reset the dictionary/counter at the start of each flow run to avoid carrying over state from previous runs
- The task names in the dictionary will match exactly what you specify in the @task decorator or task creation
Which approach would you prefer? I can provide more details about any of them or help you adapt them to your specific needs.Scott Pedersen
07/22/2025, 4:36 AMMarvin
07/22/2025, 4:36 AMMarvin
07/22/2025, 4:36 AMpython
from prefect import flow, task, get_run_context
from prefect.task_runners import SequentialTaskRunner
# Define the task progression upfront
TASK_PROGRESSION = {
"task_A": 1,
"task_B": 2,
"task_C": 3,
"task_D": 4
}
@task(name="task_A")
def task_a():
context = get_run_context()
current_task = context.task_run.task_name
position = TASK_PROGRESSION[current_task]
print(f"Executing {current_task} at position {position}")
return "A"
@task(name="task_B")
def task_b():
context = get_run_context()
current_task = context.task_run.task_name
position = TASK_PROGRESSION[current_task]
print(f"Executing {current_task} at position {position}")
return "B"
@task(name="task_C")
def task_c():
context = get_run_context()
current_task = context.task_run.task_name
position = TASK_PROGRESSION[current_task]
print(f"Executing {current_task} at position {position}")
return "C"
@task(name="task_D")
def task_d():
context = get_run_context()
current_task = context.task_run.task_name
position = TASK_PROGRESSION[current_task]
print(f"Executing {current_task} at position {position}")
return "D"
@flow(task_runner=SequentialTaskRunner())
def my_sequential_flow():
a = task_a()
b = task_b()
c = task_c()
d = task_d()
To make this more DRY (Don't Repeat Yourself), you could create a helper function:
python
from prefect import flow, task, get_run_context
from prefect.task_runners import SequentialTaskRunner
from functools import wraps
# Define the task progression upfront
TASK_PROGRESSION = {
"task_A": 1,
"task_B": 2,
"task_C": 3,
"task_D": 4
}
def log_task_position(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
context = get_run_context()
current_task = context.task_run.task_name
position = TASK_PROGRESSION[current_task]
print(f"Executing {current_task} at position {position}")
return fn(*args, **kwargs)
return wrapper
@task(name="task_A")
@log_task_position
def task_a():
return "A"
@task(name="task_B")
@log_task_position
def task_b():
return "B"
@task(name="task_C")
@log_task_position
def task_c():
return "C"
@task(name="task_D")
@log_task_position
def task_d():
return "D"
@flow(task_runner=SequentialTaskRunner())
def my_sequential_flow():
a = task_a()
b = task_b()
c = task_c()
d = task_d()
This approach gives you several benefits:
1. The task sequence is defined clearly at the top of your code
2. You can reference the position of any task at any time
3. The SequentialTaskRunner ensures tasks execute in the order you define them
4. You can easily check what position a task should be in using TASK_PROGRESSION[task_name]
You could also use this to add validation:
python
def validate_task_sequence():
"""Validate that all tasks are present and in correct order"""
expected_positions = set(range(1, len(TASK_PROGRESSION) + 1))
actual_positions = set(TASK_PROGRESSION.values())
if expected_positions != actual_positions:
raise ValueError("Task progression is not sequential")
# Run validation when module loads
validate_task_sequence()
Would you like me to explain any part of this in more detail or show you how to use the task progression information in other ways?