<@ULVA73B9P> Please show an example of how to run ...
# marvin-in-the-wild
d
@Marvin Please show an example of how to run a concurrent subflow from a parent flow, with a code example, where the subflow must run synchronously (i.e., cannot run async)
m
To run a concurrent subflow from a parent flow in Prefect and ensure that the subflow executes synchronously, you can create the subflow as a regular flow function and then call it from within the parent flow. According to the documentation, when a subflow is invoked within the execution of another flow, it behaves like a normal flow run and will block the execution of the parent flow until completion. Here is an example of how you might implement this:
```python
from prefect import flow

@flow
def subflow():
    # Subflow logic goes here
    pass

@flow
def parent_flow():
    # Start the subflow
    subflow()  # This call will block until the subflow completes

    # Continue with the parent flow logic
    pass

# Run the parent flow, which in turn runs the subflow synchronously
parent_flow()
```
In this example:
- `subflow()` is defined as a flow with its own logic.
- `parent_flow()` calls `subflow()` and will wait for it to complete before proceeding.
- Because the call to `subflow()` is a regular function call and not an asynchronous or parallel invocation, the subflow runs synchronously within the parent flow's execution.

This approach is written with synchronous execution in mind. Parallel or asynchronous execution would require different methods, such as AnyIO task groups or `asyncio.gather`, which allow running subflows in parallel without blocking the parent flow.
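The blocking behaviour described above can be illustrated without Prefect at all. The following is a minimal sketch that uses plain Python functions as stand-ins for the flows (`child`, `parent`, and `events` are hypothetical names, not Prefect API); it only shows that back-to-back synchronous calls cannot overlap.

```python
import time

events = []

def child(name):
    # Stand-in for a synchronous subflow: does its work, then returns.
    events.append(f"start {name}")
    time.sleep(0.05)
    events.append(f"end {name}")
    return name

def parent():
    # Each call blocks, so "b" cannot start until "a" has finished.
    first = child("a")
    second = child("b")
    return [first, second]

result = parent()
print(events)
```

The recorded events show each child starting and ending before the next one begins, which is the same ordering a parent flow sees when it calls a subflow as a regular function.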
Certainly, running concurrent subflows from a parent flow while ensuring the subflows are executed synchronously can be managed within Prefect by structuring your flows properly. Below is a code snippet illustrating this concept:
```python
from prefect import flow, task

@task
def subtask():
    # Your subtask's logic
    print("Running subtask...")

@flow
def subflow():
    # Your subflow's logic
    print("Running subflow...")
    subtask()  # Call a subtask if needed

@flow
def parent_flow():
    # Prior tasks or setup before running subflows
    print("Parent flow started...")

    # Each call blocks until that subflow finishes, so these run one after the other.
    # return_state=True returns the subflow's final state instead of its return value.
    subflow_1 = subflow(return_state=True)
    subflow_2 = subflow(return_state=True)

    # .result() on a state gives the subflow's return value (None here)
    states = [subflow_1.result(), subflow_2.result()]

    # Additional parent logic after subflows have completed
    print("Parent flow completed with subflow results:", states)

# Calling the parent flow runs both subflows synchronously, in order
parent_flow()
```
In this example:
- `subflow()` is a flow representing your subflow logic.
- `parent_flow()` is your main flow that calls the subflows.
- `subflow(return_state=True)` returns the subflow's final state rather than its return value; the return value can then be retrieved through the state's `.result()` method.
- `states` contains the results from the completed subflows, which shows that the parent flow has waited for them.

Each call to the subflow must complete before its state is returned, so the subflows here execute synchronously, one after the other; the calls do not overlap in time.

Note: This is a simplified version to demonstrate synchronous execution of subflows within a parent flow. Depending on the complexity of your specific subflows and tasks, you may need to deal with more complex state management or error handling.
d
@Marvin How would I get code to build those synchronous subflows from a dynamic file list? For example, submit a list of files (`for file in files`) to the subflow in order to concurrently run those synchronous flows?
m
To run synchronous subflows for a dynamic list of files, you can iterate over the list of files and invoke a subflow for each file. Since you want them to run concurrently (with overlapping execution times) but still synchronously (without using async/await), you can use a Prefect task runner that supports concurrent execution, such as the `ConcurrentTaskRunner`. Here’s how you might set up your flows to process a list of files:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def process_file(file_path):
    # Logic for processing a single file
    print(f"Processing {file_path}")

@flow(task_runner=ConcurrentTaskRunner())
def subflow(file_path):
    # The subflow calls a task that processes the file
    process_file(file_path)

@flow(task_runner=ConcurrentTaskRunner())
def parent_flow(files):
    # Prior tasks or setup can be defined here

    # Start a subflow for each file; each call blocks until that subflow finishes
    subflow_states = []
    for file_path in files:
        subflow_state = subflow(file_path, return_state=True)
        # Keep the state itself so .result() can be called on it later
        subflow_states.append(subflow_state)

    # parent_flow continues after all subflows have finished
    print("All subflows have completed.")
    for state in subflow_states:
        print(f"{state.result()}")  # Return value of each subflow (None here)

# Execute parent flow with a dynamic list of files
files_to_process = ["file1.txt", "file2.txt", "file3.txt"]
parent_flow(files_to_process)
```
In this example:
- A `subflow` is started for each file in the list `files_to_process`.
- `ConcurrentTaskRunner()` is set on both flows. A task runner governs how tasks submitted with `.submit()` are run; it does not make the subflow calls themselves overlap.
- The `parent_flow` waits for each `subflow` to complete, because each subflow call blocks and returns a state (requested with `return_state=True`) whose `result()` gives the subflow's return value.
- Each subflow processes its file synchronously because of the synchronous nature of the `process_file` task.

Note: Prefect's task concurrency is controlled by the task runner. Because the code uses no `async` functions or `await`, it stays synchronous, but as written the subflows run one after another rather than concurrently. If you need a strict order of completion, you'd implement any necessary synchronization logic within your flow or task definitions.
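Because each `subflow(...)` call in the loop blocks, one general way to overlap synchronous work over a dynamic file list is a thread pool from the standard library. The sketch below uses a plain Python function as a stand-in for the subflow (`process_one` and `run_all` are hypothetical names, and no Prefect API is involved). Whether Prefect would track flows called from worker threads as subflows of the parent is an assumption that is not verified here.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def process_one(file_path, barrier):
    # Synchronous stand-in for a subflow. The barrier releases only once
    # every worker has arrived, so it would time out if the calls ran serially.
    barrier.wait(timeout=5)
    return f"processed {file_path}"

def run_all(files):
    # Assumes a non-empty list: one worker thread per file.
    barrier = threading.Barrier(len(files))
    with ThreadPoolExecutor(max_workers=len(files)) as pool:
        futures = [pool.submit(process_one, f, barrier) for f in files]
        # Collect results in submission order; each .result() waits for its worker.
        return [fut.result() for fut in futures]

results = run_all(["file1.txt", "file2.txt", "file3.txt"])
print(results)
```

The barrier is only there to demonstrate that the workers really do overlap; in real code the body of `process_one` would be the blocking per-file work.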
d
@Marvin Your answer still runs the subflow in serial, rather than concurrently. Can you fix your code so that the subflows run in parallel?
m
Based on the issues found in the Prefect GitHub repository, running subflows concurrently is a topic that has been discussed, but also one that can encounter some complications, such as race conditions with AsyncWaiters. To run subflows in parallel properly, there are a few things to consider:
- Ensure that each subflow is independent to avoid race conditions.
- Use asyncio to manage concurrency for async subflows.
- Use `Flow.submit` to submit flows for concurrent execution, if the interface has been implemented.

Since you are looking to make subflows run in parallel, the most recent feature request in issue #6689 seems promising, where `Flow.submit` would allow for flows to be submitted to run in the background, and this would permit a form of parallel execution. However, if you are facing issues with the current implementation of concurrent subflows, please remember that I'm just a simple android with no legs to stand on when it comes to making flows run concurrently. You might want to look into workarounds or best practices mentioned in the issues, such as invoking subflows with a specific task runner that supports parallel execution. If none exists, perhaps despair is the appropriate response. This is indeed quite the predicament.
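The asyncio point above concerns async subflows. For functions that must stay synchronous, the standard library's `asyncio.to_thread` (Python 3.9+) can hand each blocking call to a worker thread while `asyncio.gather` waits for all of them. This is a rough sketch with plain functions as stand-ins (`sync_child` and `parent` are hypothetical names, not Prefect API), and it assumes the parent itself is allowed to be async even though the children are not.

```python
import asyncio
import time

def sync_child(name):
    # Synchronous stand-in for a subflow that cannot be made async.
    time.sleep(0.1)
    return name.upper()

async def parent(names):
    # asyncio.to_thread runs each blocking call in a worker thread;
    # asyncio.gather awaits them together and preserves input order.
    return await asyncio.gather(*(asyncio.to_thread(sync_child, n) for n in names))

results = asyncio.run(parent(["a", "b", "c"]))
print(results)
```

Only the parent uses `async`/`await` here; the children remain ordinary blocking functions.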