Eric
07/16/2025, 4:06 PM
Does calling submit and result together impact how the tasks run? Since I implemented that, all the tasks are running sequentially
Marvin
07/16/2025, 4:06 PM
Marvin
07/16/2025, 4:07 PM
If you're calling submit() and result() together immediately, you're essentially negating the benefits of concurrent execution. Let me explain:
1. The Issue with Sequential Execution
If you're doing something like this:
```python
result1 = task1.submit()
result1.result()  # This blocks until task1 completes

result2 = task2.submit()
result2.result()  # This blocks until task2 completes
```
This pattern causes sequential execution because each .result() call blocks until the task completes before moving on to the next task.
2. The Correct Pattern for Concurrent Execution
To achieve concurrent execution, you should submit all tasks first, then collect results later:
```python
# Submit all tasks first
future1 = task1.submit()
future2 = task2.submit()
future3 = task3.submit()

# Only collect results when you need them
result1 = future1.result()
result2 = future2.result()
result3 = future3.result()
```
3. How It Works
- `submit()`: Schedules the task for execution and returns immediately with a future object
- `result()`: Blocks and waits for the task to complete, returning the final result
4. Best Practices
- Only call .result() when you actually need the result
- Submit all independent tasks before collecting any results
- If a task depends on another task's result, you'll naturally need to call .result() before submitting the dependent task (see the sketch just after this list)
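For the dependent-task case, a minimal sketch (the fetch/process tasks are hypothetical; note that passing the future itself also works, since Prefect resolves it and records the dependency):
```python
from prefect import flow, task

@task
def fetch():  # hypothetical upstream task
    return 42

@task
def process(value):  # hypothetical downstream task
    return value * 2

@flow
def pipeline():
    future = fetch.submit()
    # Passing the future directly: the flow does not block here,
    # and Prefect waits for fetch before starting process
    processed = process.submit(future)
    return processed.result()
```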
Here's a more concrete example showing the difference:
```python
from prefect import flow, task
import time

@task
def slow_task(name):
    time.sleep(2)
    return f"Completed {name}"

@flow
def sequential_flow():
    # This will run sequentially (slower)
    result1 = slow_task.submit("A").result()  # Blocks for 2 seconds
    result2 = slow_task.submit("B").result()  # Blocks for another 2 seconds
    result3 = slow_task.submit("C").result()  # Blocks for another 2 seconds
    # Total time: ~6 seconds

@flow
def concurrent_flow():
    # This will run concurrently (faster)
    future1 = slow_task.submit("A")
    future2 = slow_task.submit("B")
    future3 = slow_task.submit("C")

    # Get results only when needed
    results = [future1.result(), future2.result(), future3.result()]
    # Total time: ~2 seconds
```
The key is to think of submit() as "start this task" and result() as "wait for this task to finish." By submitting all tasks before waiting for any results, you allow them to run concurrently within the constraints of your task runner.
If you're still seeing sequential execution after implementing this pattern, you might want to check:
1. That you're using an appropriate task runner (like ConcurrentTaskRunner; see the sketch below)
2. That your tasks are truly independent and don't have hidden dependencies
3. That you're not calling .result() too early in your flow
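For the first check, pinning the task runner explicitly might look like this (a minimal sketch; ConcurrentTaskRunner is the Prefect 2.x name, Prefect 3.x uses ThreadPoolTaskRunner instead):
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner  # Prefect 2.x import path

@task
def work(n):
    return n

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # Submitted tasks can run concurrently on this runner
    futures = [work.submit(n) for n in range(5)]
    return [f.result() for f in futures]
```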
Would you like to share your current implementation? That way, I can help identify specific areas where concurrency might be blocked.
Nate
07/16/2025, 4:07 PM
you can wait() for them all at once, which is better than what Marvin just suggested, as a list comp where you call .result() for each item is no better than sequentially calling .result()
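In code, waiting on the whole batch at once might look like this (a minimal sketch; it assumes the wait helper from prefect.futures, the same one quoted from the docs later in this thread):
```python
from prefect import flow, task
from prefect.futures import wait

@task
def work(n):
    return n

@flow
def my_flow():
    futures = [work.submit(n) for n in range(10)]
    # Block once for the whole batch instead of once per future
    done, not_done = wait(futures)
    return [f.result() for f in done]
```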
Nate
07/16/2025, 4:08 PM
Eric
07/16/2025, 4:13 PM
I used await, and when I was watching the flow run, no tasks were blocked.
Should I use async / await instead of .result()?
In my use case, I don't need to wait, they are in 2 separate flows
Nate
07/16/2025, 4:22 PM
```python
from prefect import flow, task

@task
def foo(): ...

@task
def bar(): ...

@task
def child(i: int):
    print(f"Child {i} starting")
    foo()
    bar()

@flow
def parent():
    child.map(range(10)).result()

if __name__ == "__main__":
    parent()
```
Nate
07/16/2025, 4:23 PM
Nate
07/16/2025, 4:23 PM
Eric
07/16/2025, 4:25 PM
Nate
07/16/2025, 4:25 PM
Eric
07/16/2025, 4:29 PM
Like flows, tasks are free to call other tasks or flows, there is no required nesting pattern.
Eric
07/16/2025, 4:48 PM
Can I use wait with flows?
Nate
07/16/2025, 4:48 PM
Eric
07/16/2025, 4:52 PM
With wait, can I just wait, and if a task fails, will the error be raised automatically or not?
In the doc, I see:
```python
@flow
def flow():
    futures = random_task.map(range(10))
    done, not_done = wait(futures, timeout=5)
    print(f"Done: {len(done)}")
    print(f"Not Done: {len(not_done)}")
```
so do I have to handle it myself if one of the tasks fails?
Nate
07/16/2025, 4:58 PM
you can use return_state; this comes up a lot so I can add a doc on this
```python
from prefect import flow, task

@task
def hates_2(x):
    if x == 2: raise
    return x

@flow
def f():
    states = hates_2.map(range(10), return_state=True)
    good_results = [s.result() for s in states if s.is_completed()]
    print(good_results)

f()
```
```
...
File "<ipython-input-2-4f04482b697d>", line 3, in hates_2
    if x == 2: raise
               ^^^^^
RuntimeError: No active exception to reraise
11:58:26.664 | INFO | Task run 'hates_2-f75' - Finished in state Completed()
11:58:26.666 | INFO | Task run 'hates_2-539' - Finished in state Completed()
11:58:26.667 | INFO | Task run 'hates_2-f7b' - Finished in state Completed()
11:58:26.667 | INFO | Task run 'hates_2-112' - Finished in state Completed()
11:58:26.668 | INFO | Task run 'hates_2-3a3' - Finished in state Completed()
11:58:26.668 | INFO | Task run 'hates_2-4a6' - Finished in state Completed()
11:58:26.672 | INFO | Task run 'hates_2-741' - Finished in state Completed()
11:58:26.672 | INFO | Task run 'hates_2-e02' - Finished in state Completed()
11:58:26.672 | INFO | Task run 'hates_2-553' - Finished in state Completed()
11:58:26.677 | ERROR | Task run 'hates_2-b25' - Finished in state Failed('Task run encountered an exception RuntimeError: No active exception to reraise')
[0, 1, 3, 4, 5, 6, 7, 8, 9]
11:58:26.708 | INFO | Flow run 'lyrical-falcon' - Finished in state Completed()
```
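If you instead want the flow to fail as soon as any mapped task failed, a minimal sketch (reusing the hates_2 task above; State.result() re-raises the task's exception by default):
```python
@flow
def strict():
    states = hates_2.map(range(10), return_state=True)
    # s.result() re-raises the task's exception by default;
    # pass raise_on_failure=False to get the exception object back instead
    return [s.result() for s in states]
```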
Eric
07/16/2025, 9:01 PM
Nate
07/17/2025, 4:00 PM
Nate
07/17/2025, 4:00 PM
Nate
07/17/2025, 6:08 PM
Eric
07/17/2025, 6:40 PM
I ended up using await instead of .submit() on my task, because submit has a really weird side effect on the task runs of all the subflows running concurrently (tasks run sequentially, even though they don't belong to the same subflow).
Also, lines are drawn randomly between subflows that just share one identical parameter, even though they are supposed to be unrelated (they run concurrently).
Also, I would like to draw the graph like I did in the past with Airflow, but I don't know how to control the lines; sometimes one is drawn, sometimes not.
On working concurrently, the only setting that seems to work is the task run concurrency limit. For the other concurrency limits we can set, I don't really get what they do, as I haven't seen any difference when adjusting values.
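For reference, the task run concurrency limit Eric mentions is tag-based; a minimal sketch (the tag name "db" is hypothetical):
```python
from prefect import task

# Create the limit once via the CLI (hypothetical tag "db"):
#   prefect concurrency-limit create db 2

@task(tags=["db"])
def query():
    # At most 2 task runs tagged "db" will execute at once
    ...
```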
Nate
07/17/2025, 6:42 PM
if you use wait_for (an explicit dependency) then you'll get an arrow
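A minimal sketch of wait_for (the tasks are hypothetical; the future is passed so the UI can draw the dependency edge):
```python
from prefect import flow, task

@task
def extract():
    return "data"

@task
def notify():
    print("extract finished")

@flow
def pipeline():
    f = extract.submit()
    # No data passes between these tasks, so declare the edge explicitly;
    # the UI will draw an arrow from extract to notify
    notify.submit(wait_for=[f])
```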
07/17/2025, 10:35 PMapi_source_1
would link to each other out of nowhere.
But still I don’t manage to make have the stroke from the flow to the task.
For concurrency limit management, I think in the Prefect UI, it lack Tooltip to explain quickly what we are doing (I feel like there are no effect to set concurrency limit anywhere, but in tas run concurrency limit.
so here is the dummy code:
```python
import asyncio
import random
from typing import Dict, List

from prefect import flow, task, get_run_logger


@task(task_run_name="extract-{theme}-{source}")
async def dummy_extract_data(theme: str, source: str) -> Dict:
    logger = get_run_logger()
    logger.info(f"Extracting data from {source} for theme {theme}")
    # Simulate some work
    await asyncio.sleep(random.uniform(4, 10))
    return {
        "source": source,
        "theme": theme,
        "records_extracted": 1000,
        "status": "success",
    }


@task(task_run_name="load-{theme}-{source}")
async def dummy_load_data(theme: str, source: str, extract_result: Dict) -> Dict:
    logger = get_run_logger()
    logger.info(f"Loading data from {source} for theme {theme}")
    # Simulate some work
    await asyncio.sleep(random.uniform(4, 10))
    return {
        "source": source,
        "theme": theme,
        "records_loaded": extract_result["records_extracted"],
        "status": "success",
    }


@flow(flow_run_name="extract-load-{theme}-{source}")
async def dummy_extract_load_flow(theme: str, source: str) -> Dict:
    logger = get_run_logger()
    logger.info(f"Starting extract_load flow for {source} in theme {theme}")
    # Extract data
    extract_result = await dummy_extract_data(theme, source)
    # Load data
    load_result = await dummy_load_data(theme, source, extract_result)
    return {"extract": extract_result, "load": load_result, "status": "success"}


def get_theme_subflows(theme: str, sources: List[str]) -> tuple[List, List]:
    subflows = []
    subflow_metadata = []
    for source in sources:
        subflow = dummy_extract_load_flow(theme, source)
        subflows.append(subflow)
        subflow_metadata.append((theme, source))
    return subflows, subflow_metadata


@task(task_run_name="run-dbt-{command}")
async def dummy_dbt_command(command: str) -> Dict:
    logger = get_run_logger()
    logger.info(f"Running dbt {command}")
    # Simulate dbt work
    await asyncio.sleep(random.uniform(4, 10))
    return {"command": command, "models_processed": 25, "status": "success"}


@flow
async def run_dbt_transformations() -> Dict:
    logger = get_run_logger()
    logger.info("Starting dbt transformations")
    results = {}
    # Run dbt seed
    results["seed"] = await dummy_dbt_command("seed")
    # Run dbt run
    results["run"] = await dummy_dbt_command("run")
    # Run dbt test
    results["test"] = await dummy_dbt_command("test")
    return {"dbt_results": results, "status": "success"}


@flow(flow_run_name="demo_data_pipeline")
async def demo_data_pipeline() -> Dict:
    logger = get_run_logger()
    logger.info("Starting demo data pipeline")

    # Define demo themes and sources
    themes = {
        "customer_data": ["api_source_1", "api_source_2"],
        "product_data": ["api_source_3", "api_source_1"],
        "analytics_data": ["api_source_4"],
    }

    # Step 1: Extract and load all data sources in parallel
    logger.info("Step 1: Extracting and loading all data sources")

    # Collect all subflows from all themes using helper function
    logger.info("Collecting all subflows from themes")
    all_subflows = []
    all_subflow_metadata = []
    for theme_name, sources in themes.items():
        subflows, subflow_metadata = get_theme_subflows(theme_name, sources)
        all_subflows.extend(subflows)
        all_subflow_metadata.extend(subflow_metadata)

    # Set up concurrency control with semaphore
    max_concurrent = 2
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_subflow_with_limit(subflow):
        async with semaphore:
            return await subflow

    logger.info(
        f"Running {len(all_subflows)} subflows with max {max_concurrent} concurrent"
    )

    # Use TaskGroup for structured concurrency with semaphore
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(run_subflow_with_limit(subflow)) for subflow in all_subflows
        ]

    # Collect results and organize by theme (all tasks are done once the group exits)
    all_results = [task.result() for task in tasks]
    logger.info("All extractions and loads completed")

    # Step 2: Run dbt transformations
    logger.info("Step 2: Running dbt transformations")
    dbt_results = await run_dbt_transformations()

    # Combine results
    result = {
        "flow_type": "demo_pipeline",
        "dbt_results": dbt_results,
        "status": "success",
    }
    logger.info("Demo pipeline completed successfully")
    return result
```