@Marvin I want to know how to write two flows...
# ask-marvin
l
@Marvin I want to know how to write two flows and a task that when run show a dependency arrow between them.
m
Here's a simple example of two flows with a dependency between them:
```python
from prefect import flow, task

@task
def simple_task():
    print("I am a simple task.")
    return "simple_task_result"

@flow
def dependent_flow(data):
    print(f"Received data: {data}")
    simple_task()

@flow
def upstream_flow():
    result = simple_task()
    dependent_flow(result)

if __name__ == "__main__":
    upstream_flow()
```
In this example:
1. `simple_task` is a simple task that prints a message and returns a result.
2. `dependent_flow` is a flow that takes a parameter `data` and invokes `simple_task`.
3. `upstream_flow` runs `simple_task`, captures its result, and then runs `dependent_flow`, passing the result from `simple_task`.

To visualize a dependency arrow between the flows, you could create a diagram or use Prefect's UI if the flows are registered in Prefect Cloud or Prefect Server. When the flows are run, Prefect will track the dependencies and show them accordingly. Please note that the actual visualization of the dependency arrow is part of Prefect's UI capabilities, and this code does not directly create a visual dependency arrow.
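As an aside, recent Prefect 2.x releases also ship a local graph helper; assuming a version that provides `Flow.visualize()` and that graphviz is installed, a sketch like this renders the call graph without any server:
```python
from prefect import flow, task

@task
def simple_task():
    return "simple_task_result"

@flow
def upstream_flow():
    # The task call is recorded as a node in the rendered graph
    simple_task()

if __name__ == "__main__":
    # Renders the flow's graph locally via graphviz instead of running it
    upstream_flow.visualize()
```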
l
@Marvin Can you explain to me why this doesn't show any dependency arrow?
```python
from prefect import task, flow
from datetime import datetime
import time
from electracommons.log_config import PrefectLogger

logger_prefect = PrefectLogger(__file__)

@task
def print_message(message="Hola"):
    logger = logger_prefect.obtener_logger_prefect()
    logger.info("Iniciando tarea...")
    print(message)
    time.sleep(2)
    return message

@flow
def mano_derecha(message="Hola"):
    time.sleep(2)
    message = print_message(message)
    return message

@flow
def mano_izquierda(message="Hola"):
    time.sleep(2)
    message = mano_derecha(message)
    return print_message(message)

@flow
def basic_flow():
    logger = logger_prefect.obtener_logger_prefect()
    logger.info("Iniciando flujo...")
    message = mano_derecha()
    mano_izquierda(message)
    return message

if __name__ == "__main__":
    basic_flow()
```
m
Your message was too long by 80 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
Can you explain to me why this doesn't show any dependency arrow?
```python
from prefect import task, flow
from datetime import datetime
import time
from electracommons.log_config import PrefectLogger

logger_prefect = PrefectLogger(__file__)

@task(name="Printear Mensaje")
def print_message(message="Hola"):
    logger = logger_prefect.obtener_logger_prefect()
    logger.info("Iniciando tarea...")
    print(message)
    time.sleep(2)
    return message

@flow(name="Mano Derecha")
def mano_derecha(message="Hola"):
    time.sleep(2)
    message = print_message(message)
    return message

@flow(name="Mano Izquierda")
def mano_izquierda(message="Hola"):
    time.sleep(2)
    message = mano_derecha(message)
    return print_message(message)

@flow(name="Flujo Básico", version="v1.0", flow_run_name=f"{datetime.strftime(datetime.now(), "%d/%m/%Y - %H:%M:%S")} | Flu
```
Your message was too long by 27 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
Can you explain to me why this doesn't show any dependency arrow?
```python
from prefect import task, flow
from datetime import datetime
import time
from electracommons.log_config import PrefectLogger

logger_prefect = PrefectLogger(__file__)

@task(name="Printear Mensaje")
def print_message(message="Hola"):
    logger = logger_prefect.obtener_logger_prefect()
    logger.info("Iniciando tarea...")
    print(message)
    time.sleep(2)
    return message

@flow(name="Mano Derecha")
def mano_derecha(message="Hola"):
    time.sleep(2)
    message = print_message(message)
    return message

@flow(name="Mano Izquierda")
def mano_izquierda(message="Hola"):
    time.sleep(2)
    message = mano_derecha(message)
    return print_message(message)

@flow(name="Flujo Básico")
def basic_flow():
    logger = logger_prefect.obtener_logger_prefect()
    logger.info("Iniciando flujo...")
    message = mano_derecha()
```
Analyzing the code you've provided, I notice there's an attempt to create a flow with dependencies, but the flow definition might be leading to confusion regarding how Prefect tracks task and flow dependencies. Here's an explanation:
1. `print_message` is a task that logs and prints a message, then sleeps for 2 seconds before returning the message.
2. `mano_derecha` and `mano_izquierda` are flows. Each one executes `print_message`, sleeps for 2 seconds, and returns a message.
3. `mano_izquierda` additionally calls `mano_derecha` within its flow, creating a nested flow call, which should create a dependency between `mano_izquierda` and `mano_derecha`.
4. `basic_flow` calls `mano_derecha`, then `mano_izquierda` with the message returned from `mano_derecha`.

From a theoretical perspective, there should be a dependency between `mano_izquierda` and `mano_derecha` because `mano_izquierda` calls `mano_derecha`. However, dependencies are tracked via Prefect's execution model, which relies on the Prefect backend for visualization. There are a few potential reasons why the dependency arrow might not be showing:
1. If you're running the flows locally and not using Prefect Cloud or Server, you won't see any visualization of the dependency arrows. The Prefect UI is necessary for visualizing such dependencies.
2. If you expect direct visualization in code without any external tool or UI, it won't happen. Prefect's dependencies are logical and managed through its execution engine; visualization happens in the UI.
3. If there's a mistake in how tasks and subflows are called, or if results are not correctly passed between them, this could disrupt the flow of data and dependency tracking.

If you are using Prefect Cloud or Server and still don't see the dependency arrows, ensure that:
- The backend is correctly set up to track flow runs.
- The flows and tasks are registered with the backend.
- There are no errors preventing the flows and tasks from executing correctly, which would also prevent the dependencies from being tracked.

To debug further, make sure you are correctly specifying the flow and task dependencies, and use Prefect's diagnostic tools or logs to check whether task return values are being passed appropriately. Remember, the visualization of dependencies happens once the flows are registered with and executed through the Prefect backend (Cloud or Server). Here is an adjusted version of your code (note that I removed the angle brackets and underscores because they might cause syntax issues):
```python
from prefect import task, flow
from datetime import datetime
import time

from electracommons.log_config import PrefectLogger

logger_prefect = PrefectLogger(__file__)

@task
def print_message(message="Hola"):
    logger = logger_prefect.obtener_logger_prefect()
    <http://logger.info|logger.info>("Iniciando tarea...")
    print(message)
    time.sleep(2)
    return message

@flow
def mano_derecha(message="Hola"):
    time.sleep(2)
    message = print_message(message)
    return message

@flow
def mano_izquierda(message="Hola"):
    time.sleep(2)
    message = mano_derecha(message)
    return print_message(message)

@flow
def basic_flow():
    logger = logger_prefect.obtener_logger_prefect()
    <http://logger.info|logger.info>("Iniciando flujo...")

    message = mano_derecha()
    mano_izquierda(message)
    return message

if __name__ == "__main__":
    basic_flow()
```
Remember, Prefect's tracking and visualization rely on the Prefect backend to display any dependencies. If you're running this code outside of Prefect Cloud or Server, you won't see a visual representation of the dependencies.
l
@Marvin I added/modified these functions:
```python
@task
def print_message(numero):
    logger = logger_prefect.obtener_logger_prefect()
    logger.info("Iniciando tarea...")
    time.sleep(1)
    message = "La respuesta es: " + str(numero)
    logger.info(message)
    return message

@task
def sumar_numeros(a: int, b: int) -> int:
    time.sleep(1)
    return a + b

@task
def duplicar_numero(a: int) -> int:
    time.sleep(1)
    return a * 2

@flow
def mano_derecha(numero):
    time.sleep(1)
    message = print_message(numero)
    return message

@flow
def mano_izquierda(numero):
    time.sleep(1)
    message = mano_derecha(numero)
    return message
```
Is it possible to generate those dependency arrows? If so how?
m
To generate a visualization of the dependency arrows in Prefect, tasks and flows must be orchestrated in such a way that Prefect can track their interdependencies. I would like to emphasize that the visualization aspect, including arrows showing task dependencies, is handled by the Prefect UI, which requires the registration of flows and tasks with Prefect Cloud or Server. Simply running the flows locally without such registration will not produce a visual dependency graph. Here is the updated script based on your added and modified functions (with angle brackets and underscores corrected for proper syntax):
```python
from prefect import task, flow
import time
from electracommons.log_config import PrefectLogger

logger_prefect = PrefectLogger(__file__)

@task
def print_message(numero):
    logger = logger_prefect.obtener_logger_prefect()
    <http://logger.info|logger.info>("Iniciando tarea...")
    time.sleep(1)

    message = f"La respuesta es: {numero}"
    logger.info(message)

    return message

@task
def sumar_numeros(a: int, b: int) -> int:
    time.sleep(1)
    return a + b

@task
def duplicar_numero(a: int) -> int:
    time.sleep(1)
    return a * 2

@flow
def mano_derecha(numero):
    time.sleep(1)
    sum_result = sumar_numeros(numero, 10)  # Added a task call to show dependency
    result = duplicar_numero(sum_result)     # Added another task call to show dependency
    message = print_message(result)
    return message

@flow
def mano_izquierda(numero):
    time.sleep(1)
    message = mano_derecha(numero)
    return message

@flow
def basic_flow(numero: int):
    logger = logger_prefect.obtener_logger_prefect()
    <http://logger.info|logger.info>("Iniciando flujo...")
    message = mano_izquierda(numero)
    return message

if __name__ == "__main__":
    basic_flow(5)
```
To visualize the dependencies as arrows between tasks and flows, you need to:
1. Register your flows with Prefect Cloud/Server (in Prefect 2.x this is done with `prefect deployment build` and `prefect deployment apply`, replacing the older `prefect register`).
2. Execute the flows either locally or via an agent that reports to Prefect Cloud/Server.
3. Check the Prefect UI for the visual representation of task and flow dependencies.

The UI will display the dependencies as arrows based on the execution graph that Prefect builds when the flow is run. Each task and subflow will be represented as a node in the graph, with arrows depicting the dependencies between them - for instance, which task's output is used as input to another task. Remember, simply running the code will not produce visual dependencies. For the visual representation, you must register and run the flows in an environment where Prefect Cloud/Server can track and visualize that execution graph.
l
@Marvin I know its in the Prefect UI. Also that code you sent me only shows the dependency in blocks. No arrows
m
The code example provided doesn't directly address the creation of visible dependencies in terms of arrows in the Prefect UI, as it is a basic flow setup. However, the Prefect UI represents dependencies through its visualization of the flow run, and this is determined by the structure of your flow and tasks when they are executed within the Prefect ecosystem. Dependencies in Prefect are determined by the order of task execution and the passing of data between tasks. When you have tasks that depend on the results of other tasks, Prefect automatically infers this relationship and will represent it with arrows in the UI when the flow is run through Prefect Cloud or Server. To create tasks with dependencies, you should chain tasks such that the output of one task is the input to another, like this:
```python
from prefect import task, flow

@task
def task1(x):
    # Some processing...
    return x + 1

@task
def task2(y):
    # Some processing...
    return y * 2

@flow
def my_flow():
    result1 = task1(1)  # Task 1 runs
    result2 = task2(result1)  # Task 2 runs after Task 1, depends on its result
    
    return result2

if __name__ == "__main__":
    my_flow()
```
In the Prefect UI, if `task2` uses the output of `task1`, there will be an arrow connecting `task1` to `task2` once the flow is executed and the UI updates according to the actual flow run. This is because `task1`'s output is directly being passed as input to `task2`, creating a clear dependency that Prefect's UI will visualize. If you are using the Prefect UI and not observing these dependency arrows after running the flows, the issue might be with registration, execution, or the specific version and configuration of Prefect you are using. It may be helpful to review Prefect's documentation on flow visualization or contact Prefect's support if you believe there is a bug or issue with the UI not displaying flow and task dependencies correctly. Remember to ensure:
1. Tasks are registered and executed in an environment connected to Prefect Cloud/Server.
2. Dependencies are explicitly defined by passing the output of tasks as inputs to others.
3. You take advantage of Prefect's logging to debug and understand how your flows and tasks are being executed.
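If data passing alone doesn't produce the arrow you expect, you can also declare the upstream relationship explicitly. A minimal sketch using `Task.submit()` and the `wait_for` argument (the task names here are illustrative, not from this thread):
```python
from prefect import flow, task

@task
def extract():
    return [1, 2, 3]

@task
def report():
    print("Runs only after extract has finished.")

@flow
def explicit_deps_flow():
    # submit() returns a PrefectFuture; passing it via wait_for declares
    # an upstream dependency even when no data is exchanged
    extract_future = extract.submit()
    report.submit(wait_for=[extract_future])

if __name__ == "__main__":
    explicit_deps_flow()
```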
l
@Marvin I want to teach you something for the record. When looking to generate dependency arrows for the tasks and flows you should send more complex objects, like strings. When sending small ints between functions that dependency will not be showed as dependency arrows.
m
Thank you for the clarification. In Prefect 2.0 (previously known as Orion), dependencies are indeed tracked more explicitly when dealing with more complex objects, as you mentioned. When tasks exchange simple, immutable data types such as integers, Prefect may optimize the flow's execution by "inlining" the values, which can result in dependencies not being visualized as arrows in the UI. To ensure that dependencies between tasks are visualized properly, one should consider passing more complex objects like strings, lists, dictionaries, custom objects, or even the result of a task itself rather than simple types like integers, whenever possible. Here’s an example where strings are passed between tasks that could then result in visible dependency arrows:
```python
from prefect import task, flow

@task
def generate_message(number):
    return f"The number is: {number}"

@task
def modify_message(message):
    return f"{message} And it's quite an interesting number, isn't it?"

@flow
def my_flow():
    initial_message = generate_message(42)  # Task 1 runs and returns a string
    final_message = modify_message(initial_message)  # Task 2 runs with a string input
    return final_message

if __name__ == "__main__":
    my_flow()
```
Upon executing this flow in an environment connected to Prefect Cloud or Server, and assuming tasks are not inlined, you would see a dependency arrow from `generate_message` to `modify_message` in the Prefect UI. The string returned by `generate_message` and then passed to `modify_message` helps Prefect's engine create a visual representation of the dependency between these tasks.
l
@Marvin I have an error using prefect and pyodbc:
```python
@flow
def conectar_sql_server(server, database, username):
    logger = logger_global.obtener_logger_prefect()
    credencial = kr.get_credential(server, username)
    if credencial:
        username = credencial.username
        password = credencial.password
    else:
        logger.warning(f"No se encontraron credenciales para {username} para {os.getlogin()}")
    try:
        key = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=" + server + ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password
        conn = pyodbc.connect(key)
        cursor = conn.cursor()
        logger.info("¡Conectado exitosamente a SQL Server!")
    except Exception as _:
        send_mail(mail_subject, mail_body)
        return (None, None)
    return (conn, cursor)
```
When it returns I get this error:
```
45153.744 | INFO | Flow 'conectar-sql-server' - View at http://127.0.0.2:5000/flow-runs/...
145207.834 | ERROR | Flow 'unknown' - Crash detected! Execution was interrupted by an unexpected exception: ProgrammingError: No results. Previous SQL was not a query.
```
m
Your message was too long by 244 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
I have an error using prefect and pyodbc:
```python
@flow
def conectar_sql_server(server, database, username):
    logger = logger_global.obtener_logger_prefect()
    # Obtenemos credenciales desde WCM
    logger.info(f"Obteniendo credenciales para {username}, {server}")
    credencial = kr.get_credential(server, username)
    if credencial:
        username = credencial.username
        password = credencial.password
        logger.info(f"Se obtuvieron las credenciales para {username}")
    else:
        logger.warning(f"No se encontraron credenciales para {username} en Win Credential Manager para el usuario de Windows {os.getlogin()}")
    try:
        # inicializar conexión
        key = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=" + server + ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password
        conn = pyodbc.connect(key)
        # guardar cursor
        cursor = conn.cursor()
        # logear
        logger.info("¡Conectado exitosamente a SQL Server!")
    except Exception as _:
        mail_subject = '¡Error en la conexion a la BD!'
        mail_body = f'<h1>Verificar la conexion con la BD en {os.path.basename(__file__)}</h1
```
Your message was too long by 8 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
I have an error using prefect and pyodbc:
```python
@flow
def conectar_sql_server(server, database, username):
    logger = logger_global.obtener_logger_prefect()
    credencial = kr.get_credential(server, username)
    if credencial:
        username = credencial.username
        password = credencial.password
        logger.info(f"Se obtuvieron para {username}")
    else:
        logger.warning(f"No se encontraron credenciales para {username} para {os.getlogin()}")
    try:
        key = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=" + server + ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password
        conn = pyodbc.connect(key)
        cursor = conn.cursor()
        logger.info("¡Conectado exitosamente a SQL Server!")
    except Exception as _:
        send_mail(mail_subject, mail_body)
        return (None, None)
    return (conn, cursor)
```
When it returns I get this error:
```
45153.744 | INFO | Flow 'conectar-sql-server' - View at http://127.0.0.2:5000/flow-runs/...
145207.834 | ERROR | Flow '<unknown>' - Crash detected! Execution was interrupted by an unexpected exception: ProgrammingError: No results.
```
The error message you're encountering, `ProgrammingError: No results. Previous SQL was not a query`, indicates that there is an issue with the SQL command sent through `pyodbc`. This type of error is often raised when the SQL command executed does not return any results because it is not a SELECT statement or a similar query operation that retrieves data. Here are some steps you can take to troubleshoot this issue:
1. Ensure that any SQL command executed with `cursor.execute()` that is expected to return data is indeed a correct SELECT query or another command that produces a result set.
2. Check if your cursor or connection is timing out or being closed prematurely, which might result in a `ProgrammingError` if you try to use it after the closure.
3. Validate that any DML (Data Manipulation Language) commands such as INSERT, UPDATE, DELETE, etc., are followed by a `conn.commit()` to ensure they're committed to the database if they precede your SELECT or data retrieval command.
4. Add more detailed error handling to catch and log the exact SQL command causing the error for easier debugging.

Unfortunately, the Prefect documentation search didn't return any specific guidance on handling `pyodbc` connection errors or `ProgrammingError` in flows, and the GitHub issues search revealed no relevant issues. The links provided from the general Prefect information do not address this specific error either. For handling these types of errors in Prefect, you might want to use specific error handling patterns like `try/except` blocks within your flow and ensure that any failure can be captured and managed according to your needs. Make sure to include logging in the exception handling to give you more insight into what went wrong. Since the documents aren't presenting a solution, you may need to explore the solution based on standard `pyodbc` error handling practices and make sure the SQL commands executed are correct and handled properly within the flow.
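As a sketch of points 1 and 3 above in plain `pyodbc` (the connection string, table, and column names are hypothetical):
```python
import pyodbc

conn_str = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=my-server;DATABASE=my-db;UID=user;PWD=secret"  # hypothetical credentials
)
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

# A DML statement produces no result set; trying to fetch from it raises
# "No results. Previous SQL was not a query."
cursor.execute("UPDATE ventas SET procesado = 1 WHERE procesado = 0")
conn.commit()  # commit DML before moving on

# Only statements that produce a result set can be fetched from
cursor.execute("SELECT TOP 5 * FROM ventas")
rows = cursor.fetchall()
print(rows)

cursor.close()
conn.close()
```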
l
@Marvin when using @task it doesn't fail. But it does with @flow.
m
When a function decorated with `@flow` in Prefect raises an error, it typically indicates that the flow's execution is encountering issues that don't occur at the task level. Since you've mentioned that using `@task` doesn't cause a failure but using `@flow` does, this suggests that the error is related to how the flow is managing resources or state, such as database connections or cursors. Here are a couple of points to consider:
1. Context Switching and Serialization: Flows may involve context switching and potentially run in different environments or under different conditions than tasks. Prefect might attempt to serialize state, which, in the case of database connections or cursors, is not possible because they are not serializable objects. When Prefect executes a flow, it may be occurring in a context where it expects objects can be passed around and persist across different states, which isn't true for database connections.
2. Error Handling: It might be that the error handling within the flow isn't capturing exceptions as you expect. It's possible that there's a disconnect in the exception handling between the flow and the database operation logic.
3. Database Transaction Management: If the flow initiates a transaction but fails to commit or roll back properly, subsequent operations could fail or raise exceptions due to the uncommitted state.

Here's what you can do to further troubleshoot:
- Add Detailed Logging: Within your flow, add detailed logging that can help you pinpoint exactly where the error is occurring. Ensure you log SQL statements being executed, as well as entry and exit points in your flow.
```python
# Before attempting to connect
<http://logger.info|logger.info>(f"Attempting to connect with the following credentials: Server={server}; Database={database}; User={username}")

# After the connection attempt
if conn:
    <http://logger.info|logger.info>("Connection established successfully")
else:
    logger.error("Failed to establish a connection")
```
- Transactional Scope: Make sure that each database transaction is self-contained within a task that includes any required `commit()` or `rollback()` operations.
- Error States: Check if the Prefect flow is halted due to state propagation. In Prefect, the state of a flow can depend on the state of the tasks within it. If a flow task fails and that state is not properly handled, it might cause the flow to enter an unexpected state.

If you are still unable to resolve the error, consider adjusting the logic to ensure that the database connection and cursor used by the flow are managed within tasks that handle their lifecycle appropriately. Avoid passing these non-serializable objects between tasks within the flow, which could be causing serialization issues. If you're able to share more detailed logs or the full stack trace of the error, that would also help in pinpointing the specific cause.
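A minimal sketch of that last suggestion - keeping the connection's whole lifecycle inside a single task so that no connection or cursor object ever crosses a task or flow boundary (the query and table name are illustrative):
```python
from prefect import flow, task
import pyodbc

@task
def run_query(conn_str: str, query: str):
    # Open, use, and close the connection entirely inside one task, so only
    # serializable data (plain tuples) is returned to the flow
    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        return [tuple(row) for row in cursor.fetchall()]
    finally:
        conn.close()

@flow
def reporting_flow():
    conn_str = (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        "SERVER=my-server;DATABASE=my-db;UID=user;PWD=secret"  # hypothetical
    )
    rows = run_query(conn_str, "SELECT TOP 10 * FROM ventas")  # hypothetical table
    print(f"Fetched {len(rows)} rows")

if __name__ == "__main__":
    reporting_flow()
```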
l
@Marvin I had upgraded prefect to the new 2.16.6 version. When I restarted the local prefect server I don't have any of my deployments and configurations. When checking the prefect home path, the db file is there
m
When you restart a local Prefect server, you should expect your deployments and configurations to persist across restarts as long as the underlying database (by default a SQLite database) remains intact. If you have upgraded Prefect to version 2.16.6 and find that your deployments and configurations are not present, there are a few things you could check:
1. Prefect Home Path: Make sure that the Prefect home path is correctly set and that Prefect is referencing the correct `prefect.db` file. The default location for the database is `~/.prefect/prefect.db`. If Prefect's home directory was changed, ensure that the Prefect configuration points to the correct path.
2. Database Migration: Verify if a database migration is required after the upgrade. Typically, Prefect handles migrations automatically, but it's worth checking the release notes and documentation for any specifics about database changes between versions.
3. Database Integrity: Ensure the database file has not been corrupted or modified improperly, which may cause Prefect to be unable to read your previous configurations.
4. Prefect Configuration: The configuration of Prefect, including storage for flows, may be stored in various Prefect settings. Check your Prefect configuration to make sure it aligns with where your data is stored.

If after checking these points you are still facing issues, you might want to:
- Back up `prefect.db` before attempting any fixes.
- Consult the Prefect release notes and documentation for version 2.16.6 to see if there are any specific instructions or notes about migrations or breaking changes (see the release notes).

Finally, if you've exhausted these options and are still unable to restore your deployments and configurations, it could be a good idea to reach out to Prefect support or the Prefect community for assistance, as they may be able to provide more contextual help or determine if there's an issue with the new release. For the long term, consider implementing backups for your Prefect server database, especially before performing upgrades, to ensure that you can restore to a previous state in case of issues. If you want to explore or reset your database, you can use Prefect CLI commands to interact with the database. However, be cautious: resetting the database will remove all existing data. Always back up your database before running any commands that could alter or delete data.
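To confirm which home directory and database the server is actually reading, you can inspect the effective settings from the CLI (output will vary with your setup):
```bash
# Show the active profile and any settings that differ from the defaults
prefect config view

# Include defaults to see PREFECT_HOME and the database connection URL
prefect config view --show-defaults
```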
l
@Marvin In Windows, how do I set the Prefect home path environment variable?
m
To set the `PREFECT_HOME` environment variable on Windows, which specifies the local Prefect directory for configuration files, profiles, and the location of the default Prefect SQLite database, follow these steps:
1. Search for "Environment Variables" in the Start Menu, or open the Control Panel and go to System and Security => System => Advanced system settings => Environment Variables.
2. Under the "System Variables" section (for all users) or the "User Variables" section (for the current user only), click "New..." to create a new environment variable.
3. Enter `PREFECT_HOME` as the variable name and set the desired path as the variable value. For example:
   Variable name: `PREFECT_HOME`
   Variable value: `C:\path\to\your\prefect\directory`
4. Click "OK" to create the variable, then "OK" again to close the Environment Variables window, and one last time to close the System Properties window.
5. Restart any open command prompts or applications to apply the change.

If you prefer to set the environment variable temporarily, just for the current session, you can also do it from the command line using the `set` command. Open a command prompt and run:
```cmd
set PREFECT_HOME=C:\path\to\your\prefect\directory
```
For this change to persist across sessions, you'll want to use the first method and set it through the Environment Variables dialog in the system settings. For more information on Prefect settings and environment variables, refer to the Prefect documentation on settings. Remember to use an absolute path and not a relative path when setting the environment variable. After setting the `PREFECT_HOME` variable, you may need to restart the Prefect server for the changes to take effect.
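If you prefer the command line for the persistent variant as well, `setx` writes the variable to the registry; note that it affects new shells, not the one you run it in:
```cmd
setx PREFECT_HOME "C:\path\to\your\prefect\directory"
```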
l
@Marvin I'd like to export a deployment configuration to a yaml file
m
To export a Prefect deployment configuration to a YAML file, such as `deployment.yaml`, you'll typically use Prefect's deployment CLI capabilities. The YAML file defines various aspects of your deployment, including the flow entrypoint, infrastructure, and parameters. Here's a step-by-step guide on how to create and export a deployment configuration to a YAML file:
1. Ensure Your Prefect CLI is Up-to-Date: Confirm that you have the latest version of Prefect installed.
```bash
pip install --upgrade prefect
```
2. Define Your Deployment Configuration in Code: Make sure your flow is defined in a way Prefect can recognize. For example:
```python
# my_flow.py
from prefect import flow

@flow(name="My Example Flow")
def my_example_flow(param1: str):
    print(f"Parameter: {param1}")

if __name__ == "__main__":
    my_example_flow()
```
3. Create the Deployment YAML File: Run the Prefect CLI command to create a YAML representation of your deployment configuration, using the `prefect deployment build` command.
```bash
prefect deployment build my_flow.py:my_example_flow --name "My Deployment" --output deployment.yaml
```
This command builds a deployment file `deployment.yaml` for `my_example_flow` in the `my_flow.py` module. Here is an example command with additional typical flags:
```bash
prefect deployment build my_flow.py:my_example_flow \
    --name "My Deployment" \
    --version 1 \
    --work-queue "default" \
    --output deployment.yaml
```
4. Content of deployment.yaml: The generated `deployment.yaml` will contain the configuration for your deployment. It includes entries for the flow's entrypoint, any parameters, schedule details, infrastructure settings, etc. An example of what this file might look like is:
```yaml
name: My Deployment
version: 1
entrypoint: my_flow.py:my_example_flow
schedule: null
parameters: {}
infrastructure:
  type: Process
  env: {}
tags: []
work_queue_name: default
```
5. Apply the Deployment: Once you have the `deployment.yaml` file configured and edited as needed, you can apply it to register the deployment with Prefect.
```bash
prefect deployment apply deployment.yaml
```
This process registers your deployment with Prefect so that it can be scheduled and run as specified in the YAML configuration. For further reading and detailed instructions, refer to the Prefect Deployment Documentation. This should cover the steps needed to export and manage your deployment configuration using YAML files with Prefect.
l
@Marvin It's already deployed using prefect deploy. I don't have that yaml. How to edit the working directory?
m
To edit the working directory of an existing deployment without the original `deployment.yaml` file, you will need to make the changes directly using the Prefect CLI and the deployment's configuration. Assuming you have already deployed a flow using `prefect deploy`, here's how you can update its working directory:
### Step-by-Step Guide to Edit the Deployment Configuration
1. Retrieve Deployment Configuration: First, retrieve the current configuration of the deployment. Prefect does not have a direct CLI command to export a deployment back to YAML after deployment, but you can list and inspect deployment settings using the Prefect API or CLI (see the `prefect deployment inspect` sketch at the end of this answer).
2. Edit Deployment Configuration: Once you know your deployment name, you can create or modify the deployment configuration using a new YAML file. If you didn't save the original YAML, you'll have to recreate it with the updated working directory.
3. Create a New YAML Configuration File: Here is an example of how your YAML deployment configuration might look, including the updated working directory:
```yaml
   name: my-deployment
   version: 1
   entrypoint: my_flow.py:my_flow_function
   schedule: null
   parameters: {}
   tags: []
   work_queue_name: default
   infrastructure:
     type: Process
     env: {}
     working_directory: "C:/new/working/directory"
```
4. Apply the Updated Configuration: Save this YAML configuration to a file, for example, `deployment.yaml`. Then apply the new configuration using the Prefect CLI:
```bash
prefect deployment apply deployment.yaml
```
### Example of Creating and Applying a New Deployment Configuration
First, create your YAML configuration file. Let's name it `new_deployment.yaml`.
```yaml
name: my-deployment
entrypoint: my_flow.py:my_example_flow
parameters: {}
schedule: null
tags: []
work_queue_name: default
infrastructure:
  type: Process
  working_directory: "C:/new/working/directory"
  env: {}
```
Next, apply the new deployment configuration:
```bash
prefect deployment apply new_deployment.yaml
```
Alternatively, you can have Prefect generate a fresh deployment YAML for you, edit the working directory in it, and apply it:
```bash
prefect deployment build my_flow.py:my_example_flow \
    --name "My Deployment" \
    --version 1 \
    --work-queue "default" \
    --output new_deployment.yaml

# Edit the new_deployment.yaml file to update the working directory

prefect deployment apply new_deployment.yaml
```
### Summary
By using the Prefect CLI to build and apply deployment configurations, you can easily update attributes such as the working directory. Always back up your configurations and carefully review the changes in your YAML file before applying them. For the most detailed and accurate steps, refer to the official Prefect Deployment Documentation, which covers deployment configurations and available options.
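As referenced in step 1 above: while there is no export-to-YAML command, the CLI can show you what the server has stored for an existing deployment, which helps when recreating the YAML by hand (the flow and deployment names below are placeholders):
```bash
# List deployments known to the server
prefect deployment ls

# Print the stored configuration for one deployment ("flow-name/deployment-name")
prefect deployment inspect "my-flow/my-deployment"
```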
l
@Marvin I want to execute a flow from fastapi and show it in the UI. How to do that?
m
To execute a Prefect flow from a FastAPI application and have it displayed in the Prefect UI, you need to integrate FastAPI with Prefect. The following steps outline this process:
### Step-by-Step Guide
1. Define Your Prefect Flow: Create a Python script with your Prefect flow.
```python
    # my_flow.py
    from prefect import flow, task

    @task
    def say_hello(name: str) -> str:
        return f"Hello, {name}!"

    @flow(name="My Example Flow")
    def my_example_flow(name: str):
        greeting = say_hello(name)
        print(greeting)

    if __name__ == "__main__":
        my_example_flow("World")
```
2. Set Up the FastAPI Application: Create a FastAPI application where the Prefect flow can be triggered.
```python
    # main.py
    from fastapi import FastAPI
    from my_flow import my_example_flow
    from prefect import flow

    app = FastAPI()

    @app.post("/run-flow/")
    async def run_flow(name: str):
        flow_run = my_example_flow(name)
        return {"message": f"Flow run started for {name}!"}

    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="127.0.0.1", port=8000)
```
3. Configure Prefect to Use the Server: Ensure Prefect is pointing to your Prefect Server or Prefect Cloud instance. Configure this in your environment or use code to set it.
```bash
    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
```
4. Run the FastAPI Application: Start the FastAPI application.
```bash
    uvicorn main:app --reload
```
5. Trigger the Flow: You can now trigger the flow by making a POST request to the `/run-flow/` endpoint.
```bash
    curl -X POST "<http://127.0.0.1:8000/run-flow/?name=John>"
```
### Code Breakdown
#### my_flow.py
- Defines a simple Prefect flow `my_example_flow` and a task `say_hello`.
#### main.py
- Defines a FastAPI application with an endpoint `/run-flow/` where the flow can be triggered via an HTTP POST request.
- When the endpoint is hit, `my_example_flow` is called and a new flow run is initiated.
### Displaying Flow in Prefect UI
- Ensure you have Prefect Server running locally or use Prefect Cloud.
- Your flow runs should now appear in the Prefect UI, as Prefect is configured to communicate with the server based on `PREFECT_API_URL`.
### References
For more detailed instructions and examples, the Prefect documentation and FastAPI documentation are good resources. By integrating FastAPI and Prefect as shown, you'll be able to trigger and manage flow runs from your FastAPI endpoints and visualize them in the Prefect UI.
l
@Marvin I already tried that and it executes but doesn't show in the UI
m
If the flow executes but does not show in the Prefect UI, it's likely due to how the flow run is being initiated or a configuration issue with Prefect. Here are a few additional steps and checks to ensure the flow run is tracked and displayed in the Prefect UI:
### Ensure Prefect is Configured to Use the Server
1. Set the Prefect API URL: Make sure your environment is correctly pointing to the Prefect Server or Prefect Cloud.
```bash
    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api  # Adjust the URL accordingly
```
2. Ensure Auth is Correct: If using Prefect Cloud, ensure your API key is correctly set.
```bash
    prefect config set PREFECT_API_KEY="your-prefect-api-key"
```
### Use Prefect's New Flow Run Management (`flow_run`)
3. Register and Create Flow Runs: It's often more reliable to use Prefect's new flow run management utilities to configure explicit tracking.
```bash
# Register the flow (only needed once)
prefect deployment build my_flow.py:my_example_flow -n "my-example-flow"
prefect deployment apply my_example_flow-deployment.yaml
```
4. Update FastAPI to Create Flow Runs from the Deployment: Ensure you are creating a flow run using Prefect's client, for example with `create_flow_run_from_deployment`:
```python
# main.py
from fastapi import FastAPI
from prefect import get_client
from prefect.server.schemas.states import StateType
import anyio

# Name of the deployment created above, as "<flow name>/<deployment name>"
DEPLOYMENT_NAME = "My Example Flow/my-example-flow"

app = FastAPI()

@app.post("/run-flow/")
async def run_flow(name: str):
    async with get_client() as client:
        # Look up the deployment, then create a flow run from it
        deployment = await client.read_deployment_by_name(DEPLOYMENT_NAME)
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            parameters={"name": name},
        )
        # Optionally poll until the run reaches a terminal state
        # (requires an agent/worker to pick the run up)
        while flow_run.state.type not in {StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED}:
            await anyio.sleep(1)
            flow_run = await client.read_flow_run(flow_run.id)
        return {"message": f"Flow run started for {name}!", "flow_run_id": str(flow_run.id)}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)
```
### Check Flow Run Visibility
- Prefect Server: Make sure the Prefect Server is running.
- Flow Runs: Ensure the flow runs are listed and being updated as new runs are initiated.
### Debugging Steps
1. Check Prefect Logs: Inspect logs to see if there are any messages indicating issues with the flow run or server connection.
2. API Connectivity: Ensure that the Prefect client within FastAPI can communicate with the Prefect API.
3. Network Configuration: Verify that there are no network issues obstructing communication between your FastAPI application and the Prefect server.
### Final Checks
- Make sure `prefect server start` has been run and the server is fully up.
- Validate that the flow and associated runs appear in the Prefect UI by visiting the appropriate dashboard URL, such as http://127.0.0.1:4200.
- Confirm that any API keys or secrets have been correctly configured and are accessible to both FastAPI and Prefect.

Following these steps should help resolve visibility issues with Prefect flow runs not appearing in the UI when initiated from FastAPI.
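One quick check is that the FastAPI process is talking to the same API the UI reads from; you can print the effective setting from inside that process (a sketch; the import path is Prefect 2.x):
```python
from prefect.settings import PREFECT_API_URL

# If this prints None or a different URL from the one the UI uses,
# runs created in this process will not appear in that UI
print(PREFECT_API_URL.value())
```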
l
@Marvin I want to create a deployment with start and end date as parameters. I want the start to have a default date and the end_date to have the execution date
m
To create a Prefect deployment where you can set a start date and an end date as parameters, with the start date having a default value and the end date being set to the execution date, you will need to configure these parameters in your flow and then create the deployment using the Prefect CLI. Here's a step-by-step guide to achieving this:
### Step 1: Define Your Flow
Define a flow that accepts `start_date` and `end_date` as parameters. The `start_date` will have a default value, and the `end_date` will use the execution date.
```python
# my_flow.py

from prefect import flow, task
from datetime import datetime, timedelta

@task
def process_dates(start_date, end_date):
    print(f"Start Date: {start_date}")
    print(f"End Date: {end_date}")

@flow(name="Date Parameters Flow")
def my_date_flow(start_date: str = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d"), end_date: str = None):
    if end_date is None:
        end_date = datetime.now().strftime("%Y-%m-%d")
    
    process_dates(start_date, end_date)

if __name__ == "__main__":
    my_date_flow()
```
### Step 2: Define and Build Deployment
Use the Prefect CLI to create and apply a deployment definition in YAML format.
1. Create the YAML Deployment Configuration:
```yaml
    # deployment.yaml

    name: date-parameters-deployment
    version: 1
    entrypoint: my_flow.py:my_date_flow
    schedule: null
    parameters:
      # Leave both null so the flow computes its run-time defaults;
      # Python expressions are not evaluated inside deployment YAML
      start_date: null
      end_date: null
    tags: []
    work_queue_name: default
    infra_overrides: {}
    infrastructure:
      type: process
      env: {}
    labels: {}
```
2. Build and Apply the Deployment: Run the following commands to build and apply the deployment from the `deployment.yaml`.
```bash
    prefect deployment build my_flow.py:my_date_flow -n "date-parameters-deployment" -o deployment.yaml
    prefect deployment apply deployment.yaml
```
### Step 3: Execution
To trigger the flow and see it in the Prefect UI, simply start running your deployment. Using the Prefect CLI:
```bash
prefect deployment run "Date Parameters Flow/date-parameters-deployment"
```
Using FastAPI: If you want to trigger this from a FastAPI endpoint and ensure the parameters are appropriately set:
```python
# main.py

from fastapi import FastAPI
from prefect import get_client
from prefect.server.schemas.states import StateType
import anyio

app = FastAPI()

@app.post("/run-flow/")
async def run_flow(start_date: str = None, end_date: str = None):
    async with get_client() as client:
        parameters = {
            "start_date": start_date,
            "end_date": end_date
        }
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id="date-parameters-deployment-id",
            parameters=parameters
        )
        flow_run_id = flow_run.id

        # Optionally wait for run to finish
        while flow_run.state.type not in {StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED}:
            await anyio.sleep(1)
            flow_run = await client.read_flow_run(flow_run_id)
        
        return {"message": "Flow run started!", "flow_run_id": flow_run_id, "state": flow_run.state.type}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)
```
Replace `"date-parameters-deployment-id"` with the actual deployment ID from the Prefect UI.
### Summary
By setting `start_date` and `end_date` as parameters in the flow definition, and configuring the deployment YAML to handle default values and overrides, you ensure that the flow can be run via the Prefect UI, CLI, or an integrated API endpoint while keeping track of such parameters. For more detailed information, you can refer to the Prefect documentation on deployment configurations and task parameters.
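If you only need to override the dates for a single run from the CLI, `prefect deployment run` also accepts `--param` flags (the values below are examples):
```bash
prefect deployment run "Date Parameters Flow/date-parameters-deployment" \
    --param start_date=2024-01-01 \
    --param end_date=2024-01-08
```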
l
@Marvin I want to create a script that monitors the last week's flow runs and sends an email with the results if any failed, plus links for getting more information
m
To create a script that monitors the last week's flow runs and sends an email with the results if any failed, you can use Prefect's API along with its email integration. Below is a step-by-step guide to accomplish this:
1. Set Up Email Notification: Configure the email credentials in Prefect.
2. Query the Prefect API for Flow Runs: Fetch flow run data from the last week and check for any failures.
3. Send an Email Notification on Failures: Use Prefect's email integration to send out a detailed report if there are any failures.
### Step-by-Step Guide
#### Step 1: Set Up Email Credentials
First, ensure you have the `prefect-email` collection's `EmailServerCredentials` block set up with your email credentials. You can create these credentials through Prefect's UI or use Prefect's CLI to create a block.
#### Step 2: Script to Monitor Flow Runs
The following script demonstrates how to query flow runs from the last week and send an email notification if any runs failed:
```python
from datetime import datetime, timedelta, timezone

from prefect import flow, task, get_client, get_run_logger
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterStartTime,
)
from prefect.server.schemas.states import StateType
from prefect_email import EmailServerCredentials, email_send_message

@task
async def get_failed_flow_runs():
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                ),
                start_time=FlowRunFilterStartTime(after_=one_week_ago),
            )
        )
    return flow_runs

@task
def send_email_report(failed_flow_runs):
    email_server_credentials = EmailServerCredentials.load("email-credentials-block")
    if not failed_flow_runs:
        return "No failed flow runs in the last week."

    subject = "Prefect: Failed Flow Runs Report"
    msg = "The following flow runs have failed in the past week:\n\n"

    for flow_run in failed_flow_runs:
        flow_run_url = f"http://127.0.0.1:4200/flow-runs/{flow_run.id}"  # Adjust URL to your Prefect UI
        msg += f"- Flow Run ID: {flow_run.id}\n"
        msg += f"  Flow Run Name: {flow_run.name}\n"
        msg += f"  State Message: {flow_run.state.message}\n"
        msg += f"  Details: {flow_run_url}\n\n"

    email_send_message(
        email_server_credentials=email_server_credentials,
        subject=subject,
        msg=msg,
        email_to="your-email@example.com",
    )

    return "Email sent."

@flow
def monitor_and_notify():
    failed_flow_runs = get_failed_flow_runs()
    email_status = send_email_report(failed_flow_runs)
    logger = get_run_logger()
    logger.info(email_status)

if __name__ == "__main__":
    monitor_and_notify()
```
### Explanation
1. `get_failed_flow_runs` task: Fetches flow runs from the last week that have failed, using Prefect's client and flow run filters.
2. `send_email_report` task: Loads email server credentials from a Prefect block, builds a message with details of the failed flow runs (including links to the UI for each flow run), and sends an email with the constructed report.
3. `monitor_and_notify` flow: Coordinates the execution of the `get_failed_flow_runs` and `send_email_report` tasks and logs the status of the email sending task.
#### Step 3: Schedule the Flow
You can schedule this flow to run periodically using Prefect's scheduling capabilities:
```bash
prefect deployment build monitor_and_notify.py:monitor_and_notify -n "Monitor and Notify"
prefect deployment apply monitor_and_notify-deployment.yaml

prefect agent start  # Start the agent to execute the scheduled flow
```
For more information on scheduling, see the Prefect Deployment Documentation. This approach ensures that your script monitors the flow runs for the past week, identifies any failures, and sends an email with detailed information on these failures, including links to the Prefect UI for further investigation.
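To attach the weekly schedule at build time, `prefect deployment build` also accepts a `--cron` flag (the Monday-morning cron below is just an example):
```bash
prefect deployment build monitor_and_notify.py:monitor_and_notify \
    -n "Monitor and Notify" \
    --cron "0 8 * * 1"  # every Monday at 08:00
prefect deployment apply monitor_and_notify-deployment.yaml
```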
l
@Marvin I need to call a deployment from the API to create a flow run. I only need it to execute now, no extra configuration
m
To create a flow run from a deployment using the Prefect API, you can use the `create_flow_run_from_deployment` method. This allows you to trigger a deployment programmatically and have it execute immediately. Here's the step-by-step process to achieve this using Prefect's Python client within a FastAPI endpoint:
1. Set Up Your Prefect API Client: Ensure your environment is set to communicate with the Prefect server.
2. Define the FastAPI Endpoint: Create an endpoint that calls the Prefect API to create a flow run from a deployment.
### Step-by-Step Implementation
#### Step 1: Set Up Your Environment
Ensure your Prefect client is correctly pointing to the Prefect API:
```bash
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api  # Adjust the URL as necessary.
```
You may also need to set your API key if using Prefect Cloud:
```bash
prefect config set PREFECT_API_KEY="your-prefect-api-key"
```
#### Step 2: Create a FastAPI Endpoint to Trigger the Deployment
Here's a FastAPI example that triggers a deployment and creates a flow run:
```python
# main.py

from fastapi import FastAPI
from prefect import get_client
from prefect.server.schemas.states import StateType
import anyio

app = FastAPI()

@app.post("/run-deployment/")
async def run_deployment(deployment_id: str):
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment_id
        )
        flow_run_id = flow_run.id

        # Optionally wait for run to finish; this can be removed if not needed.
        while flow_run.state.type not in {StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED}:
            await anyio.sleep(1)
            flow_run = await client.read_flow_run(flow_run_id)
        
        return {
            "message": "Flow run started!",
            "flow_run_id": flow_run_id,
            "state": flow_run.state.type
        }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)
```
### Explanation
1. FastAPI Endpoint: The `/run-deployment/` endpoint creates a flow run from the specified deployment ID.
2. Prefect Client: `get_client()` is used to get the Prefect client, and `create_flow_run_from_deployment()` initiates the flow run based on the deployment ID. This example optionally waits for the flow run to finish, which can be removed if not needed.
### Testing the Endpoint
To trigger the deployment and create a flow run, you can test with a POST request:
```bash
curl -X POST "<http://127.0.0.1:8000/run-deployment/>" -d '{"deployment_id": "deployment-id-here"}' -H "Content-Type: application/json"
```
Replace `deployment-id-here` with your actual deployment ID.
### Final Notes
This approach leverages the Prefect API to programmatically manage and trigger deployments, facilitating integration with other systems and workflows. Adjust the endpoint and logic as needed to fit your specific use case and deployment architecture.
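If you'd rather not go through FastAPI at all, the same action is available on Prefect's REST API directly; a sketch against a local Prefect 2 server (the UUID is a placeholder):
```bash
curl -X POST "http://127.0.0.1:4200/api/deployments/<deployment-uuid>/create_flow_run" \
    -H "Content-Type: application/json" \
    -d '{}'
```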
l
@Marvin prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available. Here:
```python
@flow
async def reporte_semanal():
    failed_flow_runs = await get_failed_flow_runs()
    email_status = send_email_report(failed_flow_runs)
    logger = logger_global.obtener_logger_prefect()
    logger.info(email_status)

if __name__ == "__main__":
    asyncio.run(reporte_semanal())
```
m
The `UnfinishedRun` exception occurs because you're attempting to retrieve the result of a run that is still in the `PENDING` state, meaning it hasn't started executing yet. This is especially common with asynchronous flows and tasks. To address this, you should wait for the task to complete before trying to access its result, and make sure that everything is properly awaited, considering that your workflow involves async operations. Here's how to properly structure your async flow and ensure all tasks are handled correctly:
1. Ensure all async tasks and methods are awaited properly.
2. Avoid context reset issues by managing tasks and flow runs correctly.

Here's the revised code:
```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow, task, get_client, get_run_logger
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterStartTime,
)
from prefect.server.schemas.states import StateType
from prefect_email import EmailServerCredentials, email_send_message

@task
async def get_failed_flow_runs():
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                ),
                start_time=FlowRunFilterStartTime(after_=one_week_ago),
            )
        )
    return flow_runs

@task
def send_email_report(failed_flow_runs):
    email_server_credentials = EmailServerCredentials.load("email-credentials-block")
    if not failed_flow_runs:
        return "No failed flow runs in the last week."

    subject = "Prefect: Failed Flow Runs Report"
    msg = "The following flow runs have failed in the past week:\n\n"
    
    for flow_run in failed_flow_runs:
        flow_run_url = f"http://127.0.0.1:4200/flow-runs/{flow_run.id}"  # Adjust URL to your Prefect UI
        msg += f"- Flow Run ID: {flow_run.id}\n"
        msg += f"  Flow Run Name: {flow_run.name}\n"
        msg += f"  State Message: {flow_run.state.message}\n"
        msg += f"  Details: {flow_run_url}\n\n"

    email_send_message(
        email_server_credentials=email_server_credentials,
        subject=subject,
        msg=msg,
        email_to="<mailto:your-email@example.com|your-email@example.com>"
    )

    return "Email sent."

@flow
async def reporte_semanal():
    failed_flow_runs = await get_failed_flow_runs()
    email_status = send_email_report(failed_flow_runs)  # sync task: call directly, no await
    logger = get_run_logger()
    logger.info(email_status)

if __name__ == "__main__":
    asyncio.run(reporte_semanal())
```
### Adjustments
1. Async Task Execution: The async task `get_failed_flow_runs` is awaited inside the flow so it completes before its result is used.
2. Logger Usage: You can use Prefect's built-in logger for logging within flows and tasks.
3. Sync Email Task: `send_email_report` is a synchronous task, so it is called directly from the async flow without `await`.
### Explanation
- get_failed_flow_runs: Asynchronously fetches failed flow runs from the Prefect client.
- send_email_report: Synchronously builds and sends the email (note: this depends on your email library; if it's async, update accordingly).
- reporte_semanal: Coordinates the tasks, ensuring that async results are awaited appropriately.

By ensuring that each async operation is properly awaited, you prevent the `UnfinishedRun` exception and ensure smooth execution of your monitoring script.
l
@Marvin
```python
@task
def get_failed_flow_runs():
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)

    async def _get_flow_runs():
        async with get_client() as client:
            try:
                flow_runs = await client.read_flow_runs(
                    flow_run_filter=FlowRunFilter(
                        state=FlowRunFilterState(
                            type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                        ),
                        start_time=FlowRunFilterStartTime(after_=one_week_ago)
                    )
                )
                return flow_runs
            except Exception as e:
                logger_global.obtener_logger_prefect().error(f"Error fetching flow runs: {e}")
                return []

    return asyncio.run(_get_flow_runs())

@task
def send_email_report(failed_flow_runs):
    if not failed_flow_runs:
        return "No failed flow runs in the last week."
    subject = "Prefect: Failed Flow Runs Report"
    for flow_run in failed_flow_runs:
        msg += f"- Flow Run ID: {flow_run.id}\n"
        msg += f"  Flow Name: {flow_run.flow_name}\n"
        msg += f"  State Message: {flow_run.state.message}\n"
    try:
        pass
    except Exception as e:
        logger_global.obtener_logger_prefect().error(f"Error sending email: {e}")
    return "Email sent."

@flow
def reporte_semanal():
    try:
        failed_flow_runs = get_failed_flow_runs()
        email_status = send_email_report(failed_flow_runs)
    except Exception as e:
        logger_global.obtener_logger_prefect().error(f"Error in reporte_semanal: {e}")

if __name__ == "__main__":
    reporte_semanal()
```
m
Your message was too long by 327 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
```python
import os
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import flow, task, get_client
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterStartTime,
)
from prefect.server.schemas.states import StateType
from electracommons.log_config import PrefectLogger
from dev.commons.email_tools.send_email import send_email

logger_global = PrefectLogger(__file__)

MAIL_TO_REPORTE = "lucas.depetris@consulters.com.ar"

@task
def get_failed_flow_runs():
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)

    async def _get_flow_runs():
        async with get_client() as client:
            try:
                flow_runs = await client.read_flow_runs(
                    flow_run_filter=FlowRunFilter(
                        state=FlowRunFilterState(
                            type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                        ),
                        start_time=FlowRunFilterStartTime(after_=one_week_ago)
                    )
                )
                return flow_runs
            except Exception as e:
                logger_global.obtener_logger_prefect().error(f"Error fetching flow runs: {e}")
                return []

    return asyncio.run(_get_flow_runs())

@task
def send_email_report(failed_flow_runs
```
Your message was too long by 130 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
```python
logger_global = PrefectLogger(__file__)

@task
def get_failed_flow_runs():
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)

    async def _get_flow_runs():
        async with get_client() as client:
            try:
                flow_runs = await client.read_flow_runs(
                    flow_run_filter=FlowRunFilter(
                        state=FlowRunFilterState(
                            type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                        ),
                        start_time=FlowRunFilterStartTime(after_=one_week_ago)
                    )
                )
                return flow_runs
            except Exception as e:
                logger_global.obtener_logger_prefect().error(f"Error fetching flow runs: {e}")
                return []

    return asyncio.run(_get_flow_runs())

@task
def send_email_report(failed_flow_runs):
    if not failed_flow_runs:
        return "No failed flow runs in the last week."
    subject = "Prefect: Failed Flow Runs Report"
    for flow_run in failed_flow_runs:
        flow_run_url = f"http://127.0.0.1:4200/flow-runs/{flow_run.id}"
        msg += f"- Flow Run ID: {flow_run.id}\n"
        msg += f"  Flow Name: {flow_run.flow_name}\n"
        msg += f"  State Message: {flow_run.state.message}\n"
```
Your message was too long by 72 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
```python
@task
def get_failed_flow_runs():
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)

    async def _get_flow_runs():
        async with get_client() as client:
            try:
                flow_runs = await client.read_flow_runs(
                    flow_run_filter=FlowRunFilter(
                        state=FlowRunFilterState(
                            type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                        ),
                        start_time=FlowRunFilterStartTime(after_=one_week_ago)
                    )
                )
                return flow_runs
            except Exception as e:
                logger_global.obtener_logger_prefect().error(f"Error fetching flow runs: {e}")
                return []

    return asyncio.run(_get_flow_runs())

@task
def send_email_report(failed_flow_runs):
    if not failed_flow_runs:
        return "No failed flow runs in the last week."
    subject = "Prefect: Failed Flow Runs Report"
    for flow_run in failed_flow_runs:
        msg += f"- Flow Run ID: {flow_run.id}\n"
        msg += f"  Flow Name: {flow_run.flow_name}\n"
        msg += f"  State Message: {flow_run.state.message}\n"
    try:
        pass
    except Exception as e:
        logger_global.obtener_logger_prefect().error(f"Error sending email: {e}")
    return "Email sent."

@flow
```
l
@Marvin The problem arises when @task is on send_email_report. Without it, it works correctly
m
It appears that the issue arises when using `@task` on `send_email_report`, which may involve complexities related to how Prefect handles state and execution for tasks within an async flow. To ensure smooth execution, particularly in an async flow context, we should manage tasks and their invocations carefully. Since the direct issue arises by decorating `send_email_report` with `@task`, we can simplify the process by keeping `send_email_report` as a regular function and handling async operations prudently. Here's how you can structure it:
Copy code
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.client import get_client
from prefect_email import EmailServerCredentials, email_send_message  # prefect-email collection
from datetime import datetime, timedelta

@task
async def get_failed_flow_runs():
    async with get_client() as client:
        one_week_ago = datetime.utcnow() - timedelta(days=7)
        flow_runs = await client.read_flow_runs(
            # Adapt the filtering to the proper client's options
            flow_run_filter={
                "start_time": {"after_": one_week_ago.isoformat()},
                "state": {"type": {"any_": ["FAILED"]}}
            }
        )
    return flow_runs

def send_email_report(failed_flow_runs):
    email_server_credentials = EmailServerCredentials.load("email-credentials-block")
    if not failed_flow_runs:
        return "No failed flow runs in the last week."

    subject = "Prefect: Failed Flow Runs Report"
    msg = "The following flow runs have failed in the past week:\n\n"
    
    for flow_run in failed_flow_runs:
        flow_run_url = f"http://127.0.0.1:4200/flow-runs/{flow_run.id}"  # Adjust URL to your Prefect UI
        msg += f"- Flow Run ID: {flow_run.id}\n"
        msg += f"  Flow Name: {flow_run.flow_name}\n"
        msg += f"  State Message: {flow_run.state.message}\n"
        msg += f"  Details: {flow_run_url}\n\n"

    email_send_message(
        email_server_credentials=email_server_credentials,
        subject=subject,
        msg=msg,
        email_to="your-email@example.com"
    )

    return "Email sent."

@flow
async def reporte_semanal():
    failed_flow_runs = await get_failed_flow_runs()  # awaiting the async task returns its result directly
    email_status = send_email_report(failed_flow_runs)
    logger = get_run_logger()
    logger.info(email_status)

if __name__ == "__main__":
    asyncio.run(reporte_semanal())
### Breakdown
1. Async Task Execution: `get_failed_flow_runs` asynchronously fetches failed flow runs.
2. Regular Function for Sending Emails: `send_email_report` is a normal function, not a task, ensuring simpler execution since it doesn't require async management.
3. Awaiting Task Results: awaiting `get_failed_flow_runs` inside the async flow yields its return value directly, which is then passed to `send_email_report`.
### Explanation
- Prefect Client: `get_client()` is used to access flow run data.
- Email Notification: The `send_email_report` function constructs an email and sends it; this function does not need to be an async task.
- Logging: Using Prefect's `get_run_logger()` to log the status of the email sending operation.
By handling `send_email_report` as a regular function and awaiting async tasks properly, this structure avoids the issues seen when decorating `send_email_report` with `@task`. This approach should ensure the flow runs smoothly and provides the intended email notification functionality.
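If you do want to keep the `@task` decorator on `send_email_report`, one pattern that may work is to make the whole chain async and await each call explicitly — a minimal sketch with a stubbed email step (the client query and sending logic are omitted and stand-ins are used):
```python
import asyncio
from prefect import flow, task

@task
async def get_failed_flow_runs():
    # stand-in for the real client query shown above
    return []

@task
async def send_email_report(failed_flow_runs):
    if not failed_flow_runs:
        return "No failed flow runs in the last week."
    # ... build and send the email here ...
    return "Email sent."

@flow
async def reporte_semanal():
    failed_flow_runs = await get_failed_flow_runs()
    email_status = await send_email_report(failed_flow_runs)  # awaited async task returns its value
    print(email_status)

if __name__ == "__main__":
    asyncio.run(reporte_semanal())
```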
l
@Marvin could you create an English minimal example of the error?
m
Sure, let's create a minimal example to illustrate the error. This example will include a simple flow that fetches failed flow runs and sends an email report. We will highlight the issue when the `send_email_report` function is decorated with `@task`.
### Error Example
Copy code
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType, FlowRunFilterStartTime
from prefect.orion.schemas.states import StateType
from prefect_email import EmailServerCredentials, email_send_message
from datetime import datetime, timedelta

@task
async def get_failed_flow_runs():
    async with get_client() as client:
        one_week_ago = datetime.utcnow() - timedelta(days=7)
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(after_=one_week_ago),
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED])
                ),
            )
        )
    return flow_runs  # read_flow_runs returns a list of FlowRun objects

@task
def send_email_report(failed_flow_runs):
    email_server_credentials = EmailServerCredentials.load("email-credentials-block")
    if not failed_flow_runs:
        return "No failed flow runs in the last week."

    subject = "Prefect: Failed Flow Runs Report"
    msg = "The following flow runs have failed in the past week:\n\n"
    
    for flow_run in failed_flow_runs:
        flow_run_url = f"http://127.0.0.1:4200/flow-runs/{flow_run.id}"  # Adjust URL to your Prefect UI
        msg += f"- Flow Run ID: {flow_run.id}\n"
        msg += f"  Flow Name: {flow_run.name}\n"
        msg += f"  State Message: {flow_run.state.message}\n"
        msg += f"  Details: {flow_run_url}\n\n"

    email_send_message(
        email_server_credentials=email_server_credentials,
        subject=subject,
        msg=msg,
        email_to="your-email@example.com"
    )

    return "Email sent."

@flow
async def weekly_report():
    failed_flow_runs = await get_failed_flow_runs()
    email_status = await send_email_report(failed_flow_runs)
    logger = get_run_logger()
    logger.info(email_status)

if __name__ == "__main__":
    asyncio.run(weekly_report())
### Explanation
1. get_failed_flow_runs Task: An async task that fetches failed flow runs from the last week.
2. send_email_report Task: A task for sending email reports (causes the issue).
3. weekly_report Flow: Coordinates the execution of tasks and attempts to log the email status.
### Problem
Running this script will likely produce an `UnfinishedRun` exception because `send_email_report` is incorrectly awaited as a Prefect task when it shouldn't be.
### Fix Example
Here's the minimal fixed version without the error:
```python
import asyncio
from datetime import datetime, timedelta

from prefect import flow, task, get_run_logger
from prefect.client import get_client
from prefect.orion.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterStartTime,
)
from prefect.orion.schemas.states import StateType
from prefect_email import EmailServerCredentials, email_send_message

@task
async def get_failed_flow_runs():
    async with get_client() as client:
        one_week_ago = datetime.utcnow() - timedelta(days=7)
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(after_=one_week_ago),
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED])
                ),
            )
        )
    return flow_runs  # read_flow_runs returns a list of FlowRun objects

def send_email_report(failed_flow_runs):
    email_server_credentials = EmailServerCredentials.load("email-credentials-block")
    if not failed_flow_runs:
        return "No failed flow runs in the last week."

    subject = "Prefect: Failed Flow Runs Report"
    msg = "The following flow runs have failed in the past week:\n\n"
    for flow_run in failed_flow_runs:
        flow_run_url = f"http://127.0.0.1:4200/flow-runs/{flow_run.id}"  # Adjust URL to your Prefect UI
        msg += f"- Flow Run ID: {flow_run.id}\n"
        msg += f"  Flow Name: {flow_run.name}\n"
        msg += f"  State Message: {flow_run.state.message}\n"
        msg += f"  Details: {flow_run_url}\n\n"

    email_send_message(
        email_server_credentials=email_server_credentials,
        subject=subject,
        msg=msg,
        email_to="your-email@example.com"
    )
    return "Email sent."

@flow
async def weekly_report():
    failed_flow_runs = await get_failed_flow_runs()  # awaiting the async task yields its result
    email_status = send_email_report(failed_flow_runs)
    logger = get_run_logger()
    logger.info(email_status)

if __name__ == "__main__":
    asyncio.run(weekly_report())
```
### Explanation of the Fix
- `send_email_report` is kept as a regular function instead of a task.
- Awaiting `get_failed_flow_runs` inside the async flow returns its value directly, so no extra `.result()` call is needed before passing it to `send_email_report`.
This structure ensures smooth execution and avoids the `UnfinishedRun` exception by managing asynchronous tasks and their results more effectively.
l
@Marvin Notice any problem?
Copy code
python
@task
async def get_failed_flow_runs(last_n_days: int) -> list[FlowRun]:
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=last_n_days)
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                ),
                start_time=FlowRunFilterStartTime(after_=one_week_ago)
            )
        )
    return flow_runs

@task
def print_flows_info(flow_runs_list):
    for flow_run in flow_runs_list:
        print(f"Flow Run ID: {flow_run.id}")
        print(f"Flow Name: {flow_run.name}")
        print(f"State: {flow_run.state.name}")
        print(f"Start Time: {flow_run.start_time}")
        print(f"End Time: {flow_run.end_time}")

@flow
def weekly_report():
    failed_flow_runs = asyncio.run(get_failed_flow_runs(last_n_days=7))
    print_flows_info(failed_flow_runs)

if __name__ == "__main__":
    weekly_report()
m
Yes, there are a few issues in the code you provided:
1. `asyncio.run` in a `@flow`-decorated function: you cannot use `asyncio.run` inside an already asynchronous context like a Prefect flow. Prefect automatically handles the concurrency and event loop management.
2. Synchronous and asynchronous context mixing: `asyncio.run()` should not be used in a flow decorated with `@flow` because it can result in errors related to event loop management.
3. Custom types for type hints: type hints like `list[FlowRun]` should use `typing.List` instead of `list` for compatibility with Python versions before 3.9.
### Fixed Example
Here's the corrected version of your flow and tasks:
Copy code
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.client import get_client
from prefect.orion.schemas.core import FlowRun  # FlowRun model location may vary by Prefect version
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType, FlowRunFilterStartTime
from prefect.orion.schemas.states import StateType
from datetime import datetime, timezone, timedelta
from typing import List

@task
async def get_failed_flow_runs(last_n_days: int) -> List[FlowRun]:
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=last_n_days)
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                ),
                start_time=FlowRunFilterStartTime(
                    after_=one_week_ago
                )
            )
        )
    return flow_runs  # read_flow_runs returns a list of FlowRun objects

@task
def print_flows_info(flow_runs_list: List[FlowRun]):
    for flow_run in flow_runs_list:
        print(f"Flow Run ID: {flow_run.id}")
        print(f"Flow Name: {flow_run.name}")
        print(f"State: {flow_run.state.name}")
        print(f"Start Time: {flow_run.start_time}")
        print(f"End Time: {flow_run.end_time}")

@flow
async def weekly_report():
    failed_flow_runs = await get_failed_flow_runs(last_n_days=7)
    print_flows_info(failed_flow_runs)

if __name__ == "__main__":
    asyncio.run(weekly_report())
### Explanation
1. `get_failed_flow_runs`: This task uses Prefect's `get_client` to fetch flow runs from the last week; `read_flow_runs` returns a list of `FlowRun` objects.
2. `print_flows_info`: This task prints the details of each flow run from the list of flow runs.
3. `weekly_report`: This flow coordinates the tasks. Note that it is decorated with `@flow` and is asynchronous (`async def`). It uses `await` to call `get_failed_flow_runs` and retrieve its result.
4. Main Execution: `asyncio.run(weekly_report())` runs the flow in an asynchronous context.
### Important Points
- Asynchronous Flow: If the flow method is asynchronous, you should `await` asynchronous tasks within it.
- Asynchronous Event Loop: Use `async def` for the flow function and `await` for async tasks within it to handle concurrency correctly.
- Compatibility and Type Hints: Use compatible type hints such as `List` from the `typing` module.
This should ensure that your monitoring script correctly fetches failed flow runs and processes them within an asynchronous context, avoiding the common pitfalls of mixing synchronous and asynchronous code.

Yes, there is an issue with the way you've structured the tasks and flows. Specifically, you should not use `asyncio.run` inside a flow or task. Prefect manages the event loop and you should rely on Prefect to manage task execution. Here's a fixed example that avoids the use of `asyncio.run` and correctly structures the asynchronous and synchronous contexts:
```python
from prefect import flow, task, get_run_logger
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType, FlowRunFilterStartTime
from prefect.orion.schemas.states import StateType
from datetime import datetime, timezone, timedelta

@task
async def get_failed_flow_runs(last_n_days: int):
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=last_n_days)
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                ),
                start_time=FlowRunFilterStartTime(after_=one_week_ago)
            )
        )
    return flow_runs

@task
def print_flows_info(flow_runs_list):
    for flow_run in flow_runs_list:
        print(f"Flow Run ID: {flow_run.id}")
        print(f"Flow Name: {flow_run.name}")
        print(f"State: {flow_run.state.type}")
        print(f"Start Time: {flow_run.start_time}")
        print(f"End Time: {flow_run.end_time}")

@flow
def weekly_report():
    failed_flow_runs = get_failed_flow_runs(last_n_days=7)  # no need for asyncio.run here
    print_flows_info(failed_flow_runs)

if __name__ == "__main__":
    weekly_report()  # can be called directly, without asyncio.run
```
### Changes Made
1. Removed `asyncio.run`: The `asyncio.run` call inside the flow `weekly_report` is removed. Prefect handles the event loop automatically.
2. Correct Flow and Task Definitions: The flow and tasks are correctly defined and used (note the keyword argument is `flow_run_filter`, not `filter`).
### Explanation
- `get_failed_flow_runs` Task: This asynchronous task fetches failed flow runs from the last week.
- `print_flows_info` Task: This synchronous task prints information about each flow run.
- `weekly_report` Flow: This flow calls the `get_failed_flow_runs` task and the `print_flows_info` task.
By structuring your code this way, you avoid issues with mixing synchronous and asynchronous contexts and let Prefect manage the execution of tasks properly.
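The restriction on `asyncio.run` is plain Python, not Prefect-specific: it refuses to start when an event loop is already running, which is exactly the situation inside an async flow. A minimal demonstration:
```python
import asyncio

async def inner():
    return "ok"

async def outer():
    # Raises: "asyncio.run() cannot be called from a running event loop"
    return asyncio.run(inner())

try:
    asyncio.run(outer())
except RuntimeError as e:
    print(e)
```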
l
@Marvin I need to do this using .map for get_flow_info and get_deployment_info. Could you write that code?
Copy code
python
for flow_run in failed_flow_runs:
    flow_info = get_flow_info(flow_run['flow_id'])
    flow_run['flow_name'] = flow_info['flow_name']
    if flow_run['deployment_id']:
        deployment_info = get_deployment_info(flow_run['deployment_id'])
        flow_run['deployment_name'] = deployment_info['deployment_name']
        flow_run['deployment_entrypoint'] = deployment_info['deployment_entrypoint']
m
Understood. You can use `.map` to fetch the flow and deployment information in parallel, and then enrich the flow run information. Here is the complete code:
Copy code
python
from prefect import flow, task, get_run_logger
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType, FlowRunFilterStartTime
from prefect.orion.schemas.states import StateType
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Optional

@task
async def get_failed_flow_runs(last_n_days: int) -> List[dict]:
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=last_n_days)
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED]),
                ),
                start_time=FlowRunFilterStartTime(
                    after_=one_week_ago
                )
            )
        )
    return [flow_run.dict() for flow_run in flow_runs]  # convert FlowRun models to plain dicts

@task
async def get_flow_info(flow_id: str) -> dict:
    async with get_client() as client:
        flow_info = await client.read_flow(flow_id)
    return flow_info.dict()  # convert the Flow model to a dict

@task
async def get_deployment_info(deployment_id: str) -> Optional[dict]:
    if not deployment_id:
        return None
    async with get_client() as client:
        deployment_info = await client.read_deployment(deployment_id)
    return deployment_info.dict()  # convert the Deployment model to a dict

@task
def enrich_flow_run_with_info(flow_runs_list: List[dict], flow_infos: List[dict], deployment_infos: List[Optional[dict]]) -> List[dict]:
    for flow_run, flow_info, deployment_info in zip(flow_runs_list, flow_infos, deployment_infos):
        flow_run['flow_name'] = flow_info['name']
        
        if deployment_info:
            flow_run['deployment_name'] = deployment_info['name']
            flow_run['deployment_entrypoint'] = deployment_info['entrypoint']

    return flow_runs_list

@task
def print_flows_info(flow_runs_list: List[Dict]):
    for flow_run in flow_runs_list:
        print(f"Flow Run ID: {flow_run['id']}")
        print(f"Flow Name: {flow_run['flow_name']}")
        print(f"State: {flow_run['state']['type']}")
        print(f"Start Time: {flow_run['start_time']}")
        print(f"End Time: {flow_run['end_time']}")
        if 'deployment_name' in flow_run:
            print(f"Deployment Name: {flow_run['deployment_name']}")
        if 'deployment_entrypoint' in flow_run:
            print(f"Deployment Entrypoint: {flow_run['deployment_entrypoint']}")

@flow
async def weekly_report():
    failed_flow_runs = await get_failed_flow_runs(last_n_days=7)
    
    # Get flow info and deployment info in parallel
    flow_ids = [flow_run['flow_id'] for flow_run in failed_flow_runs]
    deployment_ids = [flow_run.get('deployment_id') for flow_run in failed_flow_runs]

    # .map returns a list of PrefectFutures; they are resolved automatically
    # when passed into a downstream task
    flow_infos = get_flow_info.map(flow_ids)
    deployment_infos = get_deployment_info.map(deployment_ids)

    enriched_flow_runs = enrich_flow_run_with_info(failed_flow_runs, flow_infos, deployment_infos)

    # Print the enriched flow runs
    print_flows_info(enriched_flow_runs)

if __name__ == "__main__":
    import asyncio
    asyncio.run(weekly_report())
### Explanation
1. Tasks:
- `get_failed_flow_runs`: Fetches failed flow runs from the last week.
- `get_flow_info`: Retrieves information about a flow using the flow ID.
- `get_deployment_info`: Retrieves information about a deployment using the deployment ID.
- `enrich_flow_run_with_info`: Adds flow and deployment details to the flow run dictionary.
- `print_flows_info`: Prints details of each flow run.
2. Flow:
- `weekly_report`:
  - Fetches the failed flow runs using `get_failed_flow_runs`.
  - Uses `.map` to fetch flow information and deployment information in parallel.
  - Enriches each flow run with flow and deployment information using `enrich_flow_run_with_info`; the futures returned by `.map` resolve automatically when passed to this downstream task.
  - Prints the enriched flow runs using `print_flows_info`.
### Notes
- `Optional` Type for Deployment Info: The `get_deployment_info` task returns `None` if there is no deployment ID, making the handling of the task safer.
- `.map` for Parallel Execution: Using `.map` for `get_flow_info` and `get_deployment_info` allows for parallel execution.
- Enrichment Logic: The enrichment logic is handled within a task to ensure the flow of information is managed correctly within Prefect's execution model.
This code structure ensures that information is fetched and processed efficiently and that the tasks are properly managed by Prefect.
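For reference, here is a tiny self-contained sketch of `.map` semantics with a made-up `double` task: `.map` returns one `PrefectFuture` per input element, and `.result()` blocks until each mapped task run finishes:
```python
from prefect import flow, task

@task
def double(x: int) -> int:
    return x * 2

@flow
def demo():
    futures = double.map([1, 2, 3])           # one PrefectFuture per element
    results = [f.result() for f in futures]   # block until each task finishes
    print(results)                            # [2, 4, 6]

if __name__ == "__main__":
    demo()
```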
l
@Marvin i want to obtain a flow and its subflows using the client
m
Retrieving a flow and its subflows using the Prefect client involves querying the Prefect API for flow runs and then fetching details recursively if applicable. Here's a step-by-step guide using Prefect's Python client:
1. Retrieve the Main Flow Run: Fetch the specific flow run you are interested in.
2. Identify Subflows: Use the API to identify and retrieve subflow runs associated with the main flow run.
3. Fetch Details for Subflows: Repeat the process to fetch details for any subflows, if necessary.
### Example Code
Below is an example script that demonstrates how to accomplish this:
Copy code
python
import asyncio
from prefect import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterTags

async def fetch_subflows(flow_run_id):
    """
    Fetch subflow runs for a given flow run ID.
    """
    async with get_client() as client:
        # Fetch related flow runs (subflows) via the parent tag convention
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                tags=FlowRunFilterTags(all_=[f"parent:{flow_run_id}"])
            )
        )
        return flow_runs  # read_flow_runs returns a list of FlowRun objects

async def fetch_flow_and_subflows(flow_run_id):
    """
    Fetch a flow run and its subflows, including nested subflows.
    """
    async with get_client() as client:
        # Fetch the main flow run
        main_flow_run = await client.read_flow_run(flow_run_id)
        
        # Fetch subflows
        subflow_runs = await fetch_subflows(flow_run_id)
        
        # Fetch nested subflows recursively
        subflow_details = []
        for subflow_run in subflow_runs:
            nested = await fetch_subflows(subflow_run.id)
            subflow_details.append({"flow_run": subflow_run, "subflows": nested})

        # Combine main flow run with its direct subflows
        main_flow_run_details = {
            "flow_run": main_flow_run,
            "subflows": subflow_details,
        }
        
        return main_flow_run_details


async def main():
    # Replace with your actual flow run ID
    flow_run_id = "your_flow_run_id_here"
    flow_details = await fetch_flow_and_subflows(flow_run_id)
    # Print the fetched flow details including subflows
    print(flow_details)

if __name__ == "__main__":
    asyncio.run(main())
### Explanation
1. fetch_subflows Function: retrieves subflow runs for a given flow run ID using `tags` with a specific format to identify subflows.
2. fetch_flow_and_subflows Function: first fetches the main flow run using its ID, then calls `fetch_subflows` to fetch subflow runs associated with the main flow run; for each subflow retrieved, it recursively calls `fetch_subflows` to fetch any nested subflows.
3. Main Function: calls `fetch_flow_and_subflows` with a specific flow run ID and prints the details fetched, including subflows.
### Assumptions
- Flow Run IDs: The example assumes you know the main `flow_run_id` for your primary flow.
- Subflow Identification: The `parent:{flow_run_id}` tag is just a naming convention, not a Prefect built-in; adjust the specifics depending on your tagging and flow structure. A sketch of how such tags could be applied follows below.
### Important Note
Ensure that your Prefect server is configured properly to handle these API requests and that you have the necessary permissions and settings to fetch flow run details programmatically. This approach allows you to recursively fetch and manage details of flows and their subflows, facilitating better tracking and monitoring.
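For completeness, here is a sketch of how such tags could be applied in the first place, using Prefect's `tags` context manager; the `parent:` prefix is purely the convention assumed by the example above, not a Prefect feature:
```python
from prefect import flow, tags
from prefect.runtime import flow_run  # runtime info about the current flow run

@flow
def child():
    print("child flow")

@flow
def parent():
    # Tag the subflow run with this run's ID so the tag-based query can find it
    with tags(f"parent:{flow_run.id}"):
        child()
```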
l
@Marvin i'm not tagging in that way. I want to do it without tags
m
If you are not using tags to identify subflows, you can identify subflows based on their parent flow run ID, which is typically available in the flow run metadata.
### Step-by-Step Guide
1. Retrieve the Main Flow Run: Fetch the main flow run you are interested in.
2. Identify Subflows: Use the Prefect client to identify subflow runs associated with the main flow run by checking the `parent_flow_run_id`.
3. Fetch Details for Subflows: Fetch details for any subflows, potentially recursively.
### Example Code
Below is an example script that demonstrates how to accomplish this without relying on tags:
Copy code
python
import asyncio
from prefect.client import get_client
# Filter model locations vary by Prefect version; on newer releases these live
# in prefect.client.schemas.filters
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterParentFlowRunId

async def fetch_subflows(flow_run_id):
    """
    Fetch subflow runs for a given flow run ID.
    """
    async with get_client() as client:
        # Fetch subflow runs whose parent_flow_run_id matches the given flow_run_id
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_flow_run_id=FlowRunFilterParentFlowRunId(any_=[flow_run_id])
            )
        )
        return flow_runs  # read_flow_runs returns a list of FlowRun objects

async def fetch_flow_and_subflows(flow_run_id):
    """
    Fetch a flow run and its subflows, including nested subflows.
    """
    async with get_client() as client:
        # Fetch the main flow run
        main_flow_run = await client.read_flow_run(flow_run_id)
        
        # Fetch subflows
        subflow_runs = await fetch_subflows(flow_run_id)
        
        # Fetch nested subflows recursively
        subflow_details = []
        for subflow_run in subflow_runs:
            nested = await fetch_subflows(subflow_run.id)
            subflow_details.append({"flow_run": subflow_run, "subflows": nested})

        # Combine main flow run with its direct subflows
        main_flow_run_details = {
            "flow_run": main_flow_run,
            "subflows": subflow_details,
        }
        
        return main_flow_run_details


async def main():
    # Replace with your actual flow run ID
    flow_run_id = "your_flow_run_id_here"
    flow_details = await fetch_flow_and_subflows(flow_run_id)
    # Print the fetched flow details including subflows
    print(flow_details)

if __name__ == "__main__":
    asyncio.run(main())
### Explanation
1. fetch_subflows Function: queries flow runs whose `parent_flow_run_id` matches the given `flow_run_id`, thus identifying subflows directly related to the parent flow.
2. fetch_flow_and_subflows Function: fetches the main flow run using its ID, calls `fetch_subflows` to retrieve any subflow runs associated with it, and iterates over the subflows to fetch any nested subflows recursively.
3. Main Function: calls `fetch_flow_and_subflows` with the main `flow_run_id` and prints the details, including subflows.
### Adjustments
Ensure that:
- You replace `"your_flow_run_id_here"` with the actual flow run ID of the main flow you want to investigate.
- Your Prefect server is configured and accessible.
- You have the necessary permissions to access the run details.
This approach leverages Prefect's API to directly identify and fetch subflow information based on the `parent_flow_run_id` metadata property without relying on tags, which makes it robust across different tagging configurations.
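If your Prefect version does not expose a parent-flow-run filter, note that each subflow run records a `parent_task_run_id`, and the corresponding task run knows its own `flow_run_id`. A rough sketch of resolving a subflow's parent that way (assuming these fields exist on your version's schemas):
```python
from prefect import get_client

async def parent_flow_run_id_of(subflow_run):
    """Resolve a subflow run's parent flow run via its parent task run."""
    if subflow_run.parent_task_run_id is None:
        return None  # not a subflow
    async with get_client() as client:
        parent_task_run = await client.read_task_run(subflow_run.parent_task_run_id)
    return parent_task_run.flow_run_id
```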
l
@Marvin I'd like to add some kind of metadata to my deployment or flow run that can be retrieved from another flow programmatically using the client. How to do that? I have at the start of my flow's script this code DEVELOPER_INFO = {"developer": {"id": "MD", "name": "Matt Daens", "email": "matt.daens@adress.com"}}
m
To add custom metadata to your Prefect deployment or flow run, you can leverage Prefect's built-in capabilities for setting parameters and using runtime variables. Here's how you can achieve this:
### Setting Metadata at Deployment
When creating a deployment, you can include custom metadata in the deployment YAML file. This metadata will then be available to the flow runs associated with that deployment. Here's an example of how you can define a deployment with custom metadata:
Copy code
yaml
# deployment.yaml

name: example-deployment
version: 1
description: |
  This is an example deployment with custom metadata.
tags:
  - example
  - metadata
parameters:
  start_date: "2023-01-01"
  end_date: "2023-12-31"
  developer_info:
    id: "MD"
    name: "Matt Daens"
    email: "matt.daens@adress.com"
schedule: null
entrypoint: my_flow.py:my_flow_function
work_queue_name: default
### Accessing Metadata in Your Flow
You can access this metadata within your flow using Prefect's runtime variables. Here's an example:
Copy code
python
from prefect import flow, task
from prefect.runtime import flow_run

@task
def print_metadata():
    # Access flow run parameters
    developer_info = flow_run.parameters.get("developer_info")
    if developer_info:
        print(f"Developer ID: {developer_info['id']}")
        print(f"Developer Name: {developer_info['name']}")
        print(f"Developer Email: {developer_info['email']}")

@flow
def my_flow(start_date: str, end_date: str, developer_info: dict):
    print_metadata()

if __name__ == "__main__":
    my_flow(
        start_date="2023-01-01",
        end_date="2023-12-31",
        developer_info={"id": "MD", "name": "Matt Daens", "email": "matt.daens@adress.com"},
    )
### Running the Deployment
Create and apply the deployment from the YAML file:
Copy code
bash
prefect deployment build my_flow.py:my_flow -n example-deployment -o example-deployment.yaml
prefect deployment apply example-deployment.yaml
### Notes
1. Adding Metadata to Flow Runs: You can set custom parameters and metadata in the deployment file, which will be inherited by flow runs.
2. Accessing Metadata: Use `prefect.runtime.flow_run` to access parameters and other runtime information within your flow.
### Using Additional Metadata
If you want to add metadata directly within the script, you can pass it around as plain Python values. Here's an example approach for directly passing and using metadata:
Copy code
python
from prefect import flow, task, get_run_logger

DEVELOPER_INFO = {"developer": {"id": "MD", "name": "Matt Daens", "email": "matt.daens@adress.com"}}

@task
def print_metadata(metadata):
    logger = get_run_logger()
    developer_info = metadata.get("developer")
    if developer_info:
        logger.info(f"Developer ID: {developer_info['id']}")
        logger.info(f"Developer Name: {developer_info['name']}")
        logger.info(f"Developer Email: {developer_info['email']}")

@flow(name="My Example Flow")
def my_flow(start_date: str, end_date: str):
    print_metadata(DEVELOPER_INFO)

if __name__ == "__main__":
    my_flow(start_date="2023-01-01", end_date="2023-12-31")
### Explanation
1. Defining Parameters: You can pass parameters like `start_date`, `end_date`, and `developer_info` through the deployment YAML file. These parameters are then accessible within your flow.
2. Printing Metadata: The `print_metadata` task fetches and prints the provided metadata.
This approach ensures that your metadata is attached to the deployment and can be accessed and utilized within the flow runs, adding custom information relevant to your deployments and workflow executions.
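To then retrieve that metadata from another flow with the client, which was the original question, you can read the target flow run and inspect its recorded parameters — a minimal sketch, assuming you know the flow run's ID:
```python
import asyncio
from prefect import get_client

async def read_developer_info(flow_run_id: str):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
    # flow_run.parameters holds the parameters the run was invoked with
    return flow_run.parameters.get("developer_info")

if __name__ == "__main__":
    print(asyncio.run(read_developer_info("your_flow_run_id_here")))
```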
l
@Marvin File "C:\Reportes_Power_BI\Python\dev\commons\email_tools\send_email.py", line 69, in send_email mail_password = secret_block_alertas.get() AttributeError: 'coroutine' object has no attribute 'get'
m
The error message you are seeing indicates that you are trying to call `.get()` on a coroutine object, which means you likely need to `await` the coroutine to get the result.
### Steps to Fix the Error
1. Ensure Asynchronous Operations are Awaited: if `secret_block_alertas` is a Prefect block loaded from an async context, you need to await the asynchronous methods.
### Example Fix
Let's modify your code to properly await the coroutine.
#### Example with Asynchronous Fetching
Copy code
python
from prefect.blocks.system import Secret
from prefect import flow, task, get_run_logger
import asyncio

@task
async def fetch_secret():
    # Assuming 'secret_block_alertas' is a secret block based on context of usage
    secret_block_alertas = await Secret.load("your-secret-block-name")
    mail_password = secret_block_alertas.get()  # .get() is synchronous once the block is loaded
    return mail_password

@task
def send_email(mail_password):
    # Placeholder for sending email using the mail password
    logger = get_run_logger()
    logger.info(f"Sending email using password: {mail_password}")
    # Actual email logic goes here

@flow
async def my_flow():
    mail_password = await fetch_secret()
    send_email(mail_password)

if __name__ == "__main__":
    # Use asyncio.run to execute the flow
    asyncio.run(my_flow())
### Explanation
- Async Task `fetch_secret`: This task loads the secret block by awaiting `Secret.load` (which returns a coroutine when called from an async context); once loaded, `.get()` returns the value directly.
- Flow `my_flow`: The flow awaits the `fetch_secret` task and then uses the fetched mail password to send an email.
- Await Coroutines: Ensures that all asynchronous operations are properly awaited to fetch results.
### Common Pattern for Using Prefect Blocks and Async Operations
Whenever you are dealing with Prefect blocks or any asynchronous operations, always make sure to `await` the async methods, and use `async` functions where necessary. This change should resolve the `AttributeError` by ensuring that all asynchronous methods are awaited properly.
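For contrast, in a fully synchronous context (no running event loop), Prefect's sync-compatible block methods return values directly rather than coroutines, so no `await` is needed — a minimal sketch, with the block name assumed:
```python
from prefect import flow
from prefect.blocks.system import Secret

@flow
def my_sync_flow():
    # In a synchronous flow, Secret.load returns the block itself, not a coroutine
    secret_block_alertas = Secret.load("your-secret-block-name")
    mail_password = secret_block_alertas.get()
    print("Loaded a password of length", len(mail_password))

if __name__ == "__main__":
    my_sync_flow()
```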