# Thomas Furmston
# 10/18/2021, 9:49 AM
import logging
import pendulum
import prefect
from prefect import Flow, Parameter, task
from prefect.storage import Docker
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Cron schedule: minute 41, hour 10, Monday through Friday (days 1-5).
# The start date is a timezone-aware "now" in Europe/London, evaluated when
# this module is imported.
# NOTE(review): presumably the cron expression is interpreted in the start
# date's timezone -- confirm against the Prefect schedule documentation.
weekday_schedule = CronSchedule(
    cron='41 10 * * 1-5',
    start_date=pendulum.now(tz='Europe/London'),
)
@task
def calculate_flow_end_date(end_date: str):
    """Resolve the end date to use for this flow run.

    Args:
        end_date: An explicitly supplied end date, or ``None`` when the
            caller did not provide one.

    Returns:
        ``end_date`` unchanged when it is not ``None``; otherwise the result
        of ``to_date_string()`` on the ``scheduled_start_time`` entry of the
        Prefect run context.
    """
    if end_date is None:
        # No explicit date given: fall back to the run's scheduled start.
        # NOTE(review): assumes 'scheduled_start_time' is present in the
        # context and is a pendulum datetime -- confirm for manual runs.
        scheduled_start = prefect.context.get('scheduled_start_time')
        return scheduled_start.to_date_string()
    return end_date
# Tasks that start runs of flows already registered under 'my_project_name'.
# NOTE(review): wait=True presumably makes each task block until the started
# flow run has finished -- confirm against the StartFlowRun documentation.
common_flow = StartFlowRun(
    project_name='my_project_name',
    flow_name='already_existing_flow1',
    wait=True,
)

baseline_flow = StartFlowRun(
    project_name='my_project_name',
    flow_name='already_existing_flow2',
    wait=True,
)
@task
def log_run_parameters(end_date, num_days, num_back_fill_days):
    """Log the resolved run parameters while the flow is running.

    Statements placed directly inside the ``with Flow(...)`` block execute
    once, while the flow graph is being built, and at that point the names
    refer to ``Parameter``/task objects rather than run-time values. Doing
    the logging inside a task defers it until the values are resolved.

    Args:
        end_date: The resolved end date for this run.
        num_days: The resolved value of the 'num_days' parameter.
        num_back_fill_days: The resolved value of the 'num_back_fill_days'
            parameter.
    """
    # Use the logger Prefect places in the run context so the messages are
    # attached to the flow run.
    run_logger = prefect.context.get('logger')
    run_logger.info('Task End Date: %s', end_date)
    run_logger.info('Num Days: %s', num_days)
    run_logger.info('Num Days Backfill: %s', num_back_fill_days)


# Scheduled parent flow: resolves the run parameters, logs them, then starts
# the "common" child flow followed by the "baseline" child flow.
with Flow(
    "my_scheduled_flow",
    schedule=weekday_schedule,
    storage=Docker(
        base_image='my-docker-image:latest',
        local_image=True,
    ),
) as flow:
    num_days_parameter = Parameter('num_days', default=1)
    num_back_fill_days_parameter = Parameter('num_back_fill_days', default=1)
    end_date_parameter = Parameter('end_date', default=None)

    # Falls back to the scheduled start date when no 'end_date' is supplied.
    task_end_date = calculate_flow_end_date(end_date_parameter)

    # Logged at run time via a task (see log_run_parameters).
    log_run_parameters(
        task_end_date,
        num_days_parameter,
        num_back_fill_days_parameter,
    )

    # Both child flows receive the same set of parameters.
    child_flow_parameters = {
        'num_days': num_days_parameter,
        'num_back_fill_days': num_back_fill_days_parameter,
        'end_date': task_end_date,
    }

    common_flow_result = common_flow(parameters=child_flow_parameters)

    # The baseline flow is only started after the common flow task.
    baseline_flow_result = baseline_flow(
        upstream_tasks=[common_flow_result],
        parameters=child_flow_parameters,
    )