Hi, I have a problem with prefect core. I dynamica...
# ask-community
s
Hi, I have a problem with prefect core. I dynamically create a cron clock with different parameters and add them to the schedule. When I try to run them in one flow, then if the execution time overlaps, then only the first is executed. I thought it was solved https://github.com/PrefectHQ/prefect/pull/3394, but i have same problem.
Copy code
def gen_prefect_cronclock(config, report_conf_path):
    clocks = []
    for report, cron in get_all_reports_cron_dict(report_conf_path).items():
        for tz, ppk_name_list in get_tz_ppk_dict(config).items():
            clocks.append(CronClock(cron,
                                    start_date=pendulum.datetime(
                                        1970, 1, 1, tz=tz),
                                    parameter_defaults={
                                        'report_path': report, 'ppk_name_list': ppk_name_list}
                                    )
                          )
    return clocks
run flow
Copy code
with Flow("main-report-flow", main_schedule) as main_flow:
    conf = Parameter('config', default=config)
    report_config_path = Parameter('report_config_path',
                                   default=REPORT_CONF_PATH)
    report_path = Parameter('report_path', default="")
    ppk_name_list = Parameter('ppk_name_list', default=[])
    tasck_execute_send_report.map(unmapped(conf),
                                  unmapped(report_config_path),
                                  unmapped(report_path),
                                  ppk_name_list)
a
Hi @Sergey Shamsuyarov, I was trying to reproduce the issue but the parameters attached to the schedule are working properly. The following flow generates 2 flow runs every minute: one with the parameter Sergey, and one with parameter Anna.
Copy code
from prefect import Flow, Parameter, task
from prefect.schedules.clocks import CronClock
from prefect.schedules import Schedule
import pendulum


clock_1 = CronClock(
    "*/1 * * * *",
    start_date=pendulum.now(),
    parameter_defaults={"user_input": "Sergey"},
)

clock_2 = CronClock(
    "*/1 * * * *", start_date=pendulum.now(), parameter_defaults={"user_input": "Anna"}
)

schedule = Schedule(clocks=[clock_1, clock_2])


@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}")


with Flow("test-flow", schedule=schedule) as flow:
    param = Parameter("user_input", default="Marvin")
    hw = hello_world(param)
Can it be that perhaps your schedule doesn’t get attached properly to the flow object? The overlapping schedule doesn’t seem to be the issue here.
s
Hi, thanks for the help. I run your test. and get output, with only one parameter
Copy code
[2021-10-14 06:02:21+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:03:00+00:00
[2021-10-14 06:03:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Starting task run...
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Finished task run for task with final state: 'Success'
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | hello Anna
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
[2021-10-14 06:03:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-10-14 06:03:00+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:04:00+00:00
[2021-10-14 06:04:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Starting task run...
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Finished task run for task with final state: 'Success'
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | hello Anna
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
[2021-10-14 06:04:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-10-14 06:04:00+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:05:00+00:00
i use prefect 0.15.1 in jupyterlab enviroment base on Ubuntu 20.04.3 LTS.
a
To test overlapping schedules, it’s best if you register your flow and check the output in Prefect Cloud or Server UI. The built-in scheduler that you get with Prefect Core is more of a convenience method than a real scheduler. You can find more about it here.
s
Ок thanks, i try up local server, but for prefect.core is it correct? When I create a clock with different start times the parameters from clock are transmitted normally
a
If this is the first time you want to deploy your Prefect Core flows, it’s much easier to get started with Prefect Cloud. You can sign up for a Starter or Standard tier here and your first 10000 successful tasks per month are for free so that you can try everything this way. Once you signed up, you can create an API key, authenticate with this API key, and you can start registering and scheduling your flows. More details on that are here.
s
yes i try my code with local agent, when i use flow.register in prefect cloud alweys work fine as in doc.
👍 1