    Sergey Shamsuyarov

    11 months ago
    Hi, I have a problem with Prefect Core. I dynamically create cron clocks with different parameters and add them to a schedule. When I run them in one flow and their execution times overlap, only the first one is executed. I thought this was solved by https://github.com/PrefectHQ/prefect/pull/3394, but I still have the same problem.
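    import pendulum
    from prefect.schedules.clocks import CronClock
    
    
    def gen_prefect_cronclock(config, report_conf_path):
        # get_all_reports_cron_dict and get_tz_ppk_dict are user-defined helpers not shown in this post
        clocks = []
        for report, cron in get_all_reports_cron_dict(report_conf_path).items():
            for tz, ppk_name_list in get_tz_ppk_dict(config).items():
                # One clock per (report, timezone) pair, each carrying its own parameters
                clocks.append(
                    CronClock(
                        cron,
                        start_date=pendulum.datetime(1970, 1, 1, tz=tz),
                        parameter_defaults={
                            "report_path": report,
                            "ppk_name_list": ppk_name_list,
                        },
                    )
                )
        return clocks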
    Running the flow:
    with Flow("main-report-flow", main_schedule) as main_flow:
        conf = Parameter('config', default=config)
        report_config_path = Parameter('report_config_path',
                                       default=REPORT_CONF_PATH)
        report_path = Parameter('report_path', default="")
        ppk_name_list = Parameter('ppk_name_list', default=[])
        tasck_execute_send_report.map(unmapped(conf),
                                      unmapped(report_config_path),
                                      unmapped(report_path),
                                      ppk_name_list)
    Anna Geller

    11 months ago
    Hi @Sergey Shamsuyarov, I tried to reproduce the issue, but the parameters attached to the schedule work properly. The following flow generates two flow runs every minute: one with the parameter Sergey and one with the parameter Anna.
    from prefect import Flow, Parameter, task
    from prefect.schedules.clocks import CronClock
    from prefect.schedules import Schedule
    import pendulum
    
    
    clock_1 = CronClock(
        "*/1 * * * *",
        start_date=pendulum.now(),
        parameter_defaults={"user_input": "Sergey"},
    )
    
    clock_2 = CronClock(
        "*/1 * * * *", start_date=pendulum.now(), parameter_defaults={"user_input": "Anna"}
    )
    
    schedule = Schedule(clocks=[clock_1, clock_2])
    
    
    @task(log_stdout=True)
    def hello_world(user_input: str):
        print(f"hello {user_input}")
    
    
    with Flow("test-flow", schedule=schedule) as flow:
        param = Parameter("user_input", default="Marvin")
        hw = hello_world(param)
    Could it be that your schedule isn’t attached properly to the flow object? The overlapping schedule doesn’t seem to be the issue here.
    Sergey Shamsuyarov

    11 months ago
    Hi, thanks for the help. I ran your test and got output with only one parameter:
    [2021-10-14 06:02:21+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:03:00+00:00
    [2021-10-14 06:03:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
    [2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Starting task run...
    [2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Finished task run for task with final state: 'Success'
    [2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
    [2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | hello Anna
    [2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
    [2021-10-14 06:03:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2021-10-14 06:03:00+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:04:00+00:00
    [2021-10-14 06:04:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
    [2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Starting task run...
    [2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Finished task run for task with final state: 'Success'
    [2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
    [2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | hello Anna
    [2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
    [2021-10-14 06:04:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2021-10-14 06:04:00+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:05:00+00:00
    I use Prefect 0.15.1 in a JupyterLab environment based on Ubuntu 20.04.3 LTS.
    Anna Geller

    11 months ago
    To test overlapping schedules, it’s best if you register your flow and check the output in Prefect Cloud or Server UI. The built-in scheduler that you get with Prefect Core is more of a convenience method than a real scheduler. You can find more about it here.
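One way to picture why a simple local loop might show only one parameter set is the stdlib-only sketch below. It is a simplified model, not Prefect's scheduler code: the function names `make_events`, `run_one_at_a_time`, and `run_all_events` are hypothetical, and the assumption is that a local loop picks the single next event after "now", while a backend creates one run per event. Which of two simultaneous events wins in the naive loop depends only on ordering.

```python
from datetime import datetime, timedelta


def make_events(start, minutes, user_input, offset_seconds=0):
    """Build one (fire_time, user_input) event per minute for a hypothetical clock."""
    return [
        (start + timedelta(minutes=i, seconds=offset_seconds), user_input)
        for i in range(1, minutes + 1)
    ]


def run_one_at_a_time(events, now):
    """Naive loop: repeatedly take the single next event strictly after `now`."""
    executed = []
    pending = sorted(events, key=lambda e: e[0])
    while True:
        upcoming = [e for e in pending if e[0] > now]
        if not upcoming:
            return executed
        fire_time, user_input = upcoming[0]
        executed.append(user_input)
        # After the run, any other event at this same instant is no longer "after now"
        now = fire_time


def run_all_events(events):
    """Backend-style model: every scheduled event becomes its own run."""
    return [user_input for _, user_input in sorted(events, key=lambda e: e[0])]


start = datetime(2021, 10, 14, 6, 2)

# Two clocks firing at exactly the same instants
same_time = make_events(start, 2, "Sergey") + make_events(start, 2, "Anna")
print(run_one_at_a_time(same_time, start))  # ['Sergey', 'Sergey']
print(run_all_events(same_time))            # ['Sergey', 'Anna', 'Sergey', 'Anna']

# Second clock shifted by one second, so no two events coincide
staggered = make_events(start, 2, "Sergey") + make_events(start, 2, "Anna", offset_seconds=1)
print(run_one_at_a_time(staggered, start))  # ['Sergey', 'Anna', 'Sergey', 'Anna']
```

Under this model, events that coincide are dropped by the one-at-a-time loop but not by a per-event scheduler, and events that do not coincide are unaffected in either case.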
    Sergey Shamsuyarov

    11 months ago
    OK, thanks. I will try to bring up a local server, but is this behavior expected for Prefect Core? When I create clocks with different start times, the parameters from each clock are passed through correctly.
    Anna Geller

    11 months ago
    If this is the first time you are deploying Prefect Core flows, it’s much easier to get started with Prefect Cloud. You can sign up for a Starter or Standard tier here, and your first 10,000 successful tasks per month are free, so you can try everything out. Once you have signed up, you can create an API key, authenticate with it, and start registering and scheduling your flows. More details on that are here.
    Sergey Shamsuyarov

    11 months ago
    Yes, I tried my code with a local agent. When I use flow.register with Prefect Cloud, it always works fine, as described in the docs.