Giacomo Chiarella
02/06/2025, 8:13 AM
@flow(name=DAG_NAME, retries=0, retry_delay_seconds=30, description=f"It runs {DAG_NAME}")
def flow_entrypoint():
    some code
if __name__ == "__main__":
    args = check_arguments()
    if args.build_deploy:
        build_deploy(flow_entrypoint, DAG_NAME, DAG_NAME, ENV,
                     f"<flow_script_path>:flow_entrypoint", schedule=SCHEDULE, tags=["a", "b"])
    else:
        flow_entrypoint()
where build_deploy is
def build_deploy(
    flow: Flow,
    deployment_name: str,
    work_queue_name: str,
    storage: str,
    path: str,
    schedule: str = None,
    parameters: dict = None,
    tags: list = None,
    priority: int = 1,
    work_queue_concurrency: int = 1,
    work_pool_name: str = "default-worker-pool",
):
    """Ensure the work queue exists, then deploy ``flow`` from S3 storage.

    Args:
        flow: Flow object whose ``from_source`` is used to load the code.
        deployment_name: Name given to the deployment.
        work_queue_name: Queue passed to ``create_or_update_work_queue`` and
            targeted by the deployment.
        storage: Name handed to ``S3Bucket.load`` (presumably the name of a
            saved S3Bucket block -- confirm against the caller).
        path: Second positional argument of ``from_source``; the caller in
            this thread passes a ``"<script path>:<function>"`` string.
        schedule: Cron expression. When falsy, no schedule is attached;
            otherwise it is wrapped in a ``CronSchedule`` with timezone UTC.
        parameters: Forwarded unchanged to ``deploy``.
        tags: Forwarded unchanged to ``deploy``.
        priority: Forwarded to ``create_or_update_work_queue``.
        work_queue_concurrency: Forwarded to ``create_or_update_work_queue``.
        work_pool_name: Work pool used for both the queue and the deployment.
    """
    # The queue helper is a coroutine, so it is driven to completion here
    # before the deployment is registered.
    asyncio.run(
        create_or_update_work_queue(
            work_queue_name,
            priority,
            work_queue_concurrency,
            work_pool_name,
        )
    )

    remote_flow = flow.from_source(S3Bucket.load(storage), path)

    if schedule:
        cron_schedule = CronSchedule(cron=schedule, timezone="UTC")
    else:
        cron_schedule = None

    remote_flow.deploy(
        name=deployment_name,
        work_pool_name=work_pool_name,
        work_queue_name=work_queue_name,
        schedule=cron_schedule,
        parameters=parameters,
        tags=tags,
    )
I then save all flows in a bucket, and I created a service in the Prefect instance that pulls the flow scripts from the bucket and executes
python <path to flow script> --build-deploy
the service is scheduled to run every X hours.
I have the feeling this approach is not scalable: the more flows you write, the longer the service will run, which means at some point one run may overlap with the next one. Is it the right approach?
Giacomo Chiarella
02/06/2025, 10:51 AM
Stephen Herron
02/06/2025, 1:12 PM
Stephen Herron
02/06/2025, 1:22 PM
from prefect import deploy
from prefect.client.schemas.schedules import CronSchedule
from flows import flow
from utils.deployment_defaults import (
default_tags,
default_storage,
default_work_pool,
)
# Schedules are switched on only when get_env() reports "production".
is_schedule_active = bool(get_env() == "production")

# Parameter sets for the two deployments (both empty in this example).
deploy_a_params = {}
deploy_n_params = {}

# Entrypoint string handed to flow.from_source for every deployment below.
entrypoint = "path/to/entrypoint:entrypoint"

if __name__ == "__main__":
    # Unscheduled deployment of the flow.
    deployment_a = flow.from_source(
        source=default_storage,
        entrypoint=entrypoint,
    ).to_deployment(
        name="deployment_a",
        tags=list(default_tags),
        parameters=deploy_a_params,
        enforce_parameter_schema=True,
    )

    # Same flow, registered a second time with a daily 04:40 Europe/London
    # cron schedule and an extra "has-schedule" tag.
    daily_cron = CronSchedule(
        cron="40 4 * * *",
        timezone="Europe/London",
    )
    deployment_whatever = flow.from_source(
        source=default_storage,
        entrypoint=entrypoint,
    ).to_deployment(
        name="whatever",
        tags=[*default_tags, "has-schedule"],
        parameters=deploy_n_params,
        schedules=[daily_cron],
        is_schedule_active=is_schedule_active,
        enforce_parameter_schema=True,
    )

    # Register both deployments with a single deploy() call.
    deploy(
        deployment_a,
        deployment_whatever,
        work_pool_name=default_work_pool,
        print_next_steps_message=False,
    )
Giacomo Chiarella
02/06/2025, 2:09 PM
Giacomo Chiarella
02/07/2025, 8:12 AM