# ask-marvin
e
@Marvin how would the task methods `submit` and `result` impact how the task runs? Since I implemented that, all the tasks are running sequentially
m
Based on your description of tasks running sequentially, I think I understand the issue. When you use `submit()` and `result()` together immediately, you're essentially negating the benefits of concurrent execution. Let me explain:

**1. The issue with sequential execution**

If you're doing something like this:

```python
result1 = task1.submit()
result1.result()  # This blocks until task1 completes
result2 = task2.submit()
result2.result()  # This blocks until task2 completes
```

This pattern will cause sequential execution because each `.result()` call blocks until the task completes before moving on to the next task.

**2. The correct pattern for concurrent execution**

To achieve concurrent execution, submit all tasks first, then collect results later:

```python
# Submit all tasks first
future1 = task1.submit()
future2 = task2.submit()
future3 = task3.submit()

# Only collect results when you need them
result1 = future1.result()
result2 = future2.result()
result3 = future3.result()
```

**3. How it works**

- `submit()`: schedules the task for execution and returns immediately with a future object
- `result()`: blocks and waits for the task to complete, returning the final result

**4. Best practices**

- Only call `.result()` when you actually need the result
- Submit all independent tasks before collecting any results
- If a task depends on another task's result, you'll naturally need to call `.result()` before submitting the dependent task

Here's a more concrete example showing the difference:

```python
from prefect import flow, task
import time

@task
def slow_task(name):
    time.sleep(2)
    return f"Completed {name}"

@flow
def sequential_flow():
    # This will run sequentially (slower)
    result1 = slow_task.submit("A").result()  # Blocks for 2 seconds
    result2 = slow_task.submit("B").result()  # Blocks for another 2 seconds
    result3 = slow_task.submit("C").result()  # Blocks for another 2 seconds
    # Total time: ~6 seconds

@flow
def concurrent_flow():
    # This will run concurrently (faster)
    future1 = slow_task.submit("A")
    future2 = slow_task.submit("B")
    future3 = slow_task.submit("C")
    
    # Get results only when needed
    results = [future1.result(), future2.result(), future3.result()]
    # Total time: ~2 seconds
```

The key is to think of `submit()` as "start this task" and `result()` as "wait for this task to finish." By submitting all tasks before waiting for any results, you allow them to run concurrently within the constraints of your task runner.

If you're still seeing sequential execution after implementing this pattern, you might want to check:
1. That you're using an appropriate task runner (like `ConcurrentTaskRunner`)
2. That your tasks are truly independent and don't have hidden dependencies
3. That you're not calling `.result()` too early in your flow

Would you like to share your current implementation? That way, I can help identify specific areas where concurrency might be blocked.
n
`.result()` will block until the future is resolved, so typically you want to submit or map a bunch of stuff to get your futures, and then `wait()` for them all at once. That's better than what Marvin just suggested, since a list comp where you call `.result()` on each item is no better than sequentially calling `.result()`.
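in code, that pattern looks roughly like this (a minimal sketch assuming Prefect 3's `prefect.futures.wait`; the `slow_task` here is just a stand-in):

```python
from prefect import flow, task
from prefect.futures import wait


@task
def slow_task(name: str) -> str:
    return f"Completed {name}"


@flow
def gather_all() -> list[str]:
    # submit everything up front to get futures
    futures = [slow_task.submit(n) for n in ["A", "B", "C"]]
    # block once for the whole batch instead of once per future
    done, not_done = wait(futures)
    return [f.result() for f in done]
```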
e
Hi @Nate, thanks for the reply. The issue is that I have a flow with 10 subflows running in parallel, and each subflow has 2 tasks, but now all the subflows wait for one to complete all its tasks. Before, I was using `async` and `await`, and when I was watching the flow run, no tasks were blocked. Should I use `async`/`await` instead of `.result()`? In my use case, I don't need to `wait`; they are in 2 separate flows
n
```python
from prefect import flow, task


@task
def foo(): ...


@task
def bar(): ...


@task
def child(i: int):
    print(f"Child {i} starting")
    foo()
    bar()


@flow
def parent():
    child.map(range(10)).result()


if __name__ == "__main__":
    parent()
```

is there any reason you can't do this?

generally i recommend either:
• use async/await (normal async python) across the board
• our simplified concurrency interfaces (`map`, `submit`, `wait`, etc.)

but not both
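for illustration, here's a rough side-by-side sketch of the two styles (hypothetical `work` / `work_sync` tasks; pick one style per flow):

```python
import asyncio

from prefect import flow, task


@task
async def work(i: int) -> int:
    await asyncio.sleep(1)
    return i


@flow
async def async_style() -> list[int]:
    # normal async python: fan out the task coroutines and gather them
    return await asyncio.gather(*[work(i) for i in range(3)])


@task
def work_sync(i: int) -> int:
    return i


@flow
def futures_style() -> list[int]:
    # prefect's simplified interface: map to get futures, then resolve the batch
    return work_sync.map(range(3)).result()
```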
e
@Nate we can have nested Prefect tasks? I thought we couldn’t according to the documentation
n
you can indeed. can you point me to where you saw docs suggesting otherwise?
e
ok my bad, i didn't read well: https://docs.prefect.io/v3/concepts/tasks

> Like flows, tasks are free to call other tasks or flows, there is no required nesting pattern.
can we use `wait` with flows?
n
no, `wait` is for futures; flows don't have a futures interface. what are you trying to do?
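the usual alternative for fanning out subflows is plain async python; a minimal sketch, assuming the subflows are async:

```python
import asyncio

from prefect import flow


@flow
async def extract_load(source: str) -> str:
    # stand-in for one extract-then-load subflow
    return f"loaded {source}"


@flow
async def parent() -> None:
    # subflows return results directly (no futures), so fan them out with
    # plain asyncio; the parent resumes once all of them are done
    results = await asyncio.gather(*(extract_load(s) for s in ["a", "b", "c"]))
    print(results)
```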
e
I am doing an ELT from multiple sources. I had 10 flows (flows, because I didn't know a task can contain other tasks) running concurrently (each is just an extract then a load), then I need to wait for the 10 flows to complete before running dbt models. Also, I don't get whether with `wait` I can just wait and, if a task failed, the error will be raised automatically or not? In the doc, I see:

```python
@flow
def flow():
    futures = random_task.map(range(10))
    done, not_done = wait(futures, timeout=5)
    print(f"Done: {len(done)}")
    print(f"Not Done: {len(not_done)}")
```
so for me, I have to handle whether one of the tasks fails or not?
n
if you want work to finish even if one of them fails, you should use `return_state`. this comes up a lot, so I can add a doc on this

```python
from prefect import flow, task

@task
def hates_2(x):
    if x == 2: raise
    return x


@flow
def f():
    states = hates_2.map(range(10), return_state=True)
    good_results = [s.result() for s in states if s.is_completed()]
    print(good_results)


f()
```

```
...
  File "<ipython-input-2-4f04482b697d>", line 3, in hates_2
    if x == 2: raise
               ^^^^^
RuntimeError: No active exception to reraise
11:58:26.664 | INFO    | Task run 'hates_2-f75' - Finished in state Completed()
11:58:26.666 | INFO    | Task run 'hates_2-539' - Finished in state Completed()
11:58:26.667 | INFO    | Task run 'hates_2-f7b' - Finished in state Completed()
11:58:26.667 | INFO    | Task run 'hates_2-112' - Finished in state Completed()
11:58:26.668 | INFO    | Task run 'hates_2-3a3' - Finished in state Completed()
11:58:26.668 | INFO    | Task run 'hates_2-4a6' - Finished in state Completed()
11:58:26.672 | INFO    | Task run 'hates_2-741' - Finished in state Completed()
11:58:26.672 | INFO    | Task run 'hates_2-e02' - Finished in state Completed()
11:58:26.672 | INFO    | Task run 'hates_2-553' - Finished in state Completed()
11:58:26.677 | ERROR   | Task run 'hates_2-b25' - Finished in state Failed('Task run encountered an exception RuntimeError: No active exception to reraise')
[0, 1, 3, 4, 5, 6, 7, 8, 9]
11:58:26.708 | INFO    | Flow run 'lyrical-falcon' - Finished in state Completed()
```
e
@Nate btw, for these 10 subflows, they are supposed to run in parallel, so they don't have any relationships between each other, but Prefect draws a line between subflows that share a function parameter with the same name. I don't really get why... lmk if you have any thoughts
n
also, if you have a minimal example of the unexpected relationships between tasks, please feel free to share
e
@Nate OK thanks. I am using `await` instead of `.submit()` on my tasks, because `submit` has really weird side effects on the task runs of all the subflows running concurrently (tasks run sequentially, even though they don't belong to the same subflow). Also, Prefect draws the line seemingly at random between subflows that just share one identical parameter, even though they are supposed to not be related (they run concurrently). Also, I would like to draw things like I was doing in the past with Airflow, but I don't know how to control the lines; sometimes it draws them, sometimes not. For working concurrently, the only setting that seems to have an effect is the task run concurrency limit. For the other concurrency limits we can set, I don't really get what they do, as I haven't seen any difference when adjusting the values.
n
if you have a self-contained example where something behaves in a way that you don't expect, it would make it a lot easier for me to make a recommendation. in general, if you pass data between tasks/flows (implicit dependency) or if you use `wait_for` (explicit dependency), then you'll get an arrow
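a sketch of the explicit form (hypothetical tasks; `wait_for` forces the arrow even though no data is passed between them):

```python
from prefect import flow, task


@task
def first() -> str:
    return "no data passed along"


@task
def second() -> None:
    print("runs after first")


@flow
def ordered() -> None:
    a = first.submit()
    # no data flows from first into second, but wait_for still draws the arrow
    second.submit(wait_for=[a]).result()
```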
e
@Nate my script looks like the one below (I replaced everything with dummy things). I ran this flow, but it doesn't show the issue I mentioned. Still, I would like to find a way to not use a Semaphore, because the flow shouldn't manage the queue of concurrent flows; I haven't found the settings for it. In my real flow, the subflows become spaghetti even though they are not really linked to each other. In my example, in theory, the subflows with the source name `api_source_1` would link to each other out of nowhere. But I still don't manage to get the line drawn from the flow to the task. For concurrency limit management, I think the Prefect UI lacks tooltips to quickly explain what we are setting (I feel like concurrency limits have no effect anywhere except the task run concurrency limit). So here is the dummy code:

```python
import asyncio
import random
from typing import Dict, List
from prefect import flow, task, get_run_logger


@task(task_run_name="extract-{theme}-{source}")
async def dummy_extract_data(theme: str, source: str) -> Dict:
    logger = get_run_logger()
    logger.info(f"Extracting data from {source} for theme {theme}")

    # Simulate some work
    await asyncio.sleep(random.uniform(4, 10))

    return {
        "source": source,
        "theme": theme,
        "records_extracted": 1000,
        "status": "success",
    }


@task(task_run_name="load-{theme}-{source}")
async def dummy_load_data(theme: str, source: str, extract_result: Dict) -> Dict:
    logger = get_run_logger()
    logger.info(f"Loading data from {source} for theme {theme}")

    # Simulate some work
    await asyncio.sleep(random.uniform(4, 10))

    return {
        "source": source,
        "theme": theme,
        "records_loaded": extract_result["records_extracted"],
        "status": "success",
    }


@flow(flow_run_name="extract-load-{theme}-{source}")
async def dummy_extract_load_flow(theme: str, source: str) -> Dict:
    logger = get_run_logger()
    logger.info(f"Starting extract_load flow for {source} in theme {theme}")

    # Extract data
    extract_result = await dummy_extract_data(theme, source)

    # Load data
    load_result = await dummy_load_data(theme, source, extract_result)

    return {"extract": extract_result, "load": load_result, "status": "success"}


def get_theme_subflows(theme: str, sources: List[str]) -> tuple[List, List]:
    subflows = []
    subflow_metadata = []

    for source in sources:
        subflow = dummy_extract_load_flow(theme, source)
        subflows.append(subflow)
        subflow_metadata.append((theme, source))

    return subflows, subflow_metadata


@task(task_run_name="run-dbt-{command}")
async def dummy_dbt_command(command: str) -> Dict:
    logger = get_run_logger()
    logger.info(f"Running dbt {command}")

    # Simulate dbt work
    await asyncio.sleep(random.uniform(4, 10))

    return {"command": command, "models_processed": 25, "status": "success"}


@flow
async def run_dbt_transformations() -> Dict:
    logger = get_run_logger()
    logger.info("Starting dbt transformations")

    results = {}

    # Run dbt seed
    results["seed"] = await dummy_dbt_command("seed")

    # Run dbt run
    results["run"] = await dummy_dbt_command("run")

    # Run dbt test
    results["test"] = await dummy_dbt_command("test")

    return {"dbt_results": results, "status": "success"}


@flow(flow_run_name="demo_data_pipeline")
async def demo_data_pipeline() -> Dict:
    logger = get_run_logger()
    logger.info("Starting demo data pipeline")

    # Define demo themes and sources
    themes = {
        "customer_data": ["api_source_1", "api_source_2"],
        "product_data": ["api_source_3", "api_source_1"],
        "analytics_data": ["api_source_4"],
    }

    # Step 1: Extract and load all data sources in parallel
    logger.info("Step 1: Extracting and loading all data sources")

    # Collect all subflows from all themes using helper function
    logger.info("Collecting all subflows from themes")
    all_subflows = []
    all_subflow_metadata = []

    for theme_name, sources in themes.items():
        subflows, subflow_metadata = get_theme_subflows(theme_name, sources)
        all_subflows.extend(subflows)
        all_subflow_metadata.extend(subflow_metadata)

    # Set up concurrency control with semaphore
    max_concurrent = 2
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_subflow_with_limit(subflow):
        async with semaphore:
            return await subflow

    logger.info(
        f"Running {len(all_subflows)} subflows with max {max_concurrent} concurrent"
    )

    # Use TaskGroup for structured concurrency with semaphore
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(run_subflow_with_limit(subflow)) for subflow in all_subflows
        ]

    # Collect results and organize by theme
    all_results = [task.result() for task in tasks]

    logger.info("All extractions and loads completed")

    # Step 2: Run dbt transformations
    logger.info("Step 2: Running dbt transformations")
    dbt_results = await run_dbt_transformations()

    # Combine results
    result = {
        "flow_type": "demo_pipeline",
        "dbt_results": dbt_results,
        "status": "success",
    }

    logger.info("Demo pipeline completed successfully")
    return result
```
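One way to drop the `asyncio.Semaphore` here would be Prefect's global concurrency limits, so Prefect manages the queue instead of the flow. A sketch, assuming a limit named `elt-subflows` was created ahead of time (e.g. with `prefect gcl create elt-subflows --limit 2`):

```python
from prefect.concurrency.asyncio import concurrency


async def run_subflow_with_limit(subflow):
    # occupy one slot of the (hypothetical) "elt-subflows" global limit;
    # Prefect queues the rest instead of the flow holding a Semaphore
    async with concurrency("elt-subflows", occupy=1):
        return await subflow
```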