# ask-marvin
r
@Marvin I'm trying to execute a nested flow in parallel, as in the following example:
```python
@task
def task_a(escola):
    url = f"https://api.exemplo.com/escolas/{escola}/dados"
    response = httpx.get(url=url, timeout=10)
    response.raise_for_status()
    return response

@task
def task_b(json_recuperado):
    df = pd.DataFrame(json_recuperado)
    df_filtrado = df[df['variable_x'] > 10]
    return df_filtrado

@task
def task_c(df, escola):
    output_dir = f"data/escolas"
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"escola_{escola}.parquet")
    df.to_parquet(output_path)

@flow()
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    for escola in lista_escola:
        with tags(f"Escola -{escola}"):
            try:
                flow_A(escola)
            except Exception as e:
                print(e)
                raise
```
I tried using `.submit` on flow A, but I get the following error: `'Flow' object has no attribute 'submit'`.
m
In Prefect 3.x, flow objects have no `.submit` method (only tasks do), which is why you get that error. One way to run many instances of a flow in parallel is to deploy `flow_A` and trigger it with the `run_deployment` function, which creates a separate flow run on each call; when called inside a flow, each new run is linked to the parent as a subflow by default. Note that `run_deployment` waits for the triggered run to finish unless you pass `timeout=0`, so without it your loop would still process one school at a time. To modify your code for parallel execution:
1. Create a deployment for `flow_A`.
2. Call `run_deployment` with `timeout=0` for each item in `lista_escola`.
Here is a concise example:
```python
import os

import httpx
import pandas as pd
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def task_a(escola):
    url = f"https://api.exemplo.com/escolas/{escola}/dados"
    response = httpx.get(url=url, timeout=10)
    response.raise_for_status()
    return response.json()  # return the parsed JSON, not the Response object

@task
def task_b(json_recuperado):
    df = pd.DataFrame(json_recuperado)
    df_filtrado = df[df['variable_x'] > 10]
    return df_filtrado

@task
def task_c(df, escola):
    output_dir = "data/escolas"
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"escola_{escola}.parquet")
    df.to_parquet(output_path)

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        # Name format is "<flow name>/<deployment name>"; flow_A's flow name
        # defaults to "flow-A". "my-deployment" is a placeholder.
        # timeout=0 returns as soon as the run is created instead of waiting
        # for it to finish, so the runs execute in parallel.
        flow_run = run_deployment(
            name="flow-A/my-deployment",
            parameters={"escola": escola},
            timeout=0,
        )
        flow_run_ids.append(flow_run.id)
    return flow_run_ids

# Deployment creation is assumed to be handled outside this script
```
Note: `flow_A` must be deployed before `flow_B` runs, and `run_deployment` expects the full name `"<flow name>/<deployment name>"` (not the Python function name). With `timeout=0`, each call returns as soon as the run is created, so the runs execute in parallel on whatever serve process or worker picks them up. For more details on running deployments, see the Prefect 3.x documentation on deployments. If you have any more questions or need further assistance, feel free to ask.
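Because the `"<flow name>/<deployment name>"` format is easy to get wrong, here is a small, hypothetical helper that mirrors Prefect's default flow naming (the function name with underscores replaced by hyphens). It assumes the `@flow` decorator was not given an explicit `name=`; if it was, use that name instead, and `prefect deployment ls` shows the exact names that exist.

```python
def default_flow_name(function_name: str) -> str:
    """Prefect's default flow name: the function name with "_" replaced by "-"."""
    return function_name.replace("_", "-")


def deployment_full_name(function_name: str, deployment_name: str) -> str:
    """Build the "<flow name>/<deployment name>" string that run_deployment expects."""
    return f"{default_flow_name(function_name)}/{deployment_name}"


print(deployment_full_name("flow_A", "flow-escolas"))  # flow-A/flow-escolas
```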
r
@Marvin In this case, how can I use tags to identify the running subflows?
m
To tag the runs that `run_deployment` creates, pass the tags through its `tags` argument; they are attached to each new flow run, so you can categorize and filter the runs by school. (The `tags` context manager tags runs created by the flow and task engine inside it; I would not rely on it for runs created through `run_deployment`.) Here's how you can modify your code:
```python
import os

import httpx
import pandas as pd
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def task_a(escola):
    url = f"https://api.exemplo.com/escolas/{escola}/dados"
    response = httpx.get(url=url, timeout=10)
    response.raise_for_status()
    return response.json()

@task
def task_b(json_recuperado):
    df = pd.DataFrame(json_recuperado)
    df_filtrado = df[df['variable_x'] > 10]
    return df_filtrado

@task
def task_c(df, escola):
    output_dir = "data/escolas"
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"escola_{escola}.parquet")
    df.to_parquet(output_path)

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    for escola in lista_escola:
        run_deployment(
            name="flow-A/my-deployment",  # placeholder "<flow name>/<deployment name>"
            parameters={"escola": escola},
            tags=[f"Escola-{escola}"],  # attached to the flow run being created
            timeout=0,  # don't wait, so the runs execute in parallel
        )

# Deployment creation is assumed to be handled outside this script
```
In this example, each `flow_A` run is tagged with `Escola-{escola}`, so you can track and filter the flow runs by school in the Prefect UI. For more detailed information about tags, see the Prefect documentation on tags.
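If you want more than one tag per run (for example a shared `Escola` tag plus a per-school tag), it can help to build the `run_deployment` keyword arguments in one place. The sketch below is a hypothetical helper (`build_run_requests` is not a Prefect function) that returns one dict per school; `"flow-A/my-deployment"` is a placeholder deployment name.

```python
from typing import Any


def build_run_requests(lista_escola, deployment_name):
    """Hypothetical helper: one set of run_deployment keyword arguments per school."""
    run_requests: list[dict[str, Any]] = []
    for escola in lista_escola:
        run_requests.append(
            {
                "name": deployment_name,  # "<flow name>/<deployment name>"
                "parameters": {"escola": escola},
                # "Escola" groups every run; "Escola-<id>" identifies a single school
                "tags": ["Escola", f"Escola-{escola}"],
                "timeout": 0,  # return once the run is created, without waiting
            }
        )
    return run_requests


run_requests = build_run_requests([101, 102], "flow-A/my-deployment")
print(run_requests[0]["tags"])  # ['Escola', 'Escola-101']
```

Inside `flow_B`, each dict would then be passed as `run_deployment(**kwargs)`.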
r
@Marvin I have 200 elements in the "lista_escola" list, and I want to set a limitation so that only 3 subflows run at a time.
m
To limit how many `flow_A` runs execute at the same time in Prefect 3.x, the limit has to apply to flow runs, and it is enforced where those runs execute. Be aware that tag-based concurrency limits apply only to **task runs**: they throttle tasks carrying the tag, not the number of flow runs. The options are:
1. Tag-based limit (task runs only), created with the CLI:
```bash
prefect concurrency-limit create <tag_name> <limit>
```
For example, to allow at most 3 task runs tagged `Escola` to run at once:
```bash
prefect concurrency-limit create Escola 3
```
This throttles task runs tagged `Escola`; it does not stop 200 `flow_A` runs from starting.
2. Work pool or work queue limit (flow runs): if `flow_A` is deployed to a work pool, `prefect work-pool set-concurrency-limit <pool_name> 3` caps how many of that pool's flow runs execute at once; `prefect work-queue set-concurrency-limit <queue_name> 3` does the same for a single queue.
3. Served flows (flow runs): if the deployment is run with `flow_A.serve(...)`, passing `limit=3` caps how many runs that process executes concurrently.
Here is an example that tags every run and triggers all of them without waiting, leaving the cap to one of the flow-run limits above:
```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Your task code here
    pass

@task
def task_b(json_recuperado):
    # Your task code here
    pass

@task
def task_c(df, escola):
    # Your task code here
    pass

@flow
def flow_A(escola):
    # Your flow code here
    pass

@flow
def flow_B(lista_escola):
    for escola in lista_escola:
        # Trigger every run without waiting; the flow-run concurrency limit
        # (work pool/queue limit or serve(limit=3)) decides how many execute at once.
        run_deployment(
            name="flow-A/my-deployment",  # placeholder deployment name
            parameters={"escola": escola},
            tags=["Escola", f"Escola-{escola}"],
            timeout=0,
        )

# Set up the flow-run concurrency limit beforehand (work pool/queue limit or serve(limit=3))
```
For more detailed information on configuring concurrency limits, you can refer to the Prefect 3.x documentation on concurrency limits.
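As a mental model of what a cap of 3 does to 200 runs, here is a standard-library analogy (not Prefect code): a thread pool with three workers stands in for the concurrency limit, and `fake_flow_run` is a hypothetical stand-in for one `flow_A` run. In Prefect, the cap is enforced by the work pool, work queue, or serve process rather than by your own code.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 3  # at most 3 "subflows" at a time

running = 0  # runs currently executing
peak = 0  # highest number of overlapping runs observed
lock = threading.Lock()


def fake_flow_run(escola):
    """Stand-in for one flow_A run that records how many runs overlap."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.01)  # pretend to fetch and write one school's data
    with lock:
        running -= 1
    return escola


# The pool runs at most LIMIT jobs at once; the other schools wait for a free slot,
# which is the behavior a flow-run concurrency limit of 3 aims for.
with ThreadPoolExecutor(max_workers=LIMIT) as pool:
    results = list(pool.map(fake_flow_run, range(200)))

print(len(results), peak)
```

With roughly equal run times, 200 runs at 3 at a time take about ceil(200 / 3) = 67 rounds.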
r
@Marvin Show me an example of subflows running in parallel, deploying flow A and flow B.
m
To run the subflows in parallel with both `flow_A` and `flow_B` deployed:
1. Define the flows `flow_A` and `flow_B`.
2. Create a deployment for each flow (with the CLI, `.serve()`, or `.deploy()`).
3. Inside `flow_B`, call `run_deployment` with `timeout=0` for each school, so the `flow_A` runs execute in parallel.
Here's a complete example:
```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Sample task logic
    pass

@task
def task_b(json_recuperado):
    # Sample task logic
    pass

@task
def task_c(df, escola):
    # Sample task logic
    pass

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        # run_deployment returns a FlowRun (not a deployment ID);
        # timeout=0 returns right after the run is created.
        flow_run = run_deployment(
            name="flow-A/my-deployment",  # placeholder deployment name
            parameters={"escola": escola},
            tags=["Escola"],
            timeout=0,
        )
        flow_run_ids.append(flow_run.id)
    return flow_run_ids

if __name__ == "__main__":
    # run_deployment only triggers deployments that already exist; it does not
    # create them. Create both deployments first (CLI, .serve(), or .deploy()).
    # Then either trigger flow_B's deployment ("my-deployment" is a placeholder),
    # passing a plain list rather than a range:
    # run_deployment(name="flow-B/my-deployment", parameters={"lista_escola": list(range(200))})
    # ...or run flow_B directly in this process:
    flow_B(list(range(200)))
```
Steps to deploy:
- Deploy `flow_A` and `flow_B`: create a deployment for each flow with Prefect's CLI or API (for example `.serve()` or `.deploy()`), specifying the entry point, parameters, and any schedules or tags.
- Parallel execution: calling `run_deployment` with `timeout=0` starts a `flow_A` run for each item in `lista_escola` without waiting for earlier runs to finish, so they execute in parallel.
For more information on creating and managing deployments, see the Prefect documentation on deployments.
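Deployment parameters are sent to the Prefect API and stored as JSON, so it is safest to pass plain JSON types (lists, dicts, strings, numbers). The sketch below is a hypothetical pre-flight check using only the standard `json` module; it does not reproduce Prefect's own serialization, it just shows why `list(range(200))` is the safer choice over a bare `range`.

```python
import json


def check_parameters(parameters):
    """Hypothetical pre-flight check: raise TypeError if parameters are not plain JSON."""
    return json.dumps(parameters)


# A list of school IDs serializes cleanly.
ok = check_parameters({"lista_escola": list(range(200))})

# A range object does not, so convert it with list() first.
range_error = None
try:
    check_parameters({"lista_escola": range(200)})
except TypeError as exc:
    range_error = str(exc)

print(range_error)
```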
r
@Marvin I use flow_A.serve(name="flow-escolas") to deploy
m
If you deploy with `flow_A.serve(name="flow-escolas")`, you can still trigger runs in parallel and cap their concurrency. The full deployment name is `flow-A/flow-escolas` (flow name, then deployment name), and that is the name `run_deployment` expects:
1. Deploy using `serve`: call `serve` on `flow_A`.
2. Run deployments in parallel: use `run_deployment` with `timeout=0` to trigger the deployed flow once per school.
Here's an example:
```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Sample task logic
    pass

@task
def task_b(json_recuperado):
    # Sample task logic
    pass

@task
def task_c(df, escola):
    # Sample task logic
    pass

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        flow_run = run_deployment(
            name="flow-A/flow-escolas",  # "<flow name>/<deployment name>"
            parameters={"escola": escola},
            tags=["Escola"],
            timeout=0,  # don't wait, so the runs execute in parallel
        )
        flow_run_ids.append(flow_run.id)
    return flow_run_ids

if __name__ == "__main__":
    # serve() creates the deployment, then blocks while it executes runs;
    # limit=3 caps how many flow_A runs this process executes at once.
    # Nothing after this call runs until the process is stopped.
    flow_A.serve(name="flow-escolas", limit=3)

# With the serve process running, trigger the runs from another terminal or
# script, e.g. by calling flow_B(list(range(200))).
```
Important notes:
- `serve`: it creates the deployment and then keeps the process running to execute that deployment's runs, so it blocks. Keep it running while `flow_B` triggers runs from a separate terminal or script.
- Concurrency limits: `limit=3` on `serve` caps the runs this process executes at once, which gives you at most 3 `flow_A` runs at a time; the remaining runs wait until a slot frees up. If you run deployments on workers instead, use a work pool or work queue concurrency limit.
For details on concurrency limits and the `serve` method, see the Prefect 3.x documentation.
r
@Marvin, in this scenario, you just need to deploy flow A, but I need to deploy flow B with the name "flows-todas-escolas"
m
To deploy both `flow_A` and `flow_B` using `serve`, with `flow_B` named "flows-todas-escolas", you can follow this setup:
1. Deploy `flow_A`: use the `serve` method to deploy `flow_A` with a specific name.
2. Deploy `flow_B`: similarly, deploy `flow_B` using the `serve` method with the desired name.
Here's how you can structure your code:
```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Sample task logic
    pass

@task
def task_b(json_recuperado):
    # Sample task logic
    pass

@task
def task_c(df, escola):
    # Sample task logic
    pass

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        flow_run = run_deployment(
            name="flow-A/flow-escolas",  # "<flow name>/<deployment name>"
            parameters={"escola": escola},
            tags=["Escola"],
            timeout=0,  # don't wait, so the runs execute in parallel
        )
        flow_run_ids.append(flow_run.id)
    return flow_run_ids

if __name__ == "__main__":
    # Deploy flow_A using serve
    flow_A.serve(name="flow-escolas")

    # Deploy flow_B using serve
    flow_B.serve(name="flows-todas-escolas")

    # Run flow_B to execute flow_A in parallel
    # This can be triggered externally or via deployment
```
Key points:
- Serve method: `serve` deploys a flow and makes it available for execution under the given name.
- Deployment names: make sure the names ("flow-escolas" and "flows-todas-escolas") are unique, and reference them in `run_deployment` with the flow name prefix, e.g. `"flow-A/flow-escolas"`.
- Concurrent execution: to run at most 3 `flow_A` runs at a time, pass `limit=3` to `flow_A.serve(...)`, or use a work pool or work queue concurrency limit if you run workers.
If you need further configuration details or assistance, refer to the Prefect documentation.
r
@Marvin When I run the code as you indicated, only the deployment of flow B happens.
m
The `serve` method creates a deployment for a single flow and then keeps running, listening for and executing that deployment's scheduled runs. In a script it is a long-running, blocking call: the first `serve` call never returns, so any `serve` call after it is never reached and only one of the two deployments is created. To deploy both `flow_A` and `flow_B` with `serve`, run them in separate scripts or processes:
1. Deploy `flow_A`: use one script to serve `flow_A` and keep it running.
2. Deploy `flow_B`: use another script or process to serve `flow_B`.
Alternatively, Prefect's top-level `serve` function (`from prefect import serve`) can serve several deployments from one process: `serve(flow_A.to_deployment(name="flow-escolas"), flow_B.to_deployment(name="flows-todas-escolas"))`.
Here's an example of how you might structure the deployment scripts:
### Script for `flow_A`
```python
from prefect import flow, task

@task
def task_a(escola):
    # Sample task logic
    pass

@task
def task_b(json_recuperado):
    # Sample task logic
    pass

@task
def task_c(df, escola):
    # Sample task logic
    pass

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

if __name__ == "__main__":
    # Deploy flow_A using serve; this call blocks while it executes runs.
    # limit=3 caps this process at 3 concurrent flow_A runs.
    flow_A.serve(name="flow-escolas", limit=3)
```
### Script for `flow_B`
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        # Trigger one flow_A run per school; timeout=0 returns immediately,
        # so the runs execute in parallel on the flow_A serve process.
        flow_run = run_deployment(
            name="flow-A/flow-escolas",  # "<flow name>/<deployment name>"
            parameters={"escola": escola},
            tags=["Escola", f"Escola-{escola}"],
            timeout=0,
        )
        flow_run_ids.append(flow_run.id)
    return flow_run_ids

if __name__ == "__main__":
    # Deploy flow_B using serve (blocks while it executes runs)
    flow_B.serve(name="flows-todas-escolas")
```
Considerations:
- Separate processes: running each `serve` call in its own script or process lets both keep running without blocking each other.
- Concurrency management: because the `flow_A` runs are executed by the `flow_A` serve process, `limit=3` on that call keeps at most 3 of them running at a time. If you move to workers, use a work pool or work queue concurrency limit instead.
For more details on deploying flows with `serve`, refer to the Prefect documentation.
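To see why only one deployment appears when both `serve` calls sit in one script, here is a toy model using only the standard library. `fake_serve` is a hypothetical stand-in (not Prefect code) that registers a deployment name and then blocks, the way `serve` keeps running to execute runs.

```python
import threading

deployed = []
stop = threading.Event()


def fake_serve(name):
    """Toy stand-in for Flow.serve(): create the deployment, then block."""
    deployed.append(name)
    stop.wait()  # the real serve() keeps polling for runs until the process stops


def deployment_script():
    # Same shape as calling flow_A.serve(...) and then flow_B.serve(...) in one script
    fake_serve("flow-escolas")
    fake_serve("flows-todas-escolas")


worker = threading.Thread(target=deployment_script, daemon=True)
worker.start()
worker.join(timeout=0.5)  # give the script time to reach the blocking call

while_blocked = list(deployed)
print(while_blocked)  # only the first name: the second call is never reached

stop.set()  # unblock the toy so the thread can exit cleanly
worker.join()
```

Since the first call never returns on its own, the second deployment needs its own process, or a single call that serves both deployments together.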