Sergey Shamsuyarov
10/13/2021, 7:30 PM
import pendulum
from prefect.schedules.clocks import CronClock


def gen_prefect_cronclock(config, report_conf_path):
    # Build one CronClock per (report, timezone) pair.
    clocks = []
    for report, cron in get_all_reports_cron_dict(report_conf_path).items():
        for tz, ppk_name_list in get_tz_ppk_dict(config).items():
            clocks.append(
                CronClock(
                    cron,
                    # The timezone of start_date sets the timezone the cron is evaluated in.
                    start_date=pendulum.datetime(1970, 1, 1, tz=tz),
                    parameter_defaults={
                        'report_path': report,
                        'ppk_name_list': ppk_name_list,
                    },
                )
            )
    return clocks
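To make the shape of that clock list easier to see, here is a minimal stdlib-only sketch of the same nested loop. It does not use Prefect: the two dictionaries are hypothetical stand-ins for whatever `get_all_reports_cron_dict` and `get_tz_ppk_dict` return (neither helper is shown in the thread), and `gen_clock_specs` is an illustrative name, not part of the original code.

```python
from itertools import product

# Hypothetical stand-ins for get_all_reports_cron_dict / get_tz_ppk_dict;
# the real helpers and their return values are not shown in the thread.
reports_cron = {
    "reports/daily.yaml": "0 6 * * *",
    "reports/weekly.yaml": "0 7 * * 1",
}
tz_ppk = {
    "Europe/Moscow": ["ppk_a", "ppk_b"],
    "Asia/Yekaterinburg": ["ppk_c"],
}


def gen_clock_specs(reports_cron, tz_ppk):
    """Return one plain-dict clock spec per (report, timezone) pair."""
    specs = []
    # product() iterates reports in the outer loop and timezones in the inner
    # loop, matching the nested for-loops in gen_prefect_cronclock.
    for (report, cron), (tz, ppk_names) in product(reports_cron.items(), tz_ppk.items()):
        specs.append(
            {
                "cron": cron,
                "tz": tz,
                "parameter_defaults": {
                    "report_path": report,
                    "ppk_name_list": ppk_names,
                },
            }
        )
    return specs


specs = gen_clock_specs(reports_cron, tz_ppk)
for spec in specs:
    print(spec["cron"], spec["tz"], spec["parameter_defaults"])
```

With two reports and two timezones this yields four specs, so any timezones that share the same wall-clock offset would produce clocks that fire at the same instant with different parameters.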
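run flow
from prefect import Flow, Parameter, unmapped

# main_schedule is not shown in the thread; it is assumed to be a Schedule
# built from the clocks returned by gen_prefect_cronclock.
with Flow("main-report-flow", main_schedule) as main_flow:
    conf = Parameter('config', default=config)
    report_config_path = Parameter('report_config_path',
                                   default=REPORT_CONF_PATH)
    report_path = Parameter('report_path', default="")
    ppk_name_list = Parameter('ppk_name_list', default=[])
    # Map over ppk_name_list only; every other argument is passed unchanged.
    tasck_execute_send_report.map(unmapped(conf),
                                  unmapped(report_config_path),
                                  unmapped(report_path),
                                  ppk_name_list)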
Anna Geller
from prefect import Flow, Parameter, task
from prefect.schedules.clocks import CronClock
from prefect.schedules import Schedule
import pendulum

clock_1 = CronClock(
    "*/1 * * * *",
    start_date=pendulum.now(),
    parameter_defaults={"user_input": "Sergey"},
)
clock_2 = CronClock(
    "*/1 * * * *",
    start_date=pendulum.now(),
    parameter_defaults={"user_input": "Anna"},
)
schedule = Schedule(clocks=[clock_1, clock_2])


@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}")


with Flow("test-flow", schedule=schedule) as flow:
    param = Parameter("user_input", default="Marvin")
    hw = hello_world(param)
Could it be that your schedule isn't being attached properly to the flow object? The overlapping schedules don't seem to be the issue here.
Sergey Shamsuyarov
10/14/2021, 6:11 AM
[2021-10-14 06:02:21+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:03:00+00:00
[2021-10-14 06:03:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Starting task run...
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Finished task run for task with final state: 'Success'
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | hello Anna
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
[2021-10-14 06:03:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-10-14 06:03:00+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:04:00+00:00
[2021-10-14 06:04:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Starting task run...
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Finished task run for task with final state: 'Success'
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | hello Anna
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
[2021-10-14 06:04:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-10-14 06:04:00+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:05:00+00:00
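As a reference point for reading this log, here is a minimal stdlib-only sketch of what the two-clock test schedule would be expected to produce if both clocks fired on every tick. It does not call Prefect and makes no claim about how Prefect schedules runs internally; `expected_runs` is a hypothetical helper that simply lists one run per clock per minute.

```python
from datetime import datetime, timedelta, timezone


def expected_runs(clock_params, start, ticks):
    """List (time, user_input) for every clock at every one-minute tick."""
    runs = []
    for i in range(ticks):
        when = start + timedelta(minutes=i)
        # Every clock shares the same "*/1 * * * *" cron, so each one
        # contributes a run at every tick.
        for params in clock_params:
            runs.append((when, params["user_input"]))
    return runs


# Parameter defaults taken from clock_1 and clock_2 in the test flow.
clock_params = [{"user_input": "Sergey"}, {"user_input": "Anna"}]
start = datetime(2021, 10, 14, 6, 3, tzinfo=timezone.utc)

runs = expected_runs(clock_params, start, ticks=2)
for when, user in runs:
    print(when.isoformat(), f"hello {user}")
```

Under that assumption there would be two runs per minute, one per parameter set, whereas the log above shows a single run per minute that prints only `hello Anna`.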
I use Prefect 0.15.1 in a JupyterLab environment based on Ubuntu 20.04.3 LTS.
Anna Geller
Sergey Shamsuyarov
10/14/2021, 11:18 AM
Anna Geller
Sergey Shamsuyarov
10/14/2021, 2:01 PM