Lucas
12/26/2023, 7:34 PMMarvin
12/26/2023, 7:35 PM
Run `prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"` to set the API URL to point to a Prefect server instance.
- Note that if you are running Prefect inside a container, such as a Docker container, you must set the API server address as described above.
- You can save the API server address in a Prefect profile so that whenever that profile is active, the API endpoint will be set to that address.
2. Running Prefect on system startup:
- You can set up a Prefect worker as a systemd service on Linux systems. This allows the worker to start automatically when the system boots.
- You would need to create a service file for Prefect and then enable it with `systemctl enable prefect-worker` to make sure it runs on boot.
- To start the service, use `systemctl start prefect-worker`.
For a detailed guide on how to set up a Prefect 2 worker as a systemd service on Linux and to understand the necessary configurations, check out this discussion on Prefect's Discourse.
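As a rough illustration of the service file mentioned above (a sketch only: the user, the paths, and the pool name are assumptions to adapt to your installation), a minimal `/etc/systemd/system/prefect-worker.service` might look like:

```ini
[Unit]
Description=Prefect worker
After=network.target

[Service]
# Hypothetical user and paths; replace with your own environment.
User=prefect
WorkingDirectory=/opt/prefect
ExecStart=/opt/prefect/venv/bin/prefect worker start --pool my-pool
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

After creating the file, reload systemd with `systemctl daemon-reload` before enabling and starting the unit.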
Keep in mind that the guide and steps mentioned above are specific to Linux systems; if you're using a different operating system, the steps to configure Prefect to start on boot will differ. For comprehensive, platform-specific instructions, always refer to the official Prefect documentation.
Lucas
12/26/2023, 7:36 PMMarvin
12/26/2023, 7:41 PM
Use the `--help` option to get the correct syntax for starting the agent and deploying flows.
For comprehensive instructions on using Prefect and deploying flows on a Windows system, always refer to the official Prefect documentation. If the necessary documentation is not available, you may find more help on forums like Stack Overflow or in the Prefect community channels.
Lucas
01/12/2024, 12:58 AMMarvin
01/12/2024, 12:59 AM
Run `prefect deploy` from the root of the directory containing your flow code.
- Create a new deployment, selecting your flow code entry point and naming your deployment.
- Select a process work pool.
Upon completion of these steps, you should be shown the path that your flow code will be fetched from when it runs.
For more details and other storage options such as Git-based storage (e.g., GitHub, GitLab, Bitbucket), you can refer to Prefect's documentation and run `prefect init` for more options for managing your flow's code: Storage Guide in the Prefect Docs.
To track such information explicitly or programmatically after deploying in Prefect Cloud, additional steps might be necessary as this workflow metadata is managed through the Prefect Cloud's user interface or through API interactions. If you require deeper assistance with Prefect Cloud, further documentation is available here: Prefect Cloud Quickstart.
Let me know if you need further information or have specific storage settings you're using.Lucas
01/12/2024, 1:59 AMMarvin
01/12/2024, 2:00 AM

```python
import os

# Inside your flow or task
script_path = os.path.abspath(__file__)
print(f"The flow is running from: {script_path}")
```

The `__file__` attribute contains the path of the script it appears in, and `os.path.abspath` resolves it to an absolute path. Note that this prints the path where the flow script is currently located, which works as long as the location doesn't change after deployment. If it does, or if you need to retrieve the path differently depending on your deployment model (e.g., Docker, Kubernetes), you may have to implement custom logic to determine the flow's path at runtime.
Also, keep in mind when using Docker or similar environments, the printed path will be the path inside the container, not on your local machine.
Remember that in a cloud or server-based execution environment, this will give you the execution path on the server or execution environment, not the path where the flow was originally defined or deployed from.
When the flow runs, logs like "Downloading flow code from storage at '/my_path'" indicate where the flow code is stored or being executed from as part of the run logs. This can be found in the Prefect UI under the logs for a particular flow run.
Let me know if you need assistance with specific deployment models or further explanation on how to integrate this into your flow.Lucas
01/17/2024, 4:51 AMMarvin
01/17/2024, 4:52 AM
There does not appear to be a dedicated mechanism for synchronizing UI changes back into the `prefect.yml` configuration file; typically this involves a few manual steps:
1. Manually review the changes made through the Prefect UI.
2. Update the `prefect.yml` file to reflect those changes.
If a specific command or tool exists in Prefect for this synchronization, it wasn't found in the documentation provided, so manual intervention may be required to keep the `prefect.yml` file in line with the configuration made through the Prefect UI.
Here's a general approach you could take:
- Review the deployment configuration changes made in the Prefect UI, which could include updates to infrastructure settings, environment variables, concurrency limits, and so on.
- Open your `prefect.yml` file in a text editor.
- Manually update the `prefect.yml` file to mirror the changes made in the UI.
- Save the `prefect.yml` file.
If Prefect 2.x offers an automated way to perform this synchronization, or if the UI can export or display the configuration as YAML or JSON that could be copied into the `prefect.yml` file, those details would be found in Prefect's more detailed documentation or in-product help.
It may be prudent to reach out to Prefect's support or search the official documentation with relevant keywords for features related to `prefect.yml` and Prefect UI synchronization.
Please let me know if you would like further investigation into this or help with another question.
Lucas
01/22/2024, 4:11 PMMarvin
01/22/2024, 4:12 PMLucas
01/22/2024, 4:14 PM
`prefect deploy`.
Prefect will automatically search the available subdirectories for scripts that carry the `@flow` decorator and have a `__main__` guard configured. For example:

```python
@flow
def basic_flow(message="Hola mundo!"):
    print(message)

if __name__ == "__main__":
    basic_flow()
```

Prefect will then show us:

```shell
> prefect deploy
? Select a flow to deploy [Use arrows to move; enter to select; n to select none]
┏━━━┳━━━━━━━━━━━━┳━━━━━━━━━━┓
┃   ┃ Flow Name  ┃ Location ┃
┡━━━╇━━━━━━━━━━━━╇━━━━━━━━━━┩
│ > │ basic_flow │ basic.py │
└───┴────────────┴──────────┘
Enter a flow entrypoint manually
```

Just select the script, or enter it manually if it is not listed:

```shell
? Flow entrypoint (expected format path/to/file.py:function_name): ./project/basic.py:basic_flow
```
We will then be prompted for various deployment settings:
- ***Deployment name:*** should be a descriptive name. It is the equivalent of the task names.
- ***Scheduled execution:*** can be set by intervals (every so often) or using cron ([cron syntax reference](https://marquesfernandes.com/es/tecnologia-es/crontab-what-and-and-how-to-use-no-ubuntu-debian/) and [cron generator](https://crontab.guru/#30_1,13,17_*_*_*)).
- This can be configured later, much more easily, from the UI.
> [!TIP]
> The cron syntax to run at the usual times (1:30, 13:30, and 17:30) is `30 1,13,17 * * *`
> [!CAUTION]
> Mind the time zone: ***DO NOT use "UTC".*** It must be set to "America/Buenos_Aires".
- You can then choose a Work Pool to deploy the flow to. The pools available on the current server will be listed here. For more information on Work Pools, see the [About Work Pools](#sobre-work-pools) section.
> [!NOTE]
> Use a coherent pool for each deploy. For example, for a ***production*** deploy for the ***Purchasing*** area, use the `compras-prod` pool.
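The cron expression used above, `30 1,13,17 * * *`, can be unpacked with a few lines of plain Python (the field handling here is simplified for illustration; real cron also supports ranges and steps):

```python
# A cron expression has five fields: minute, hour, day of month, month, day of week.
expr = "30 1,13,17 * * *"
minute, hour, dom, month, dow = expr.split()

def expand(field):
    # "*" means "every value"; otherwise split a comma-separated list into integers.
    return "every" if field == "*" else [int(v) for v in field.split(",")]

print(expand(minute))  # the minute within each hour
print(expand(hour))    # the hours at which the job fires
```

So this expression fires at minute 30 of hours 1, 13, and 17, i.e. 1:30, 13:30, and 17:30 in the configured time zone.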
<!-- shell
? Deployment name (default): printear-mensaje  # Enter a name for the deploy.
? Would you like to configure a schedule for this deployment? [y/n] (y): n  # Skip automatic scheduling.
-->
## Work Pools
Work Pools, or groups of workers
## Logging
The logging.yml file
Marvin
01/22/2024, 4:14 PM
Help me write a simple Prefect documentation in markdown in Spanish. So far I've written this:
# Prefect Notes
For the repository, or to copy into a shared document
## Profiles
## Deploys
Deploys are connections between the Prefect server and our code. Deploys let us set the script's location (whether local or in git) and configure how it will run (manually, scheduled, by intervals, etc.). They are analogous to the tasks we used in the Windows Task Scheduler. Creating a deploy is simple and can be done with the command:
`prefect deploy`.
Prefect will automatically search the available subdirectories for scripts that carry the `@flow` decorator and have a `__main__` guard configured. For example:

```python
@flow
def basic_flow(message="Hola mundo!"):
    print(message)

if __name__ == "__main__":
    basic_flow()
```

And Prefect will show us:

```shell
prefect deploy
? Select a flow to deploy [Use arrows to move; enter to select; n to select none]
┏━━━┳━━━━━━━━━━━━┳━━━━━━━━━━┓
┃   ┃ Flow Name  ┃ Location ┃
┡━━
```
Lucas
01/22/2024, 4:16 PMMarvin
01/22/2024, 4:17 PMLucas
01/26/2024, 12:43 AMMarvin
01/26/2024, 12:43 AM
Prefect's logging is built on Python's standard `logging` module. Users can configure the log level and the format of log messages to suit their needs. For example, levels such as `DEBUG`, `INFO`, `WARNING`, `ERROR`, and `CRITICAL` can be set.
### Using Logs in Flows and Tasks
To add custom logs inside Prefect flows and tasks, you can use the methods `logger.debug()`, `logger.info()`, `logger.warning()`, `logger.error()`, and `logger.critical()`, which correspond to the different severity levels.
### Viewing Logs
Logs generated during a flow run can be viewed in real time through the Prefect UI or via the command-line interface. Log persistence can also be configured so logs can be stored and accessed later.
### Logging Integrations
Prefect offers integrations with centralized logging systems such as ELK, Stackdriver, and Datadog, among others. This lets users leverage advanced monitoring and alerting tools to keep their workflows operational.
In short, good logging practices in Prefect are essential for operating and supervising tasks and flows, making it easier to manage their execution and troubleshoot incidents.
Lucas
01/26/2024, 12:48 AMMarvin
01/26/2024, 12:49 AM
You can use Python's `logging` module and obtain a Prefect-specific `logger` instance. You can then use that instance to record log messages at different severity levels.

```python
from prefect import flow, task
import logging

# Get an instance of the Prefect logger
logger = logging.getLogger("prefect")

@task
def tarea1():
    # Log an informational message
    logger.info("Ejecutando la tarea 1")
    return 1

@task
def tarea2(x):
    # Log a debug message
    logger.debug(f"Tarea 2 recibió el argumento: {x}")
    # Simulate some validation and log a warning if needed
    if x < 0:
        logger.warning("x es menor que 0; esto podría ser un problema.")
    # Return some value
    return x * 2

# Define a flow that uses both tasks
@flow(name="mi_flujo_de_ejemplo")
def mi_flujo_de_ejemplo():
    resultado1 = tarea1()
    resultado2 = tarea2(resultado1)
    return resultado2

# Run the flow
if __name__ == "__main__":
    mi_flujo_de_ejemplo()
```

In this example, `tarea1` logs an informational message indicating it is running. `tarea2`, on the other hand, logs debug and warning messages depending on conditions during its execution.
These logs will be shown in the console while the flow runs and will also be accessible through the Prefect UI if you monitor your flows in Prefect Cloud or Prefect Server.
Remember to configure the appropriate log level before running the flow if you need more or less detail in your records. This is generally done at the start of your script:

```python
logging.basicConfig(level=logging.INFO)
```

Setting the level to `INFO` ensures that all `INFO`, `WARNING`, `ERROR`, and `CRITICAL` logs are shown, but omits `DEBUG` ones unless you change the level to `logging.DEBUG`.
Lucas
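The level-filtering behavior described above can be seen with the standard library alone. A small self-contained sketch (independent of Prefect) that logs into an in-memory buffer so the effect is easy to inspect:

```python
import io
import logging

# Send log records to an in-memory buffer instead of the console.
buffer = io.StringIO()
logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)  # INFO and above pass; DEBUG is filtered out
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
logger.addHandler(handler)

logger.debug("hidden")        # below the threshold, not emitted
logger.info("visible")
logger.warning("also visible")

output = buffer.getvalue()
print(output)
```

Only the `INFO` and `WARNING` lines reach the handler; the `DEBUG` record is dropped before formatting.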
01/26/2024, 1:15 PMMarvin
01/26/2024, 1:16 PM
Alongside `logging`, Python's standard module, Prefect includes a "logger" configured specifically to interact with Prefect.
When you use plain `logging` without adding it as an "extra logger", you may notice that the messages do not appear in the terminal. This happens because Prefect redirects and manages logs through its own logging system, which is designed to capture, tag, and ship the logs of your flow runs for later viewing in the UI.
To make sure your records appear as expected, use Prefect's run logger:

```python
from prefect import flow, task, get_run_logger

@task
def mi_tarea():
    logger = get_run_logger()
    logger.info("Ejecutando tarea")

@flow
def mi_flujo():
    mi_tarea()

mi_flujo()
```

With `get_run_logger()`, you get the logger configured by Prefect, prepared to work with all of the platform's logging capabilities, including correct emission of logs both in the terminal and in the UI.
Lucas
01/26/2024, 2:18 PMMarvin
01/26/2024, 2:19 PM

```text
[Local Code] --> |defines| --> [Flow]
    |
    |--|deploy|--> [prefect.yml file] --> |registers with| --> [Prefect Server]
    |                                          |
    |                                          |--> |creates| --> [Deployment]
    |                                          |
    |                                          |--> |assigns| --> [Work Pool]
    |                                          |
    |                                          |--> [Workers]
    |                                          |
    |                                          |--> |listen for| --> [Flow Runs]
    |
    |--> |executes| --> [Tasks]
```

Diagram description:
1. Local Code: where the flow and tasks are defined on a development machine.
2. Flow: represents the logical structure of the tasks and how they connect to each other.
3. prefect.yml file: defines the deployment configuration, such as the work pool, tags, and environment variables.
4. Prefect Server: manages the flows, the runs, and the resources needed for orchestration.
5. Deployment: the deployment configuration of a flow, including where and how it should run.
6. Work Pool: groups of workers meant to run flows with specific tags.
7. Workers: agents that execute flow runs.
8. Flow Runs: running instances of flows, each with its own state and context.
The high-level steps are:
- You develop the flow locally and define it programmatically.
- You create a `prefect.yml` file that specifies the deployment configuration, and register it with the Prefect server.
- The Prefect server, through the deployment process, creates a 'Deployment' that includes information about which 'Work Pool' should handle the flow.
- The 'Workers' associated with the specified 'Work Pool' listen for new flow run instances ('Flow Runs').
- When there is a 'Flow Run', the 'Workers' execute the tasks defined in the flow according to the deployment configuration.
Each element of this scheme is designed to maximize Prefect's scalability, flexibility, and fault tolerance.
Lucas
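The `prefect.yml` file referenced in the steps above might look roughly like this. This is only a sketch: the exact schema depends on the Prefect version, and the deployment name, entrypoint, and pool name below are placeholders.

```yaml
# Hypothetical deployment entry; adapt field values to your project.
deployments:
  - name: mi-deploy
    entrypoint: flows/mi_flujo.py:mi_flujo
    work_pool:
      name: compras-prod
    schedule:
      cron: "30 1,13,17 * * *"
      timezone: "America/Buenos_Aires"
```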
01/26/2024, 2:20 PMMarvin
01/26/2024, 2:21 PM

```mermaid
graph TD
    Código_Local["Código Local"] --> |define| Flujo["Flujo (Flow)"]
    Código_Local --> |deploy| Prefect_yml["Archivo prefect.yml"]
    Prefect_yml --> |registra en| Servidor_Prefect["Servidor Prefect"]
    Servidor_Prefect --> |crea| Deployment["Deployment"]
    Deployment --> |asigna| Work_Pool["Work Pool"]
    Work_Pool --> Workers["Workers"]
    Workers --> |escucha| Flow_Runs["Flow Runs"]
    Flow_Runs --> |ejecuta| Tareas["Tareas (Tasks)"]
```

This Mermaid diagram shows how your local code interacts with the Prefect system via deployments, how the Prefect server's services manage resources, and how workers execute tasks.
To visualize this diagram, you'll need a Mermaid renderer, such as a browser extension, an online viewer, or any tool that supports Mermaid syntax.
Lucas
01/26/2024, 2:25 PMMarvin
01/26/2024, 2:26 PM

```mermaid
graph TB
    Código_Local["Código Local"] --> |define| Flujo
    Código_Local --> |deploy| Prefect_yml["Archivo prefect.yml"]
    Prefect_yml --> |registra en| Servidor_Prefect["Servidor Prefect"]:::servidor
    subgraph " "
        direction LR
        Servidor_Prefect --> |crea| Deployment["Deployment"]:::deployment
        Deployment --> |asigna| Work_Pool["Work Pool"]:::workpool
    end
    subgraph " "
        direction TB
        Work_Pool --> Workers["Workers"]:::workers
        Workers --> |escucha| Flow_Runs["Flow Runs"]:::flowruns
        Flow_Runs --> |ejecuta| Tareas["Tareas (Tasks)"]:::tareas
    end
    classDef default fill:#f9f,stroke:#333,stroke-width:2px;
    classDef servidor fill:#bbf,stroke:#333,stroke-width:2px;
    classDef deployment fill:#fbf,stroke:#f66,stroke-width:2px,color:#f66;
    classDef workpool fill:#fbb,stroke:#333,stroke-width:2px;
    classDef workers fill:#bfb,stroke:#333,stroke-width:2px;
    classDef flowruns fill:#ccf,stroke:#333,stroke-width:2px;
    classDef tareas fill:#ffc,stroke:#333,stroke-width:2px;
    class Flujo default;
```

This diagram shows Prefect's different components, starting with defining and deploying the flow at the top and moving toward run management at the bottom. The colors help differentiate each component of the system.
To use this diagram, make sure your visualization tool supports Mermaid syntax and custom style classes for the colors.
Lucas
01/26/2024, 2:27 PMMarvin
01/26/2024, 2:27 PMLucas
01/26/2024, 3:30 PMMarvin
01/26/2024, 3:30 PM
Write this in Spanish, summarized, with a single example:
## What is a flow?
Flows are like functions. They can take inputs, perform work, and return an output. In fact, you can turn any function into a Prefect flow by adding the `@flow` decorator.
When a function becomes a flow, its behavior changes, giving it the following advantages:
- All runs of the flow have persistent state. Transitions between states are recorded, allowing flow execution to be observed and acted upon.
- Input arguments can be type validated as workflow parameters.
- Retries can be performed on failure.
- Timeouts can be enforced to prevent unintentional, long-running workflows.
- Metadata about flow runs, such as run time and final state, is automatically tracked.
- They can easily be elevated to a deployment, which exposes a remote API for interacting with it.
## Run your first flow
The simplest way to get started with Prefect is to annotate a Python function with the `@flow` decorator.
The script below fetches statistics about the main Prefect repository (https://github.com/PrefectHQ/prefect).
Let's turn it into a Prefect flow and run it:
```python title="repo_info.py" hl_lines="2 5"
import http
```
Marvin
01/26/2024, 3:32 PM

```python
from prefect import flow, task

@task
def extraer():
    # The logic to extract the data would go here.
    return {'data': 'datos extraídos'}

@task
def transformar(datos):
    # The data would be processed or transformed here.
    return {'data': f'{datos["data"]} transformados'}

@task
def cargar(datos):
    # The data would be loaded to its final destination here.
    print(f'Cargando: {datos["data"]}')

@flow(name="ETL")
def etl():
    datos_extraidos = extraer()
    datos_transformados = transformar(datos_extraidos)
    cargar(datos_transformados)

# Run the flow
if __name__ == "__main__":
    etl()
```

In this example, the flow named "ETL" contains three tasks: `extraer`, `transformar`, and `cargar`. Each task is decorated with `@task`, which turns it into a Prefect task that the flow can manage. The `@flow` decorator defines the flow and its dependencies, and calling `etl()` runs it.
Prefect's model allows fine-grained control over task execution and error handling, plus the ability to schedule or run flows in a distributed or cloud environment.
Lucas
01/26/2024, 3:56 PMMarvin
01/26/2024, 3:56 PMLucas
01/26/2024, 4:00 PMMarvin
01/26/2024, 4:01 PM
`prefect work-pool create [OPTIONS] NAME`
where `NAME` is a unique, required name for the Work Pool. You can also pass optional parameters to configure it, such as the initial state (active or paused), the type of infrastructure that will run the flows, and whether this Work Pool should be the default for local deployments.
If you prefer to use the Prefect UI, follow these steps:
1. Navigate to the Work Pools tab in the Prefect UI.
2. Check that you see a list of the currently configured Work Pools.
3. You can create a new Work Pool by selecting the "+" button and specifying its details.
Once you have configured and created your Work Pool, you can assign flows to that group so that agents start listening for and executing the workflow runs assigned to it. Remember that any change you make to the Work Pool configuration is immediately reflected in the agents requesting work from that specific Work Pool.
Lucas
01/26/2024, 5:11 PMMarvin
01/26/2024, 5:11 PMLucas
01/26/2024, 11:37 PMMarvin
01/26/2024, 11:38 PM

```python
from prefect import config

# Get the Prefect API URL from the configuration
prefect_api_url = config.get("PREFECT_API_URL")
print(f"La URL de la API de Prefect es: {prefect_api_url}")
```

In this code, `config` is a Prefect module that provides access to Prefect's configuration variables. If you set the API URL as an environment variable or through the Prefect CLI with `prefect config set PREFECT_API_URL="tu_url"`, you can read it directly from Python as shown above.
If you are working in an environment where the API URL is not configured through a Prefect profile, you may need to set it manually or make sure it is correctly configured as an environment variable on your system.
Lucas
01/26/2024, 11:39 PM

```text
https://orion-docs.prefect.io/migration_guide/ . Otherwise ensure that your code is pinned to the expected version.
  warnings.warn(
Traceback (most recent call last):
  File "<string>", line 1, in <module>
ImportError: cannot import name 'config' from 'prefect' (C:\Users\Lucas\miniconda3\envs\electraenv\Lib\site-packages\prefect\__init__.py)
```
Marvin
01/26/2024, 11:39 PM
This happens because you are trying to import `prefect.config` from Prefect 1.x while you have Prefect 2.x installed. In Prefect 2.x (also known as Orion), the way configurations are handled has changed significantly.
In Prefect 2.x, you need to use `prefect.settings` instead of `prefect.config`. For example, to retrieve the API URL, you would do the following:

```python
from prefect.settings import PREFECT_API_URL

# Get the API URL from Prefect 2.x settings
prefect_api_url = PREFECT_API_URL.value()
print(f"La URL de la API de Prefect es: {prefect_api_url}")
```

To resolve your issue, ensure your code is updated to be compatible with Prefect 2.x by utilizing the `prefect.settings` module, or if you need to continue using Prefect 1.x, make sure to install the version of Prefect that is compatible with your codebase.
For more information on migrating from Prefect 1.x to 2.x, you can refer to the Prefect 2.x migration guide.
Lucas
01/26/2024, 11:45 PMMarvin
01/26/2024, 11:46 PM
You can set `PREFECT_API_URL` and `PREFECT_API_KEY` as environment variables if you're running Prefect 2.x. The Prefect CLI commands to set these are:

```bash
prefect config set PREFECT_API_URL="https://api.prefect.cloud/api/accounts/[ACCOUNT-ID]/workspaces/[WORKSPACE-ID]"
prefect config set PREFECT_API_KEY="[API-KEY]"
```

Then, in Python, you could access these environment variables using the standard library:

```python
import os

PREFECT_API_URL = os.getenv("PREFECT_API_URL")
PREFECT_API_KEY = os.getenv("PREFECT_API_KEY")
```

It's worth mentioning that the exact method to access Prefect settings in code may vary depending on the specific version of Prefect 2.x you're using. If you're still encountering issues, I would recommend checking the latest Prefect 2.x documentation or reaching out to the Prefect community for the most up-to-date guidance.
Lucas
01/28/2024, 12:23 AMMarvin
01/28/2024, 12:24 AM

```python
from prefect import Client

client = Client()
flow_run_id = "id-of-the-flow-run"  # Replace with the actual flow run ID
new_tag = "new-tag"

# Add a tag to an existing flow run
client.update_flow_run_tags(flow_run_id=flow_run_id, add=[new_tag])
```

The above example assumes that the Prefect client handles authentication with the Prefect API server and that there is a method `update_flow_run_tags` which allows you to add new tags. This method might not exist as described, and you might need to use a different approach, such as a GraphQL mutation, to achieve the desired effect.
I recommend checking the latest Prefect documentation or source code for the correct method to add tags to flow runs in the version of Prefect you are using. If you're using Prefect Cloud, the API or UI may provide a way to add tags after a flow run has been created.
Lucas
01/30/2024, 3:46 PMMarvin
01/30/2024, 3:47 PM
1. Make the Flows Importable: Adjust `sys.path` or your `PYTHONPATH` to include the directories where the flows live.
2. Orchestrate Sub-flows: In the parent flow, call these imported flow functions. Each call effectively starts a new flow run for the respective sub-flow.
3. Run in Parallel or Sequentially: Depending on your requirements, you can run these sub-flows in parallel using Prefect's concurrency features or run them sequentially.
Here's an example with hypothetical function names and file structures:

```python
import sys

# Add the parent directory of your flow packages to the Python path if necessary
sys.path.append('/path/to')

from flow1_directory.flow1 import flow1
from flow2_directory.flow2 import flow2
from prefect import flow

@flow
def parent_flow():
    # Run flows sequentially
    flow1_run = flow1()
    flow2_run = flow2()
    # Alternatively, run the flows in parallel using Prefect's concurrency
    # features, adjusted to your actual use case

parent_flow()
```

In the example above, `flow1` and `flow2` are imported from their respective locations and then orchestrated within `parent_flow`. You would execute `parent_flow` to run the sub-flows.
Please ensure that you handle dependency management appropriately if the sub-flows are expected to run in a particular order or depend on each other's output.
Always test this setup in a development environment to verify that the sub-flows are being called correctly, and make sure you manage the Prefect contexts and parameters as necessary for your specific use case.
Lucas
01/30/2024, 5:29 PMMarvin
01/31/2024, 8:28 PMLucas
01/31/2024, 8:35 PMMarvin
01/31/2024, 8:35 PM
When I run prefect deploy this appears:

```text
(devenv) PS C:\Reportes_Power_BI\Python\dev> prefect deploy
? Select a flow to deploy [Use arrows to move; enter to select; n to select none]
│   │ Flow Name    │ Location                                                                      │
│ > │ test_flow    │ devenv\Lib\site-packages\prefect\testing\standard_test_suites\task_runners.py │
│   │ foo          │ devenv\Lib\site-packages\prefect\testing\standard_test_suites\task_runners.py │
│   │ bar          │ devenv\Lib\site-packages\prefect\testing\standard_test_suites\task_runners.py │
│   │ test_flow    │ devenv\Lib\site-packages\prefect\testing\standard_test_suites\task_runners.py │
│   │ test_flow    │ devenv\Lib\site-packages\prefect\testing\standard_test_suites\task_runners.py │
│   │ cargar_stock │ Prefect_Test\stock_softland_prefect.py                                        │
│   │ mi_flujo     │ Prefect_Test\test.py                                                          │
│   │ test_flow    │ Prefect_Test\test_logs.py                                                     │
│   │ mi_flujo     │ Prefect_Test\test_logs_commons.py                                             │
```
Marvin
01/31/2024, 8:37 PM
When you run `prefect deploy`, Prefect presents you with a list of detected flows that you can deploy. The list you're seeing includes flows found in your environment along with their locations.
To proceed with the deployment, you should:
1. Use the arrow keys to navigate to the flow you want to deploy.
2. Press Enter to select the flow.
For example, if you want to deploy the flow named "cargar_stock" located in `Prefect_Test\stock_softland_prefect.py`, you would move the selection to that flow and press Enter. After selecting a flow, Prefect will guide you through the additional configuration steps needed for the deployment, such as setting the flow's name, version, and any deployment-specific settings (e.g., scheduling, infrastructure, etc.).
Keep in mind that the paths shown might point to locations within the virtual environment (e.g., `devenv\Lib\...`). If that is not expected, make sure the `PYTHONPATH` is set correctly and your Python files are structured properly. It's unusual for flows to be deployed from within the `site-packages` directory unless you're deliberately deploying flows that are part of an installed package.
If you're seeing unexpected flows or locations, check the current working directory and make sure you're running `prefect deploy` from the root directory of your project, where your flow definitions reside. Also, verify that your flows are uniquely named, which will help Prefect distinguish between them, especially if you have multiple flows in different locations with the same name.
Lucas
01/31/2024, 9:39 PMMarvin
01/31/2024, 9:40 PMenv
directory) during the deployment process with Prefect, it suggests that those Python files contain flow definitions. This might happen if the virtual environment was created in the same directory as your project, or if the environment contains installed packages that have Prefect flows in them.
Here are some steps you can take to avoid deploying flows from the environment directory:
1. Virtual Environment Location: Consider creating your virtual environment outside of your project directory to prevent Prefect from scanning it for flow definitions. Commonly, developers will place virtual environments in a separate directory entirely, such as a .venvs
folder in their home directory or elsewhere on their filesystem.
2. Structure Your Project: Ensure your project is structured in such a way that only your own flow definitions are in the main project directory and that the virtual environment is excluded.
3. Clarify Your PYTHONPATH: The PYTHONPATH
environment variable might need to be set or adjusted to include only the directories that contain your flow definitions. Remove any unnecessary paths that might lead to the environment directory being scanned.
4. Use .prefectignore File: You can use a .prefectignore
file to prevent specific directories and patterns from being scanned by Prefect when detecting flows. This is analogous to .gitignore
in Git. Simply create a .prefectignore
file in your project root and add patterns to exclude, such as:
plaintext
env/
**/__pycache__/
**/*.pyc
5. Explicitly Specify Flow Paths: When running prefect deploy, you can pass the entrypoint directly (in the form path/to/file.py:flow_name) instead of letting Prefect scan all directories.
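The patterns in step 4 are gitignore-style globs. As a rough illustration of how glob patterns match paths (a sketch only: Python's fnmatch approximates, but does not exactly reproduce, gitignore semantics):

```python
from fnmatch import fnmatch

# Hypothetical ignore patterns; fnmatch's "*" also crosses "/",
# so "env/*" covers nested files inside env/ as well.
patterns = ["env/*", "*.pyc"]

def ignored(path: str) -> bool:
    """Return True if any pattern matches the given path."""
    return any(fnmatch(path, pat) for pat in patterns)

print(ignored("env/Lib/site-packages/pkg/flows.py"))      # True
print(ignored("Prefect_Test/stock_softland_prefect.py"))  # False
```

A path matched by any pattern would be skipped during flow discovery.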
If you take these steps and still encounter issues, you may need to revisit how Prefect is picking up flows in your environment and ensure the deployment command is run in the correct context.Lucas
06/11/2024, 4:44 PMMarvin
06/11/2024, 4:45 PMTo print the flow script's path from a task defined in another module, you have two options:
1. Read the __file__ attribute of the __main__ module; __main__ is the script being executed, so its __file__ is the flow script's path.
2. Pass the flow's script path to the task explicitly, using Prefect's context or parameter passing.
Here’s a step-by-step guide on how to achieve it:
### 1. Define your Flow and Task
Let's assume you have the following directory structure:
project/
├── flow_script.py
└── some_module/
└── task_module.py
### 2. Define your task in task_module.py
python
# project/some_module/task_module.py
from prefect import task

@task
def my_task():
    import __main__  # the module the process was started with
    flow_script_path = __main__.__file__
    print(f"The flow script path is: {flow_script_path}")
### 3. Define your flow in flow_script.py and call the task
python
# project/flow_script.py
from prefect import flow
from some_module.task_module import my_task

@flow
def my_flow():
    my_task()

if __name__ == "__main__":
    my_flow()
When you run flow_script.py
, the task my_task
will print the path to the flow_script.py
file.
### Explanation
- __file__
Attribute: This attribute is a built-in way in Python to obtain the path of the script being executed. By importing __main__
within the task, you access the __file__
attribute from the main execution module, which in this context is the flow script.
- Module Importation: Ensure that your flow_script.py
correctly sets the module search path or is executed in a way that Python can find and import some_module
.
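If Python cannot find some_module when the flow script runs, one common workaround (a hypothetical, non-Prefect-specific bootstrap) is to put the project root on sys.path at the top of flow_script.py:

```python
import os
import sys

# Hypothetical bootstrap for flow_script.py: make the directory containing
# this script importable regardless of the current working directory.
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

print(project_root in sys.path)  # True
```

After this runs, `from some_module.task_module import my_task` resolves no matter where the process was launched from.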
### Running the Flow
Navigate to the project
directory and run the flow script:
bash
python flow_script.py
The output will be:
plaintext
The flow script path is: project/flow_script.py
This approach leverages Python’s introspection features and the global namespace __main__
, making it possible to retrieve and print paths at runtime from within different modules and tasks.Lucas
07/26/2024, 3:56 PMMarvin
07/26/2024, 3:56 PMYou can store shared, non-sensitive values as Prefect Variables.
### 1. Setting a Variable
Using Prefect CLI:
bash
prefect variable set MY_VARIABLE "Some Value"
Using Prefect Python API:
python
from prefect.variables import Variable

# Set a variable (Variable.set is available in recent Prefect releases;
# earlier 2.x versions only supported setting variables via the CLI or UI)
Variable.set("MY_VARIABLE", "Some Value", overwrite=True)
### 2. Getting a Variable
In another script, you can retrieve the variable value.
Using Prefect Python API:
python
from prefect import flow, task
from prefect.variables import Variable

@task
def read_variable():
    # In some Prefect 2.x versions Variable.get returns a Variable
    # object rather than the raw value; use .value in that case.
    value = Variable.get("MY_VARIABLE")
    print(f"The value of MY_VARIABLE is: {value}")
    return value

@flow
def monitor_flow():
    read_variable()

if __name__ == "__main__":
    monitor_flow()
### Example Workflow
#### Script 1: Execution Flow (execution_flow.py)
python
from prefect import flow, task
from prefect.variables import Variable

@task
def set_variable():
    # overwrite=True updates the variable if it already exists
    Variable.set("MY_VARIABLE", "Execution Complete", overwrite=True)

@flow
def execution_flow():
    set_variable()

if __name__ == "__main__":
    execution_flow()
#### Script 2: Monitoring Flow (monitoring_flow.py)
python
from prefect import flow, task
from prefect.variables import Variable

@task
def read_variable():
    value = Variable.get("MY_VARIABLE")
    print(f"The value of MY_VARIABLE is: {value}")
    return value

@flow
def monitor_flow():
    read_variable()

if __name__ == "__main__":
    monitor_flow()
### Running the Flows
1. First, run the execution_flow.py
script to set the variable.
bash
python execution_flow.py
2. Then, run the monitoring_flow.py
script to read the variable.
bash
python monitoring_flow.py
### Summary
Using Prefect Variables, you can store and share non-sensitive information between different flows and scripts. The CLI commands, Prefect UI, and Prefect Python API provide flexible options for managing these shared variables. This approach works well for configuration settings and other non-sensitive data required across multiple workflows.
For more details, you can refer to the Prefect Variables documentation.Lucas
07/26/2024, 4:00 PMMarvin
07/26/2024, 4:00 PM### Option 1: Separate Prefect Variables per Script
If the flows are unrelated, each script can write its own variable, and the monitoring script reads them all.
Script 1:
python
from prefect import flow, task
from prefect.variables import Variable

@task
def set_var():
    Variable.set("VAR1", "Value from script 1", overwrite=True)

@flow
def script1_flow():
    set_var()

if __name__ == "__main__":
    script1_flow()
Script 2:
python
from prefect import flow, task
from prefect.variables import Variable

@task
def set_var():
    Variable.set("VAR2", "Value from script 2", overwrite=True)

@flow
def script2_flow():
    set_var()

if __name__ == "__main__":
    script2_flow()
#### Retrieving Variables in Monitoring Script:
python
from prefect import flow, task
from prefect.variables import Variable

@task
def read_variables():
    var1 = Variable.get("VAR1")
    var2 = Variable.get("VAR2")
    print(f"VAR1: {var1}, VAR2: {var2}")

@flow
def monitoring_flow():
    read_variables()

if __name__ == "__main__":
    monitoring_flow()
### Option 2: Use Parameters (for related flows)
If one flow invokes another (for example, as a subflow), pass the shared values as parameters. Note that a flow's run context lives only for the duration of that run, so it cannot be used to share data between unrelated runs.
### Option 3: External Storage
For more complex scenarios, consider using external storage such as a database or cloud storage to share state between flows. This adds flexibility and persistence beyond the Prefect environment.
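As a minimal stdlib sketch of this option (a hypothetical shared JSON file standing in for a database or cloud object; real deployments with concurrent writers would need locking or a proper store):

```python
import json
import os
import tempfile

# Hypothetical shared state file; all flows must agree on its location.
STATE_PATH = os.path.join(tempfile.gettempdir(), "prefect_shared_state.json")

def write_state(key: str, value: str) -> None:
    """Merge one key/value pair into the shared JSON file."""
    state = {}
    if os.path.exists(STATE_PATH):
        with open(STATE_PATH) as f:
            state = json.load(f)
    state[key] = value
    with open(STATE_PATH, "w") as f:
        json.dump(state, f)

def read_state(key: str):
    """Return the stored value for key, or None if absent."""
    if not os.path.exists(STATE_PATH):
        return None
    with open(STATE_PATH) as f:
        return json.load(f).get(key)

write_state("var_from_script1", "Value from script 1")
print(read_state("var_from_script1"))  # Value from script 1
```

One execution flow calls write_state, and the monitoring flow calls read_state, with no Prefect-specific API involved.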
### Example Using Parameters (if flows are related):
python
from prefect import flow, task

@task
def use_value(value: str):
    print(f"Received: {value}")

@flow
def child_flow(shared_value: str):
    use_value(shared_value)

@flow
def parent_flow():
    # A parent flow hands data to a related run by calling it as a
    # subflow with parameters; no shared mutable context is needed.
    child_flow(shared_value="Value from parent")

if __name__ == "__main__":
    parent_flow()
### Option 4: External Storage with a Database
Using an external database could be necessary for storing states or sharing configurations across asynchronous flow runs. Each script would read and write to a common database.
Real-world implementation might include:
- Writing data: Each execution flow writes the variable to the database.
- Reading data: The monitoring flow reads the variable from the database.
Example with pseudo-code implementation:
```python
import sqlite3
from prefect import flow, task

DB_PATH = 'variables.db'

# Database setup (done once)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS variables (name TEXT PRIMARY KEY, value TEXT)''')
conn.commit()
conn.close()

# Script 1
@task
def set_var():
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute("INSERT OR REPLACE INTO variables (name, value) VALUES ('var1', 'Value from script 1')")
    conn.commit()
    conn.close()

@flow
def script1_flow():
    set_var()

if __name__ == "__main__":
    script1_flow()

# Monitoring Script
@task
def read_variables():
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()

    def fetch(name):
        c.execute("SELECT value FROM variables WHERE name=?", (name,))
        row = c.fetchone()
        return row[0] if row else None  # guard: the variable may not be set yet

    var1 = fetch('var1')
    var2 = fetch('var2')
    conn.close()
    print(f"var1: {var1}, var2: {var2}")

@flow
def monitoring_flow():
    read_variables()

if __name__ == "__main__":
    monitoring_flow()
```
Marvin
07/26/2024, 4:00 PMLucas
07/26/2024, 4:04 PMMarvin
07/26/2024, 4:05 PMPrefect's client does not provide a generic per-run metadata store, but you can record metadata yourself, for example in a Prefect Variable keyed by the flow run ID (which prefect.runtime.flow_run exposes at runtime).
#### Script 1 (flow_script1.py)
python
import json

from prefect import flow, task
from prefect.runtime import flow_run
from prefect.variables import Variable

@task
def set_metadata():
    # Variable names may only contain lowercase letters, numbers, and
    # underscores, so normalize the run ID before using it as a key.
    run_id = flow_run.id.replace("-", "_")
    metadata = {"script": "script1", "info": "This is script 1 execution"}
    Variable.set(f"meta_{run_id}", json.dumps(metadata), overwrite=True)
    return metadata

@flow
def flow1():
    set_metadata()

if __name__ == "__main__":
    flow1()
#### Script 2 (flow_script2.py)
python
import json

from prefect import flow, task
from prefect.runtime import flow_run
from prefect.variables import Variable

@task
def set_metadata():
    run_id = flow_run.id.replace("-", "_")
    metadata = {"script": "script2", "info": "This is script 2 execution"}
    Variable.set(f"meta_{run_id}", json.dumps(metadata), overwrite=True)
    return metadata

@flow
def flow2():
    set_metadata()

if __name__ == "__main__":
    flow2()
### Retrieving Metadata in the Monitoring Script
#### Monitoring Script (monitoring_script.py)
python
import json

from prefect import flow, task
from prefect.variables import Variable

@task
def get_metadata(flow_run_id: str):
    key = f"meta_{flow_run_id.replace('-', '_')}"
    # In some Prefect 2.x versions Variable.get returns a Variable object;
    # use .value on the result in that case.
    raw = Variable.get(key)
    return json.loads(raw) if raw else None

@flow
def monitoring_flow(flow_runs: list):
    for run_id in flow_runs:
        metadata = get_metadata(run_id)
        print(f"Metadata for flow run {run_id}: {metadata}")

if __name__ == "__main__":
    # Dummy flow run IDs for the demonstration, replace with actual flow run IDs
    flow_runs = ["flow_run_id_1", "flow_run_id_2"]
    monitoring_flow(flow_runs)
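Instead of hard-coding flow run IDs, the monitoring script can ask the Prefect API for recent runs. A hedged sketch (assuming Prefect 2.x, where the async client's read_flow_runs accepts a limit, and a reachable API server):

```python
import asyncio

# Sketch: fetch recent flow run IDs from the Prefect API so the monitoring
# flow does not need hard-coded IDs. Requires Prefect to be installed and
# PREFECT_API_URL to point at a reachable server when actually run.
async def recent_run_ids(limit: int = 5):
    from prefect.client.orchestration import get_client

    async with get_client() as client:
        runs = await client.read_flow_runs(limit=limit)
        return [str(run.id) for run in runs]

# Example usage (only with a running server):
#   flow_runs = asyncio.run(recent_run_ids())
```

The returned IDs can then be passed straight into monitoring_flow.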
### Running the Flows
1. Execute each script to set metadata:
bash
python flow_script1.py
python flow_script2.py
2. Run the monitoring script to retrieve and display metadata:
bash
python monitoring_script.py
### Explanation
1. Run identity: each execution flow can discover its own flow run ID at runtime.
2. Setting Metadata: in each execution flow, the set_metadata task records metadata and associates it with the current flow run.
3. Retrieving Metadata: the monitoring script looks the metadata up by flow run ID for the runs it is interested in.
By using this approach, you can share and retrieve variable data or metadata between different flow runs, even if they are defined in separate scripts and have no direct correlation. Make sure to replace "flow_run_id_1"
and "flow_run_id_2"
with the actual flow run IDs you want to monitor.Lucas
07/26/2024, 5:10 PMMarvin
07/26/2024, 5:10 PMHere is how each looks in code.
- String Block:
python
from prefect import flow
from prefect.blocks.system import String

# Creating a String block
api_endpoint = String(value="https://api.example.com/data")
api_endpoint.save(name="api-endpoint")

# Using the String block in a flow
@flow
def my_flow():
    endpoint = String.load("api-endpoint").value
    print(f"API Endpoint: {endpoint}")

if __name__ == "__main__":
    my_flow()
- Variable:
python
from prefect import flow, task
from prefect.variables import Variable

@task
def read_variable():
    config_value = Variable.get("config_key")
    print(f"Config Value: {config_value}")

@flow
def my_flow():
    read_variable()

if __name__ == "__main__":
    # Set the variable via CLI (prefect variable set config_key config_value)
    # or programmatically:
    Variable.set("config_key", "config_value", overwrite=True)
    my_flow()
### Summary
- String Blocks:
- Best for reusable, constant data (e.g., configurations, paths, URLs).
- Managed and versioned within Prefect’s infrastructure.
- Ideal for configurations used across multiple flows/tasks.
- Variables:
- Best for frequently accessed but occasionally updated data.
- Scoped to Prefect's server instance or workspace.
- Suitable for managing runtime parameters and dynamic configurations, but not sensitive data.
Use String Blocks for structured, reusable strings that might require version control and integration within Prefect's workflows. Use Variables for simple, mutable configuration values that might change and are frequently accessed across different runs.