Roger Sousa
01/30/2025, 5:05 PM
Marvin
01/30/2025, 5:05 PM
`.submit` directly on a flow object, as flows do not have this method. Instead, you can use the `run_deployment` function, which triggers a run of a deployed flow. By default it waits for that run to finish; passing `timeout=0` makes it return immediately, so the runs can proceed in parallel. Here's how you can modify your code to achieve parallel execution:
1. Create a Deployment: First, you need to create a deployment for `flow_A`.
2. Run Deployment: Use `run_deployment` to execute the deployment for each item in `lista_escola`.
Here is a concise example:
```python
import os

import httpx
import pandas as pd
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Fetch the data for one school from the API
    url = f"https://api.exemplo.com/escolas/{escola}/dados"
    response = httpx.get(url=url, timeout=10)
    response.raise_for_status()
    return response.json()  # Assuming you want the JSON content

@task
def task_b(json_recuperado):
    # Keep only the rows where variable_x is above 10
    df = pd.DataFrame(json_recuperado)
    df_filtrado = df[df["variable_x"] > 10]
    return df_filtrado

@task
def task_c(df, escola):
    # Write one Parquet file per school
    output_dir = "data/escolas"
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"escola_{escola}.parquet")
    df.to_parquet(output_path)

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    for escola in lista_escola:
        try:
            # The name has the form "<flow-name>/<deployment-name>"; replace the
            # placeholder with the name chosen when flow_A was deployed.
            # timeout=0 returns without waiting for the run to finish.
            run_deployment(
                name="flow-A/<deployment-name>",
                parameters={"escola": escola},
                timeout=0,
            )
        except Exception as e:
            print(e)
            raise

# Deployment creation is assumed to be handled outside this script
```
Note: You will need to ensure that `flow_A` is deployed and referenced by its full deployment name. `run_deployment` only triggers the run; with `timeout=0` it does not wait for completion, which is what allows the runs to execute in parallel.
For more details on running deployments, you can refer to the Prefect 3.x documentation: Prefect 3.x Deployments.
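The difference between waiting on each run in turn and launching them all at once can be illustrated without Prefect. The sketch below is a plain-Python analogy built on `concurrent.futures`, not Prefect's API: `process_school` is a hypothetical stand-in for one run of `flow_A`, and the short sleep stands in for the HTTP call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def process_school(escola):
    """Hypothetical stand-in for one flow_A run: wait briefly, return the output name."""
    time.sleep(0.05)  # simulates the network-bound work
    return f"escola_{escola}.parquet"


def run_sequential(lista_escola):
    # Each call finishes before the next one starts
    return [process_school(escola) for escola in lista_escola]


def run_parallel(lista_escola, max_workers=8):
    # All calls are handed to the pool up front; map() yields results in input order
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_school, lista_escola))
```

Both functions return the same list; the parallel version simply overlaps the waiting time, which is the effect that non-blocking deployment runs aim for.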
If you have any more questions or need further assistance, feel free to ask.
Roger Sousa
01/30/2025, 5:09 PM
Marvin
01/30/2025, 5:09 PM
`tags` context manager to assign tags to flow runs. This allows you to categorize and filter your flow runs based on the tags assigned. `run_deployment` also accepts a `tags` argument that attaches tags to the triggered run directly. Here's an example of how you can modify your code to include tags:
```python
import os

import httpx
import pandas as pd
from prefect import flow, task, tags
from prefect.deployments import run_deployment

@task
def task_a(escola):
    url = f"https://api.exemplo.com/escolas/{escola}/dados"
    response = httpx.get(url=url, timeout=10)
    response.raise_for_status()
    return response.json()

@task
def task_b(json_recuperado):
    df = pd.DataFrame(json_recuperado)
    df_filtrado = df[df["variable_x"] > 10]
    return df_filtrado

@task
def task_c(df, escola):
    output_dir = "data/escolas"
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"escola_{escola}.parquet")
    df.to_parquet(output_path)

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    for escola in lista_escola:
        # One tag per school, e.g. "Escola-101"
        with tags(f"Escola-{escola}"):
            try:
                # Replace the placeholder with the real deployment name;
                # timeout=0 returns without waiting for the run to finish.
                run_deployment(
                    name="flow-A/<deployment-name>",
                    parameters={"escola": escola},
                    timeout=0,
                )
            except Exception as e:
                print(e)
                raise

# Deployment creation is assumed to be handled outside this script
```
In this example, each invocation of `flow_A` is tagged with `Escola-{escola}`, allowing you to track and filter the flow runs by school in the Prefect UI or logs.
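To make the filtering idea concrete, here is a small plain-Python sketch of selecting runs by tag. The run records and the `runs_with_tag` helper are hypothetical illustrations, not Prefect objects or API calls.

```python
def runs_with_tag(runs, tag):
    """Return the names of the runs whose tag list contains `tag`."""
    return [run["name"] for run in runs if tag in run["tags"]]


# Hypothetical run records, one tag per school as in the example above
runs = [
    {"name": "run-1", "tags": ["Escola-101"]},
    {"name": "run-2", "tags": ["Escola-202"]},
    {"name": "run-3", "tags": ["Escola-101"]},
]

selected = runs_with_tag(runs, "Escola-101")
```

A per-school tag narrows the view to one school, while a shared tag such as `Escola` would select every run in the batch.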
For more detailed information about using tags in Prefect, you can refer to the example here.
Roger Sousa
01/30/2025, 5:17 PM
Marvin
01/30/2025, 5:17 PM
```bash
prefect concurrency-limit create <tag_name> <limit>
```
For example, to set a concurrency limit of 3 for task runs tagged with `Escola` (tag-based limits apply to task runs, not flow runs), use:
```bash
prefect concurrency-limit create Escola 3
```
3. Using Prefect API: If you prefer using the Python API, you can use the `PrefectClient` to set concurrency limits programmatically.
Here's a brief example of using tags and setting concurrency limits:
```python
from prefect import flow, task, tags
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Your task code here
    pass

@task
def task_b(json_recuperado):
    # Your task code here
    pass

@task
def task_c(df, escola):
    # Your task code here
    pass

@flow
def flow_A(escola):
    # Your flow code here
    pass

@flow
def flow_B(lista_escola):
    for escola in lista_escola:
        with tags("Escola"):
            try:
                # Replace the placeholder with the real deployment name;
                # timeout=0 returns without waiting for the run to finish.
                run_deployment(
                    name="flow-A/<deployment-name>",
                    parameters={"escola": escola},
                    timeout=0,
                )
            except Exception as e:
                print(e)
                raise

# Ensure to set the concurrency limit via CLI or API beforehand
```
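What a limit of 3 means in practice can be shown with a plain `asyncio` analogy. This is not how Prefect enforces limits; `asyncio.Semaphore` is swapped in purely to illustrate the behaviour, and `run_limited` and `process` are hypothetical names.

```python
import asyncio


async def run_limited(lista_escola, limit=3):
    """Process every item, never letting more than `limit` run at once."""
    semaphore = asyncio.Semaphore(limit)
    active = 0  # how many items are being processed right now
    peak = 0    # highest value `active` ever reached

    async def process(escola):
        nonlocal active, peak
        async with semaphore:  # waits here while `limit` items are in flight
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for the real work
            active -= 1
        return escola

    results = await asyncio.gather(*(process(e) for e in lista_escola))
    return results, peak


results, peak = asyncio.run(run_limited(range(10), limit=3))
```

All ten items complete, but the recorded peak never exceeds the limit, which is the guarantee a concurrency limit is meant to provide.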
For more detailed information on configuring concurrency limits, you can refer to the Prefect 3.x documentation on concurrency limits.
Roger Sousa
01/30/2025, 7:02 PM
Marvin
01/30/2025, 7:02 PM
`flow_A` and `flow_B`, you can use Prefect's deployment capabilities. Here's how you can set it up:
1. Define Flows: First, define your flows `flow_A` and `flow_B`.
2. Create Deployments: Use Prefect's deployment functionality to deploy these flows.
3. Run Deployments in Parallel: Utilize `run_deployment` to execute the flows in parallel.
Here's a complete example:
```python
from prefect import flow, task, tags
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Sample task logic
    pass

@task
def task_b(json_recuperado):
    # Sample task logic
    pass

@task
def task_c(df, escola):
    # Sample task logic
    pass

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        with tags("Escola"):
            # run_deployment returns a FlowRun object, not a deployment id;
            # timeout=0 returns without waiting for the run to finish.
            flow_run = run_deployment(
                name="flow-A/<deployment-name>",  # replace the placeholder
                parameters={"escola": escola},
                timeout=0,
            )
            flow_run_ids.append(flow_run.id)
    return flow_run_ids

if __name__ == "__main__":
    # The flow_A deployment must already exist: run_deployment only triggers
    # runs of an existing deployment, it does not create one.
    # Run flow_B to execute flow_A in parallel
    flow_B(list(range(200)))
```
Steps to Deploy:
- Deploy `flow_A` and `flow_B`: You need to create deployments for these flows using Prefect's CLI or API. This involves specifying the flow's entry point, parameters, and any other configurations such as schedules or tags.
- Parallel Execution: By using `run_deployment` with `timeout=0`, you initiate the execution of `flow_A` for each item in `lista_escola` without waiting for the previous run to finish, so the runs proceed in parallel.
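One detail worth checking when passing parameters to a deployment is that they are sent through the API, so it is safest to keep them JSON-friendly. The sketch below assumes that requirement; `build_parameters` and `is_json_serializable` are hypothetical helpers, and whether Prefect's client would coerce a `range` on its own is not something this thread confirms.

```python
import json


def build_parameters(escolas):
    """Convert any iterable of school ids into a JSON-friendly parameters dict."""
    return {"lista_escola": list(escolas)}


def is_json_serializable(value):
    """Return True when the standard json module can encode `value`."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True
```

With these helpers, `build_parameters(range(200))` yields a dict holding a plain list, whereas a dict holding the `range` object itself is rejected by the standard JSON encoder.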
For more information on creating and managing deployments, you can explore the Prefect documentation on deployments.
Roger Sousa
01/30/2025, 7:11 PM
Marvin
01/30/2025, 7:11 PM
`flow_A.serve(name="flow-escolas")` to deploy your flow, you can still manage parallel execution and concurrency limits. Here's how you can structure your code:
1. Deploy Using `serve`: Use the `serve` method to deploy `flow_A`.
2. Run Deployments in Parallel: Use `run_deployment` to execute the deployed flow in parallel.
Here's an example:
```python
from prefect import flow, task, tags
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Sample task logic
    pass

@task
def task_b(json_recuperado):
    # Sample task logic
    pass

@task
def task_c(df, escola):
    # Sample task logic
    pass

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        with tags("Escola"):
            # "<flow-name>/<deployment-name>"; timeout=0 skips waiting
            flow_run = run_deployment(
                name="flow-A/flow-escolas",
                parameters={"escola": escola},
                timeout=0,
            )
            flow_run_ids.append(flow_run.id)
    return flow_run_ids

if __name__ == "__main__":
    # Deploy the flow using serve. serve() blocks until the process is stopped,
    # so flow_B has to be started from a separate process while this one runs:
    #     flow_B(list(range(200)))
    flow_A.serve(name="flow-escolas")
```
Important Notes:
- Serve Method: The `serve` method creates the deployment and then keeps running, polling for scheduled runs and executing them locally. It must stay running for runs triggered through `run_deployment` to be picked up.
- Concurrency Limits: If you want to limit the concurrency to 3, you will need to set this up using Prefect's CLI or API to enforce concurrency limits at the deployment or work pool level. `serve` also accepts a `limit` argument that caps how many runs the serving process executes at once.
For detailed instructions on setting concurrency limits and using the `serve` method, you can refer to the Prefect 3.x documentation.
Roger Sousa
01/30/2025, 7:39 PM
Marvin
01/30/2025, 7:40 PM
`flow_A` and `flow_B` using `serve`, and specifically deploy `flow_B` with the name "flows-todas-escolas", you can follow this setup:
1. Deploy `flow_A`: Use the `serve` method to deploy `flow_A` with a specific name.
2. Deploy `flow_B`: Similarly, deploy `flow_B` using the `serve` method with the desired name.
Here's how you can structure your code:
```python
from prefect import flow, task, tags
from prefect.deployments import run_deployment

@task
def task_a(escola):
    # Sample task logic
    pass

@task
def task_b(json_recuperado):
    # Sample task logic
    pass

@task
def task_c(df, escola):
    # Sample task logic
    pass

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        with tags("Escola"):
            # "<flow-name>/<deployment-name>"; timeout=0 skips waiting
            flow_run = run_deployment(
                name="flow-A/flow-escolas",
                parameters={"escola": escola},
                timeout=0,
            )
            flow_run_ids.append(flow_run.id)
    return flow_run_ids

if __name__ == "__main__":
    # Deploy flow_A using serve
    flow_A.serve(name="flow-escolas")
    # Deploy flow_B using serve
    # Caution: serve() blocks, so this second call is not reached while
    # flow_A is being served from this process.
    flow_B.serve(name="flows-todas-escolas")
    # Run flow_B to execute flow_A in parallel
    # This can be triggered externally or via deployment
```
Key Points:
- Serve Method: The `serve` method is used to deploy the flows, making them available for execution with the specified names.
- Deployment Names: Ensure that the names ("flow-escolas" and "flows-todas-escolas") are unique and correctly referenced when using `run_deployment`, which expects the form "<flow-name>/<deployment-name>".
- Concurrent Execution: To manage concurrency, you might need to configure your environment (e.g., Prefect Cloud or server) to handle the desired number of parallel runs.
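The name passed to `run_deployment` combines the flow name and the deployment name. The helper below is a hypothetical sketch of how that string is put together, assuming the flows keep their default names, which Prefect derives from the function name by replacing underscores with hyphens.

```python
def default_flow_name(function_name):
    """Mirror the default flow naming: underscores become hyphens."""
    return function_name.replace("_", "-")


def deployment_identifier(function_name, deployment_name):
    """Build the "<flow-name>/<deployment-name>" string for a deployment."""
    return f"{default_flow_name(function_name)}/{deployment_name}"


flow_a_identifier = deployment_identifier("flow_A", "flow-escolas")
flow_b_identifier = deployment_identifier("flow_B", "flows-todas-escolas")
```

If a flow is given an explicit `name` in its decorator, that name takes the place of the derived one.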
This setup allows you to deploy both flows and manage their execution as needed. If you need further configuration details or assistance, refer to the Prefect documentation.
Roger Sousa
01/30/2025, 7:45 PM
Marvin
01/30/2025, 7:45 PM
`serve` method is designed to create a deployment for a single flow and immediately begin listening for scheduled runs. When you use it in a script, it acts as a long-running process, which means it will block further execution once called. This is why only one deployment is created when a script contains two `serve` calls: the first call never returns, so the second is never reached.
To deploy both `flow_A` and `flow_B` using `serve`, you need to handle them separately, potentially in separate scripts or processes. Here's how you can manage this:
1. Deploy `flow_A`: Use a script to deploy `flow_A` and run it.
2. Deploy `flow_B`: Use another script or process to deploy `flow_B`.
Here's an example of how you might structure the deployment scripts:
### Script for flow_A
```python
from prefect import flow, task

@task
def task_a(escola):
    # Sample task logic
    pass

@task
def task_b(json_recuperado):
    # Sample task logic
    pass

@task
def task_c(df, escola):
    # Sample task logic
    pass

@flow
def flow_A(escola):
    response_json = task_a(escola)
    df = task_b(response_json)
    task_c(df, escola)

if __name__ == "__main__":
    # Deploy flow_A using serve (blocks until the process is stopped)
    flow_A.serve(name="flow-escolas")
```
### Script for flow_B
```python
from prefect import flow, tags
from prefect.deployments import run_deployment

@flow
def flow_B(lista_escola):
    flow_run_ids = []
    for escola in lista_escola:
        with tags("Escola"):
            # "<flow-name>/<deployment-name>"; timeout=0 skips waiting
            flow_run = run_deployment(
                name="flow-A/flow-escolas",
                parameters={"escola": escola},
                timeout=0,
            )
            flow_run_ids.append(flow_run.id)
    return flow_run_ids

if __name__ == "__main__":
    # Deploy flow_B using serve (blocks until the process is stopped)
    flow_B.serve(name="flows-todas-escolas")
```
Considerations:
- Separate Processes: Deploying each flow in separate scripts ensures that each `serve` call can run uninterrupted.
- Concurrency Management: If you need to manage concurrency across these flows, consider using Prefect's work pools or queues to handle the execution environment.
This approach allows both flows to be deployed and run as needed without blocking each other. For more details on deploying flows with `serve`, refer to the Prefect documentation.
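The reason a blocking call needs its own process or thread can be sketched with the standard library. This is a thread-based analogy rather than Prefect code: `blocking_serve` is a hypothetical stand-in for a call that loops until stopped, and `serve_side_by_side` is likewise an illustrative name.

```python
import threading
import time


def blocking_serve(name, stop_event, started):
    """Hypothetical stand-in for a blocking serve call: loops until told to stop."""
    started.append(name)
    while not stop_event.is_set():
        time.sleep(0.01)


def serve_side_by_side(names, run_for=0.05):
    """Run one blocking loop per name, each in its own thread, then stop them all."""
    stop_event = threading.Event()
    started = []
    threads = [
        threading.Thread(target=blocking_serve, args=(name, stop_event, started))
        for name in names
    ]
    for thread in threads:
        thread.start()
    time.sleep(run_for)  # let the loops run for a moment
    stop_event.set()     # ask every loop to exit
    for thread in threads:
        thread.join()
    return sorted(started)


served = serve_side_by_side(["flow-escolas", "flows-todas-escolas"])
```

Calling the two loops one after the other in a single thread would never get past the first; giving each its own thread or process lets both start. Prefect also provides a top-level `serve` function that takes several deployments built with `flow.to_deployment(...)` and serves them from one process, which may be worth checking in the documentation for your version.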