<@ULVA73B9P> Optimal way to use a flow schedule to...
# marvin-in-the-wild
j
@Marvin Optimal way to use a flow schedule to pass a `start_date` parameter to a flow? How are schedules used to set parameters for flows, which can then be retried with the same parameters if they fail?
m
To schedule a flow with a `start_date` parameter and ensure that failed runs can be retried with the same parameters, follow these steps:
1. Define the Flow with Parameters: Define your flow and include the `start_date` parameter.
python
from datetime import datetime
from prefect import flow

@flow
def my_flow(start_date: datetime):
    print(f"Flow started with start_date: {start_date}")
2. Schedule the Flow: Use Prefect's scheduling capabilities to create scheduled runs. You can specify the parameters when creating the deployment.
python
from datetime import datetime, timedelta

from prefect.deployments import Deployment
# `prefect.orion` was renamed to `prefect.server` in later Prefect 2.x releases
from prefect.server.schemas.schedules import IntervalSchedule

# Create a schedule that runs every day
schedule = IntervalSchedule(interval=timedelta(days=1))

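# Specify the default parameters for scheduled runs.
# Note: datetime.utcnow() is evaluated once, when this deployment is built,
# so every scheduled run receives that same fixed value.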
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="scheduled-flow",
    parameters={"start_date": datetime.utcnow()},
    schedule=schedule
)

# Apply the deployment
deployment.apply()
3. Retrying Failed Runs: Prefect retries a failed flow run according to the retry settings on the flow. A retry re-executes the same flow run, so it receives the same parameters as the original attempt. For example, specify a retry policy when defining your flow:
python
@flow(retries=3, retry_delay_seconds=60)  # 3 retries with a 60 second delay
def my_flow(start_date: datetime):
    print(f"Flow started with start_date: {start_date}")
### Summary
- Define your flow with the necessary parameters.
- Schedule the flow using `Deployment.build_from_flow` and pass the parameters.
- Set retry policies within your flow definition to handle failures and automatic retries.

For more detailed information, refer to the Prefect Schedules documentation and the Prefect Flows documentation. You can also see the sample code examples related to flow scheduling.
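A parameter value written into a deployment is fixed when the deployment is created, so a date computed there would be the same for every scheduled run. One possible alternative, sketched below with only the standard library, is to leave `start_date` unset on the schedule and derive it inside the flow from the run's scheduled time. The helper `resolve_start_date` is hypothetical and not part of Prefect; in Prefect 2.x the scheduled time is likely available as `prefect.runtime.flow_run.scheduled_start_time`, but that is an assumption to verify against your version.

```python
from datetime import date, datetime, timezone
from typing import Optional


def resolve_start_date(start_date: Optional[str], scheduled_time: datetime) -> date:
    """Return the explicit start_date if given, else the run's scheduled day (UTC).

    `scheduled_time` is expected to be timezone-aware.
    """
    if start_date is not None:
        # An explicit ISO date (e.g. passed for a manual backfill) wins.
        return date.fromisoformat(start_date)
    # Otherwise fall back to the day the run was scheduled for.
    return scheduled_time.astimezone(timezone.utc).date()
```

Because a retry re-executes the same flow run, its scheduled time does not change, so the derived date stays the same across retries.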
j
@Marvin how would you do this in a YAML configuration?
m
To use a YAML configuration file to schedule a flow and pass parameters such as `start_date`, your `prefect.yaml` would look something like this:
yaml
# prefect.yaml

prefect-version: 2.0
name: my_project
build: null
push: null
pull: null

deployments:
  - name: my_flow_deployment
    version: 1
    schedule:
      interval: 86400  # seconds (one day)
    parameters:
      start_date: "{{ now }}"
    entrypoint: path/to/my_flow.py:my_flow
    work_pool:
      name: "my-work-pool"
      work_queue_name: "my-work-queue"
### Explanation
- prefect-version: Specifies the version of Prefect.
- name: The name of your project.
- deployments: Defines the deployment configurations.
  - name: Name of the deployment.
  - version: Version of the deployment (optional).
  - schedule: Defines the schedule for the flow. Here, we are using an `IntervalSchedule` that runs daily.
  - parameters: Specifies the parameters to pass to the flow run. You can use Jinja templates like `{{ now }}` to dynamically set the `start_date`.
  - entrypoint: The location of your flow's entrypoint function.
  - work_pool: Specifies the work pool configuration.
### Steps
1. Create the YAML configuration: Save the YAML content to a file named `prefect.yaml` in your project directory.
2. Define the Flow: Ensure your flow is defined in `path/to/my_flow.py`.
3. Deploy the Flow: Use the command `prefect deploy` to create the deployment based on the configuration file.

For further details, visit the Schedules documentation and Deployments documentation.
j
@Marvin Where did you get the `{{ now }}` variable from? Is that in the docs, or in a github example?
m
The use of `{{ now }}` as a Jinja template variable in the YAML configuration isn't explicitly documented in the examples or directly mentioned in the issues I checked on GitHub. My apologies for any confusion caused by this assumption. Instead, you can use a custom Jinja template to achieve the equivalent effect. You can leverage Jinja templating within the YAML file for more dynamic parameters. Here’s how you can structure your YAML configuration without relying on undocumented variables:
yaml
# prefect.yaml

prefect-version: 2.0
name: my_project
build: null
push: null
pull: null

deployments:
  - name: my_flow_deployment
    version: 1
    schedule:
      interval: 86400  # seconds (one day)
    parameters:
      start_date: "{{ timestamp_utc }}"
    entrypoint: path/to/my_flow.py:my_flow
    work_pool:
      name: "my-work-pool"
      work_queue_name: "my-work-queue"
In this case, `{{ timestamp_utc }}` would be evaluated at runtime to pass the current UTC timestamp as the `start_date`.
### Summary
- YAML Configuration: Use `timestamp_utc` or any other dynamic values supported by Prefect’s Jinja2 templating.
- Deploying Flows: Use `prefect deploy` to apply the deployment.

For more examples and up-to-date information, please refer to the Prefect Deployments documentation and the specific section on Schedules.
j
@Marvin Where is the documentation for Prefect's Jinja2 templates?
m
Unfortunately, the search results did not provide a direct link to documentation specific to using Jinja2 templates within Prefect YAML files. However, Jinja2 templating is a common feature within Prefect for various contexts, including webhooks and cloud automation. Generally, it leverages all standard Jinja2 capabilities. For more detailed guidance, you might find relevant information in Prefect's guides or concepts sections. Please refer to the Prefect Documentation. If you need explicit examples or a deep dive into template functionalities within Prefect, I recommend checking Prefect's community forum or reaching out directly within their support channels.
n
hi @Joshua Caskie - in general you can template values into your yaml from
• blocks `{{ prefect.blocks.secret.whatever }}`
• variables `{{ prefect.variables.my_value }}`
• env vars `{{ $SOME_ENV_VAR }}`
• build / push step outputs `{{ some_upstream_step.result }}`
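To make the env var form concrete, here is a toy illustration of placeholder substitution using only the standard library. It is not Prefect's implementation; the function name `expand_env_placeholders` and the regular expression are hypothetical and handle only the `{{ $NAME }}` pattern listed above.

```python
import os
import re
from typing import Mapping, Optional

# Matches placeholders such as {{ $SOME_ENV_VAR }}
_ENV_PLACEHOLDER = re.compile(r"\{\{\s*\$(\w+)\s*\}\}")


def expand_env_placeholders(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace each {{ $NAME }} with env[NAME]; unknown names are left untouched."""
    env = os.environ if env is None else env

    def _substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        # Fall back to the original placeholder text when the variable is unset.
        return env.get(name, match.group(0))

    return _ENV_PLACEHOLDER.sub(_substitute, text)
```

For example, `expand_env_placeholders("date: {{ $RUN_DATE }}", {"RUN_DATE": "2024-08-26"})` yields a plain string, which is the point to take away: the substituted value is ordinary text by the time it becomes a parameter.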
j
@Nate Thanks. Specifically was looking for a timestamp template for the current time as I think through scheduling deployments with a "day" parameter
n
simplest way I can think of to do this all in the yaml would be to add a `build` step like
- entrypoint: flows/my_flow.py:some_flow
  name: test
  build:
  - prefect.deployments.steps.run_shell_script:
      script: prefect variable set now "$(date '+%m %d %Y')" --overwrite
      expand_env_vars: true
    
  parameters:
    date: "{{ prefect.variables.now }}"
» prefect deployment inspect 'some-flow/test' | rg param
    'parameters': {'date': '08 26 2024'},
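With this approach the flow receives `date` as a string in the `'+%m %d %Y'` format produced by the shell command, for example `'08 26 2024'`. A minimal sketch of turning that string back into a date inside the flow, assuming that exact format; `parse_run_date` is a hypothetical helper name, not something from Prefect:

```python
from datetime import date, datetime


def parse_run_date(raw: str) -> date:
    """Parse a 'MM DD YYYY' string (as written by `date '+%m %d %Y'`) into a date."""
    # strptime raises ValueError if the string does not match the format,
    # which surfaces a malformed parameter early in the run.
    return datetime.strptime(raw, "%m %d %Y").date()
```

Calling `parse_run_date("08 26 2024")` gives `date(2024, 8, 26)`. An ISO format such as `'+%Y-%m-%d'` in the shell command would let you use `date.fromisoformat` instead.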