Seth Coussens
09/15/2022, 4:51 PM
Serina
09/15/2022, 4:54 PM
Seth Coussens
09/15/2022, 5:02 PM
Serina
09/15/2022, 5:43 PM
create_flow_run_from_deployment() and pass in the custom params, but I don't think that would be more optimal. You could also create a block that checks whether the service has updated its data; that way the params could remain unchanged while the flow ignores the services that haven't been updated yet. Those are some solutions off the top of my head, anyway.
Taylor Curran
09/15/2022, 5:50 PM
import pendulum  # assumption: `now` in the original snippet was a pendulum datetime
from prefect import flow, get_run_logger, task
from prefect.client import get_client
# Assumption: the import path for Scheduled depends on the Prefect 2 version;
# newer releases expose it from prefect.states instead.
from prefect.orion.schemas.states import Scheduled

@task
async def add_new_scheduled_run(depl_id, delta_hours=6, parameter_dict=None):
    """
    This task adds a scheduled flow run x hours from now with a specific parameter dict
    """
    # Get the time x hours from now.
    scheduled_time = pendulum.now("UTC").add(hours=delta_hours)
    # Use Prefect get_client() to schedule a new flow run x hours from now
    async with get_client() as client:
        response = await client.create_flow_run_from_deployment(
            deployment_id=depl_id,
            parameters=parameter_dict or {},
            state=Scheduled(scheduled_time=scheduled_time),
        )
    logger = get_run_logger()
    logger.info(f"INFO get client response: {response}")
    logger.info(f"INFO Scheduled a flow run for {scheduled_time}!")
Essentially, by using create_flow_run_from_deployment() you can add a new scheduled flow run with specified parameters to any deployment.
Seth Coussens
09/15/2022, 6:16 PM
Taylor Curran
09/15/2022, 6:22 PM
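To make the two suggestions in this thread a bit more concrete, here is a minimal sketch in plain Python with no Prefect calls. It first keeps only the services whose data changed since they were last processed, then builds the scheduled time and parameters that would be handed to create_flow_run_from_deployment() for each of them. Everything named here (services_with_new_data, plan_scheduled_runs, the service names, the timestamps, and the parameter dicts) is hypothetical and not from the thread; how you actually learn a service's last update time is left open.

```python
from datetime import datetime, timedelta, timezone


def services_with_new_data(last_updated, last_processed):
    """Return the services whose data changed after they were last processed."""
    ready = []
    for service, updated_at in last_updated.items():
        processed_at = last_processed.get(service)
        # A service that was never processed counts as having new data.
        if processed_at is None or updated_at > processed_at:
            ready.append(service)
    return ready


def plan_scheduled_runs(params_by_service, delta_hours=6, now=None):
    """Build the scheduled time and parameters for one flow run per service."""
    now = now or datetime.now(timezone.utc)
    scheduled_time = now + timedelta(hours=delta_hours)
    return [
        {"service": service, "scheduled_time": scheduled_time, "parameters": dict(params)}
        for service, params in params_by_service.items()
    ]


# Hypothetical example data: service_b has not published new data yet.
now = datetime(2022, 9, 15, 18, 0, tzinfo=timezone.utc)
last_updated = {"service_a": now - timedelta(hours=1), "service_b": now - timedelta(days=2)}
last_processed = {"service_a": now - timedelta(hours=6), "service_b": now - timedelta(hours=6)}
all_params = {"service_a": {"region": "us"}, "service_b": {"region": "eu"}}

ready = services_with_new_data(last_updated, last_processed)
plan = plan_scheduled_runs({name: all_params[name] for name in ready}, delta_hours=6, now=now)
for run in plan:
    print(run["service"], run["scheduled_time"].isoformat(), run["parameters"])
```

Each entry in plan would map onto one call to the add_new_scheduled_run task above, with run["parameters"] as the parameter dict and run["scheduled_time"] as the scheduled time.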