Iryna
03/11/2024, 11:01 PMJenny
03/12/2024, 12:47 AMIryna
03/12/2024, 12:54 AMdeployment = Deployment.build_from_flow(
name=depl_name,
flow=flow,
tags=tags,
parameters=params,
infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
work_queue_name=work_queue_name,
schedule=(CronSchedule(cron="0 0 * * *", timezone="America/Chicago")),
storage=storage,
entrypoint=flow_entrypoint,
)
deployment.apply()
Jenny
03/12/2024, 1:25 AMChris Pickett
03/12/2024, 1:31 PMschedules
(plural) parameter, it’s similar to schedule
but takes a list of schedules. Here’s a full example:
from prefect import flow
from prefect.deployments import Deployment
from prefect.client.schemas.schedules import CronSchedule
@flow(log_prints=True)
def local_flow():
print("I'm a flow written to test deploy from source")
if __name__ == "__main__":
deployment = Deployment.build_from_flow(
name="test-sched",
flow=local_flow,
infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
work_queue_name="managed",
schedules=[CronSchedule(cron="0 0 * * *", timezone="America/Chicago")],
)
deployment.apply()
Iryna
03/12/2024, 11:16 PMschedules=[CronSchedule(cron="0 0 * * *", timezone="America/Chicago")]
I got this error:
Error deploying flows: 1 validation error for Deployment
__root__
Invalid schedule provided. Must be a schedule object, a dict, or a MinimalDeploymentSchedule. (type=value_error)
I fixed the issue with minimal deployment schedule
schedules=[MinimalDeploymentSchedule(schedule=(CronSchedule(cron=cron, timezone=timezone))),]
Chris Pickett
03/12/2024, 11:17 PMChris Pickett
03/12/2024, 11:18 PMIryna
03/12/2024, 11:20 PMJenny
03/14/2024, 9:23 PM