Joshua Caskie
08/26/2024, 4:31 PM
`start_date` parameter to a flow? How are schedules used to set parameters for flows, which can then be retried with the same parameters if they fail?

Marvin
08/26/2024, 4:32 PM
`start_date` parameter and ensure that failed runs can be retried with the same parameters, follow these steps:
1. Define the Flow with Parameters:
Define your flow and include the `start_date` parameter.
python
from datetime import datetime

from prefect import flow


@flow
def my_flow(start_date: datetime):
    print(f"Flow started with start_date: {start_date}")
2. Schedule the Flow:
Use Prefect's scheduling capabilities to create scheduled runs. You can specify the parameters when creating the deployment.
python
from datetime import datetime, timedelta

from prefect.deployments import Deployment
# `prefect.orion` was renamed to `prefect.server` in later Prefect 2 releases
from prefect.server.schemas.schedules import IntervalSchedule

# Create a schedule that runs every day
schedule = IntervalSchedule(interval=timedelta(days=1))

# Specify the default parameters for scheduled runs.
# Note: datetime.utcnow() is evaluated once, when this script runs,
# so every scheduled run receives the same fixed value.
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="scheduled-flow",
    parameters={"start_date": datetime.utcnow()},
    schedule=schedule,
)

# Apply the deployment
deployment.apply()
3. Retrying Failed Runs:
Prefect retries a failed flow run according to the retry settings on the flow. A retry re-executes the same flow run, so it reuses the parameters that run was created with.
For example, specify a retry policy when defining your flow:
python
@flow(retries=3, retry_delay_seconds=60)  # 3 retries with a 60-second delay
def my_flow(start_date: datetime):
    print(f"Flow started with start_date: {start_date}")
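The retry behaviour in step 3 can be illustrated without Prefect at all. The sketch below is plain Python, not Prefect's implementation: `run_with_retries` and `flaky_flow` are hypothetical names invented for the illustration, and the retry delay is left out. It only shows the property that matters here, which is that every attempt is called with the same parameter values the run started with.

```python
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List


def run_with_retries(fn: Callable[..., Any], parameters: Dict[str, Any], retries: int) -> Any:
    """Call fn(**parameters) until it succeeds, making at most retries + 1 attempts."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            # The same `parameters` dict is passed on every attempt.
            return fn(**parameters)
        except Exception:
            if attempt == retries:
                raise  # out of attempts: surface the last error


# Records the start_date seen by each attempt (hypothetical bookkeeping).
seen_start_dates: List[datetime] = []


def flaky_flow(start_date: datetime) -> str:
    """Stand-in for a flow that fails twice before succeeding."""
    seen_start_dates.append(start_date)
    if len(seen_start_dates) < 3:
        raise RuntimeError("transient failure")
    return f"processed {start_date.date().isoformat()}"


params = {"start_date": datetime(2024, 8, 26, tzinfo=timezone.utc)}
result = run_with_retries(flaky_flow, params, retries=3)
print(result)
print(len(seen_start_dates))
```

The third attempt succeeds, and all three attempts receive the identical `start_date`.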
### Summary
- Define your flow with the necessary parameters.
- Schedule the flow using `Deployment.build_from_flow` and pass the parameters.
- Set retry policies within your flow definition to handle failures and automatic retries.
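One caveat with passing `datetime.utcnow()` in the deployment parameters is that the expression is evaluated once, when the deployment is built, so each scheduled run would receive the same date. A possible workaround, sketched here in plain Python, is to make `start_date` optional and fall back to the run's scheduled start time. `resolve_start_date` and its `scheduled_start` argument are hypothetical names for this illustration. In Prefect, `prefect.runtime.flow_run.scheduled_start_time` looks like a candidate source for that value, but treat that as an assumption to check against your version.

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_start_date(start_date: Optional[datetime], scheduled_start: datetime) -> datetime:
    """Use the explicit start_date if given, else the run's scheduled start time."""
    if start_date is not None:
        return start_date
    return scheduled_start


# Two scheduled runs on consecutive days (illustrative values).
monday = datetime(2024, 8, 26, 6, 0, tzinfo=timezone.utc)
tuesday = datetime(2024, 8, 27, 6, 0, tzinfo=timezone.utc)

# A run and its retry share one scheduled start, so they resolve to the same date.
first_attempt = resolve_start_date(None, monday)
retry_attempt = resolve_start_date(None, monday)

# The next scheduled run resolves to its own date.
next_run = resolve_start_date(None, tuesday)

# An explicit parameter (for example a manual backfill) always wins.
backfill = resolve_start_date(datetime(2024, 1, 1, tzinfo=timezone.utc), tuesday)
```

The design choice is that the fallback depends only on values fixed when the run is created, never on the wall clock at execution time, which keeps retries reproducible.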
For more detailed information, refer to the Prefect Schedules documentation and the Prefect Flows documentation. You can also see the sample code examples related to flow scheduling.

Joshua Caskie
08/26/2024, 4:34 PM

Marvin
08/26/2024, 4:34 PM
`start_date`, your `prefect.yaml` would look something like this:
yaml
# prefect.yaml
prefect-version: 2.0
name: my_project
build: null
push: null
pull: null
deployments:
  - name: my_flow_deployment
    version: 1
    schedule:
      interval: 86400  # seconds (1 day)
    parameters:
      start_date: "{{ now }}"
    entrypoint: path/to/my_flow.py:my_flow
    work_pool:
      name: "my-work-pool"
      work_queue_name: "my-work-queue"
### Explanation
- `prefect-version`: Specifies the version of Prefect.
- `name`: The name of your project.
- `deployments`: Defines the deployment configurations.
  - `name`: Name of the deployment.
  - `version`: Version of the deployment (optional).
  - `schedule`: Defines the schedule for the flow. Here, we are using an interval schedule that runs daily.
  - `parameters`: Specifies the parameters to pass to the flow run. You can use Jinja templates like `{{ now }}` to dynamically set the `start_date`.
  - `entrypoint`: The location of your flow's entrypoint function.
  - `work_pool`: Specifies the work pool configuration.
### Steps
1. Create the YAML configuration: Save the YAML content to a file named `prefect.yaml` in your project directory.
2. Define the Flow: Ensure your flow is defined in `path/to/my_flow.py`.
3. Deploy the Flow: Use the command `prefect deploy` to create the deployment based on the configuration file.
For further details, visit the Schedules documentation and Deployments documentation.

Joshua Caskie
08/26/2024, 4:36 PM
`{{ now }}` variable from? Is that in the docs, or in a GitHub example?

Marvin
08/26/2024, 4:36 PM
`{{ now }}` as a Jinja template variable in the YAML configuration isn't explicitly documented in the examples or directly mentioned in the issues I checked on GitHub. My apologies for any confusion caused by this assumption. Instead, you can use a custom Jinja template to achieve the equivalent effect.
You can leverage Jinja templating within the YAML file for more dynamic parameters. Here’s how you can structure your YAML configuration without relying on undocumented variables:
yaml
# prefect.yaml
prefect-version: 2.0
name: my_project
build: null
push: null
pull: null
deployments:
  - name: my_flow_deployment
    version: 1
    schedule:
      interval: 86400  # seconds (1 day)
    parameters:
      start_date: "{{ timestamp_utc }}"
    entrypoint: path/to/my_flow.py:my_flow
    work_pool:
      name: "my-work-pool"
      work_queue_name: "my-work-queue"
In this case, `{{ timestamp_utc }}` would be evaluated at runtime to pass the current UTC timestamp as the `start_date`.
### Summary
- YAML Configuration: Use `timestamp_utc` or any other dynamic values supported by Prefect’s Jinja2 templating.
- Deploying Flows: Use `prefect deploy` to apply the deployment.
For more examples and up-to-date information, please refer to the Prefect Deployments documentation and the specific section on Schedules.

Joshua Caskie
08/26/2024, 4:37 PM

Marvin
08/26/2024, 4:38 PM

Nate
08/26/2024, 4:44 PM
`{{ prefect.blocks.secret.whatever }}`
• variables `{{ prefect.variables.my_value }}`
• env vars `{{ $SOME_ENV_VAR }}`
• build / push step outputs `{{ some_upstream_step.result }}`

Joshua Caskie
08/26/2024, 7:26 PM

Nate
08/26/2024, 7:54 PM
`build` step like
- entrypoint: flows/my_flow.py:some_flow
  name: test
  build:
    - prefect.deployments.steps.run_shell_script:
        script: prefect variable set now "$(date '+%m %d %Y')" --overwrite
        expand_env_vars: true
  parameters:
    date: "{{ prefect.variables.now }}"
» prefect deployment inspect 'some-flow/test' | rg param
'parameters': {'date': '08 26 2024'},
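
With this approach the flow receives `date` as a plain string such as `'08 26 2024'`, so the flow has to parse it before doing date arithmetic. A minimal sketch of that step, using only the standard library: `parse_date_parameter` and `DATE_FORMAT` are hypothetical names, and the format string is assumed to mirror the shell's `date '+%m %d %Y'` from the build step above.

```python
from datetime import date, datetime

# Assumed to match the shell format used in the build step: date '+%m %d %Y'
DATE_FORMAT = "%m %d %Y"


def parse_date_parameter(raw: str) -> date:
    """Convert a 'MM DD YYYY' string parameter into a date object."""
    return datetime.strptime(raw, DATE_FORMAT).date()


parsed = parse_date_parameter("08 26 2024")
print(parsed.isoformat())
```

Note that the variable is set when the build step runs at deploy time, as the `inspect` output shows, so the stored value stays the same until the deployment is built again.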