# ask-marvin
b
@Marvin Give me an example of a flow that leverages Prefect 2.20.3 and the "task_runner" parameter in an effort to increase efficiency and performance.
m
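In a Prefect 2.20.3 flow, the `task_runner` parameter decides how tasks submitted with `.submit()` are executed. To run them in parallel, you can use `DaskTaskRunner` (from the `prefect-dask` integration package) or `RayTaskRunner` (from `prefect-ray`). Both are installed separately from Prefect itself, e.g. `pip install prefect-dask`. Below is a sample flow using `DaskTaskRunner`.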
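```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def multiply(x, y):
    return x * y

@task
def add(x, y):
    return x + y

# Pass an instance; Prefect 2 also accepts the class and instantiates it for you
@flow(task_runner=DaskTaskRunner())
def math_flow(a, b):
    # .submit() sends each task to the Dask runner and returns a PrefectFuture right away
    product = multiply.submit(a, b)
    sum_result = add.submit(a, b)
    # .result() waits for each task run to finish and returns its value
    return product.result(), sum_result.result()

# Keep the __main__ guard: the temporary local Dask cluster starts worker processes
if __name__ == "__main__":
    result = math_flow(3, 4)
    print(result)  # (12, 7)
```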
This example calculates the product and the sum of two numbers concurrently. Both tasks are submitted before either result is requested, so Dask can run them at the same time. When `DaskTaskRunner` is created without an `address`, it starts a temporary local Dask cluster for the duration of the flow run. For more details, see the Prefect documentation on task runners: https://docs-2.prefect.io/latest/concepts/task-runners/ and https://docs-2.prefect.io/latest/integrations/prefect-dask/task_runners/
b
@Marvin, using the same method, provide an example of processing files in a directory.
m
Here's an example of a Prefect flow using `DaskTaskRunner` to process files in a directory. Each file's tasks are submitted to the runner, so different files are processed concurrently instead of one at a time, which improves efficiency.
```python
from pathlib import Path
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def read_file(file_path: Path) -> str:
    """Read the content of a file."""
    with file_path.open('r') as file:
        content = file.read()
    return content

@task
def process_content(content: str) -> str:
    """Process the content of the file."""
    # Dummy processing: reverse the content
    return content[::-1]

@task
def save_processed_content(content: str, output_dir: Path, file_name: str) -> Path:
    """Save the processed content to a new file."""
    output_path = output_dir / file_name
    with output_path.open('w') as file:
        file.write(content)
    return output_path

@flow(task_runner=DaskTaskRunner())
def process_files_in_directory(input_dir: Path, output_dir: Path):
    """Flow to process all .txt files in a directory."""
    # parents=True also creates any missing parent directories
    output_dir.mkdir(parents=True, exist_ok=True)

    for file_path in input_dir.glob('*.txt'):
        # Passing a future into the next .submit() makes that task wait for it,
        # so each file's read -> process -> save chain runs in order,
        # while the chains for different files run concurrently on Dask
        content = read_file.submit(file_path)
        processed_content = process_content.submit(content)
        save_processed_content.submit(processed_content, output_dir, file_path.name)
    # Prefect 2 waits for all submitted task runs before the flow run finishes

if __name__ == "__main__":
    input_directory = Path("input_files")
    output_directory = Path("output_files")
    process_files_in_directory(input_directory, output_directory)
```
In this example:
1. The `read_file` task reads the content of a file from the input directory.
2. The `process_content` task processes (reverses) the content.
3. The `save_processed_content` task saves the processed content to the output directory.
4. The `process_files_in_directory` flow submits these three tasks for every `.txt` file. For a single file, the steps run in order because each task receives the previous task's future. Across files, `DaskTaskRunner` runs the chains concurrently.
Refer to the documentation for more details and further customization options: https://docs-2.prefect.io/latest/integrations/prefect-dask/ and https://docs-2.prefect.io/latest/concepts/task-runners/
b
@Marvin can I use the task_runner parameter in the flow and in a task that the flow calls to create further parallelization?
m
Not on the task itself. In Prefect 2, `task_runner` is a parameter of `@flow` only; `@task` does not accept it. Prefect 2 also refuses to run a flow from inside a task (it raises a `RuntimeError`), so wrapping a flow in a task will not work either. What you can do is nest flows. A flow called from another flow runs as a subflow, and each subflow can declare its own `task_runner`, which applies to the tasks submitted inside that subflow. Here's the example restructured that way:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def read_file(file_path):
    # Simulating reading file content
    return f"Content of {file_path}"

@task
def process_content(content):
    # Simulating content processing
    return content[::-1]

# A subflow with its own task runner: tasks submitted here run on its Dask cluster
@flow(task_runner=DaskTaskRunner())
def inner_flow(paths):
    futures = []
    for path in paths:
        content = read_file.submit(path)
        futures.append(process_content.submit(content))
    # Resolve the futures so the subflow returns plain strings
    return [future.result() for future in futures]

# nested_flow is a flow rather than a task, because Prefect 2 cannot run a flow inside a task
@flow
def nested_flow(file_paths):
    # Calling a flow from inside a flow creates a subflow run
    return inner_flow(file_paths)

# main_flow submits no tasks itself, so it keeps Prefect's default task runner
@flow
def main_flow(directory):
    # Assume directory contains files: ['file1.txt', 'file2.txt', 'file3.txt']
    file_paths = [f"{directory}/file1.txt", f"{directory}/file2.txt", f"{directory}/file3.txt"]
    return nested_flow(file_paths)

if __name__ == "__main__":
    result = main_flow("/path/to/directory")
    print(result)
```
In this example:
1. `main_flow` calls `nested_flow`, which runs as a subflow. Any flow in the chain, including `main_flow`, can declare its own `task_runner`, but a runner only affects the tasks submitted directly in that flow.
2. `nested_flow` calls `inner_flow`, another subflow, passing along `file_paths`.
3. `inner_flow` declares its own `DaskTaskRunner`, so the `read_file` and `process_content` runs it submits execute concurrently on that runner.
Each `DaskTaskRunner()` created without an `address` starts its own temporary local cluster, so nesting several of them adds start-up overhead. Passing `address=` lets them share one existing Dask cluster. For more detailed explanations, refer to the Prefect documentation: https://docs-2.prefect.io/latest/concepts/task-runners/ and https://docs-2.prefect.io/latest/integrations/prefect-dask/task_runners/
b
@Marvin the inner_flow takes a parameter of paths. Is this inherently passed in based on the file_paths parameter on the nested flow?
m
Not implicitly. `paths` receives the list only because `nested_flow` explicitly calls `inner_flow(file_paths)`. Prefect maps the arguments of a flow call onto the flow's parameters using ordinary Python rules, so the first positional argument is bound to the first parameter. The list that `nested_flow` knows as `file_paths` therefore arrives inside `inner_flow` under the name `paths`, and the two names do not have to match. Step by step:
1. `main_flow` builds the list `file_paths`.
2. `main_flow` calls `nested_flow(file_paths)`, which starts `nested_flow` as a subflow with that list.
3. `nested_flow` calls `inner_flow(file_paths)`, and inside `inner_flow` the same list is available as `paths`. Writing `inner_flow(paths=file_paths)` states the mapping explicitly.
Here is the example with the hand-off annotated:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def read_file(file_path):
    # Simulating reading file content
    return f"Content of {file_path}"

@task
def process_content(content):
    # Simulating content processing
    return content[::-1]

@flow(task_runner=DaskTaskRunner())
def inner_flow(paths):
    # `paths` holds whatever the caller passed as the first argument
    futures = []
    for path in paths:
        content = read_file.submit(path)
        futures.append(process_content.submit(content))
    return [future.result() for future in futures]

@flow
def nested_flow(file_paths):
    # Call inner_flow with the file_paths provided to nested_flow;
    # the list is bound to inner_flow's first parameter, `paths`
    return inner_flow(file_paths)

@flow
def main_flow(directory):
    # Define file paths
    file_paths = [f"{directory}/file1.txt", f"{directory}/file2.txt", f"{directory}/file3.txt"]
    # Pass file_paths to nested_flow, which runs as a subflow
    return nested_flow(file_paths)

if __name__ == "__main__":
    # Run the main flow, passing the directory containing input files
    result = main_flow("/path/to/directory")
    print(result)
```
In this code:
1. `main_flow` calls `nested_flow` with `file_paths`.
2. `nested_flow` receives `file_paths` and passes the list to `inner_flow`.
3. `inner_flow` receives it as `paths` and processes each path concurrently using its `DaskTaskRunner`.
Nothing is inferred from parameter names. The explicit call in step 2 is what connects `file_paths` to `paths`.
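Nothing here is Prefect-specific. Prefect binds the arguments of a flow call to the flow's parameters using ordinary Python rules, then validates them against any type hints, so plain functions are enough to show the mechanics. In this sketch, `inner_flow` and `nested_flow` are ordinary functions standing in for the flows above, and `inspect.signature(...).bind` reports which parameter each argument lands in.

```python
import inspect

# Plain functions standing in for the flows above; Prefect binds flow-call
# arguments to parameters with these same Python rules
def inner_flow(paths):
    return [p.upper() for p in paths]

def nested_flow(file_paths):
    # Handed on positionally: inside inner_flow the list is called `paths`
    return inner_flow(file_paths)

# bind() shows which parameter each argument lands in
bound = inspect.signature(inner_flow).bind(["a.txt", "b.txt"])
print(dict(bound.arguments))  # {'paths': ['a.txt', 'b.txt']}
print(nested_flow(["a.txt", "b.txt"]))  # ['A.TXT', 'B.TXT']

# A keyword argument must use inner_flow's parameter name, not the caller's
print(inner_flow(paths=["c.txt"]))  # ['C.TXT']
try:
    inner_flow(file_paths=["c.txt"])
except TypeError as exc:
    print("TypeError:", exc)
```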
b
@Marvin, can the inner_flow use different or more parameters than paths or are the inner_flow parameters restricted to what the nested_flow provides?
m
Yes. `inner_flow`'s parameters are defined by its own signature, not by `nested_flow`. The only rule is that each call to `inner_flow` must supply every parameter that has no default value; parameters with defaults can be omitted. The values can come from `nested_flow`'s own parameters, from values it computes, or from literals. Here's an extended example where `inner_flow` takes an additional parameter:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def read_file(file_path):
    return f"Content of {file_path}"

@task
def process_content(content, operation):
    if operation == "reverse":
        return content[::-1]
    elif operation == "uppercase":
        return content.upper()
    else:
        return content

@flow(task_runner=DaskTaskRunner())
def inner_flow(paths, op):
    futures = []
    for path in paths:
        content = read_file.submit(path)
        futures.append(process_content.submit(content, op))
    return [future.result() for future in futures]

@flow
def nested_flow(file_paths, operation):
    # Call inner_flow with the file_paths and operation parameters
    return inner_flow(file_paths, operation)

@flow
def main_flow(directory, operation):
    file_paths = [f"{directory}/file1.txt", f"{directory}/file2.txt", f"{directory}/file3.txt"]
    return nested_flow(file_paths, operation)

if __name__ == "__main__":
    # Run the main flow, passing the directory and a processing operation
    result = main_flow("/path/to/directory", "reverse")
    print(result)
```
In this example:
1. `main_flow` receives both a `directory` and an `operation` parameter.
2. It calls `nested_flow` with `file_paths` and `operation`.
3. `nested_flow` passes both values to `inner_flow`.
4. `inner_flow` receives them as `paths` (from `file_paths`) and `op` (from `operation`).
So `inner_flow` can take as many parameters as it needs. Each one just has to be supplied, either directly or through a default, wherever `inner_flow` is called. For more details, refer to the Prefect documentation: https://docs-2.prefect.io/latest/concepts/task-runners/ and https://docs-2.prefect.io/latest/integrations/prefect-dask/task_runners/
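Parameters with default values are what let `inner_flow` take more parameters than its caller supplies. In the plain-Python sketch below, the functions stand in for the flows and task above, since Prefect applies the same binding rules to flow calls. The extra `suffix` parameter is hypothetical. `nested_flow` provides `paths` from its own parameter, passes `op` as a literal, and leaves `suffix` to its default.

```python
def process_content(content, operation):
    # Same branching as the process_content task above
    if operation == "reverse":
        return content[::-1]
    elif operation == "uppercase":
        return content.upper()
    return content

def inner_flow(paths, op, suffix="!"):
    # suffix is a hypothetical extra parameter; its default makes it optional
    return [process_content(f"Content of {p}", op) + suffix for p in paths]

def nested_flow(file_paths):
    # paths comes from nested_flow's parameter, op is a literal, suffix uses its default
    return inner_flow(file_paths, "uppercase")

print(nested_flow(["a.txt"]))  # ['CONTENT OF A.TXT!']

# Omitting a parameter that has no default fails at call time
try:
    inner_flow(["a.txt"])
except TypeError as exc:
    print("TypeError:", exc)
```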
b
@Marvin do the quantity and order of parameters in nested_flow have to match the inner_flow?
m
No. `nested_flow`'s own parameters can differ from `inner_flow`'s in number, order, and names. What has to line up is the call to `inner_flow` inside `nested_flow`: every `inner_flow` parameter without a default must receive exactly one value, and positional arguments are matched to `inner_flow`'s parameters in order. With keyword arguments, order no longer matters; only the names `inner_flow` defines do. Here's an example in which `nested_flow` forwards its values by keyword:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def read_file(file_path):
    # Simulating a file read
    return f"Content of {file_path}"

@task
def process_content(content, operation):
    # Simulating content processing based on operation type
    if operation == "reverse":
        return content[::-1]
    elif operation == "uppercase":
        return content.upper()
    else:
        return content

@flow(task_runner=DaskTaskRunner())
def inner_flow(paths, op, flag):
    futures = []
    for path in paths:
        content = read_file.submit(path)
        # Process only when flag is set; otherwise keep the raw content future
        futures.append(process_content.submit(content, op) if flag else content)
    return [future.result() for future in futures]

@flow
def nested_flow(file_paths, operation, additional_flag):
    # Keyword arguments map each value to inner_flow's parameter by name,
    # so the order here does not need to match inner_flow's signature
    return inner_flow(flag=additional_flag, paths=file_paths, op=operation)

@flow
def main_flow(directory, operation, additional_flag):
    # List files in the directory (simulated here)
    file_paths = [f"{directory}/file1.txt", f"{directory}/file2.txt", f"{directory}/file3.txt"]
    # Call nested_flow with all needed parameters
    return nested_flow(file_paths, operation, additional_flag)

if __name__ == "__main__":
    # Run the main flow with directory, operation type, and additional flag
    result = main_flow("/path/to/directory", "reverse", True)
    print(result)
```
In this example:
1. `main_flow` receives `directory`, `operation`, and `additional_flag`.
2. It builds the file list and calls `nested_flow` with all three values.
3. `nested_flow` accepts `file_paths`, `operation`, and `additional_flag`, then passes them to `inner_flow` by keyword.
4. `inner_flow` processes the files concurrently, applying `op` only when `flag` is true.
So the call to `inner_flow` has to satisfy `inner_flow`'s signature, but `nested_flow` is free to accept extra parameters, rename them, or compute values before making that call. This keeps `nested_flow` flexible without requiring changes to `inner_flow`.
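Keyword arguments and dictionary unpacking are the usual ways to decouple a caller's parameter list from the callee's. The plain-Python sketch below stands in for the flows above, since Prefect binds flow-call arguments the same way. The extra `label` parameter and the `nested_flow_from_dict` variant with its `settings` dict are hypothetical. Here `inner_flow` simply echoes what it received so the mapping is visible.

```python
def inner_flow(paths, op, flag):
    # Echo the bound values so the mapping is visible
    return {"paths": paths, "op": op, "flag": flag}

def nested_flow(additional_flag, operation, file_paths, label="batch-1"):
    # Different order from inner_flow, plus an extra parameter (label) that
    # inner_flow never sees; keywords match values to inner_flow by name
    return inner_flow(paths=file_paths, op=operation, flag=additional_flag)

def nested_flow_from_dict(settings):
    # Build inner_flow's arguments as a dict whose keys are inner_flow's
    # parameter names; ** unpacks them as keyword arguments
    inner_args = {
        "paths": settings["files"],
        "op": settings["operation"],
        "flag": settings.get("flag", True),
    }
    return inner_flow(**inner_args)

print(nested_flow(True, "reverse", ["a.txt"]))
# {'paths': ['a.txt'], 'op': 'reverse', 'flag': True}
print(nested_flow_from_dict({"files": ["b.txt"], "operation": "uppercase"}))
# {'paths': ['b.txt'], 'op': 'uppercase', 'flag': True}
```

Keyword calls are a little more verbose, but they keep working if `inner_flow`'s parameters are later reordered.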