Thread
#prefect-community
    Florian Guily
    5 months ago
    Hello guys, quick question about retry behaviour. I have a terminal mapped task that has a retry policy. The retries seem to work well, but I'm a bit worried about the logs. The delay between retries is 10 seconds, and for testing purposes it is a tiny mapping (tried with 6 child tasks); the flow is launched locally from the terminal. So between retries there is a period where there is no task to execute, and Prefect enters a "waiting" state according to the log (
    Waiting for next available Task run at 2022-04-27T10:00:35.571259+00:00
    ). After a few seconds, a new log message states:
    Beginning Flow run for 'flow_name'
    and I have all of the previous log messages of the previous tasks, as if the flow was re-executed. Is this normal? Is the flow really re-executed? When executing this flow from Prefect Cloud, I don't see those logs, so I'm a bit confused.
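    For reference, a minimal Prefect 1.0 sketch of the setup described above (task and flow names are illustrative, not the actual code): a terminal mapped task with a 10-second retry delay over a tiny mapping.
    from datetime import timedelta
    from prefect import Flow, task

    @task(max_retries=3, retry_delay=timedelta(seconds=10))
    def retry_post(record):
        # hypothetical terminal task that sometimes fails and retries
        ...

    with Flow("retry_example") as flow:
        retry_post.map(list(range(6)))  # tiny mapping: 6 child task runs

    flow.run()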
    Anna Geller
    5 months ago
    Can you share the flow code?
    Florian Guily
    5 months ago
    # imports needed by the snippet below (task definitions omitted)
    import prefect
    from prefect import Flow, Parameter, case, unmapped, flatten
    from prefect.tasks.control_flow import merge
    from prefect.tasks.secrets import PrefectSecret
    from prefect.executors import LocalDaskExecutor

    def main():
        with Flow("data_gov_cat_extraction") as flow:
            logger = prefect.context.get("logger")
            nb_record = Parameter("nb_record", default=4)
            limit_per_page = Parameter("limit_per_page", default=2)
            publisher_id = Parameter("publisher_id", default='data-gov')
    
            starz_api_key = PrefectSecret("STARZ_API_KEY")
            publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
            starz_url = PrefectSecret("STARZ_URL")
    
            cond = need_all_records(nb_record)
    
            with case(cond, True):
                response = extract_data(0, publisher_url, 0)
                nb_record1 = get_total_count(response)
    
            nb_record = merge(nb_record1, nb_record)
    
            pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output : [list]
            response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]
            
            formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]
            
            data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]
    
            retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))
    
        flow.executor = LocalDaskExecutor()
        # flow.register(project_name="Tutorial")
        flow.run(nb_record = 4, limit_per_page = 2, publisher_id = 'data-gov')
        # flow.visualize()
    Anna Geller
    5 months ago
    you can't do that in Prefect 1.0 - logging is only supported from tasks, so you would need to move any logging from the flow block into tasks:
    logger = prefect.context.get("logger")
    you are simultaneously mapping over a list
    nb_record_per_page
    and over a list
    pages_index
    - is that intended? Regarding:
    flow.run(nb_record = 4, limit_per_page = 2, publisher_id = 'data-gov')
    the parameters are meant to be passed as a dictionary:
    flow.run(parameters=dict(nb_record=4, ...))
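    A minimal sketch of both points, with a hypothetical task name: the logger is fetched inside a task run, and parameters go into flow.run() as a dictionary.
    import prefect
    from prefect import Flow, Parameter, task

    @task
    def say_hello(name):
        # in Prefect 1.0 the logger is only usable from inside a task run
        logger = prefect.context.get("logger")
        logger.info("Hello, %s!", name)

    with Flow("logger_example") as flow:
        name = Parameter("name", default="world")
        say_hello(name)

    # parameters are passed as a dict, not as keyword arguments
    flow.run(parameters=dict(name="data-gov"))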
    Florian Guily
    5 months ago
    • Oh ok, I didn't know that, thanks; I don't log inside the flow anyway, so this line is useless
    • yes, the mapping over two lists is intended; is it not good practice? Maybe I should map over one list of tuples instead?
    • ok got it, I will update
    Anna Geller
    5 months ago
    is it not good practice?
    it's totally supported, just not that common, and some people simply forget the unmapped keyword - I just wanted to confirm it was intentional. LMK if using the updated syntax and removing the logger fixed the issue for you
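    For illustration, a minimal sketch with a hypothetical task: the two mapped lists are zipped pairwise, while the unmapped argument is passed whole to every child task run.
    from prefect import Flow, task, unmapped

    @task
    def combine(x, y, constant):
        return x + y + constant

    with Flow("two_list_map") as flow:
        # child runs receive (1, 10), (2, 20), (3, 30), each with constant=100
        combine.map([1, 2, 3], [10, 20, 30], unmapped(100))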
    Florian Guily
    5 months ago
    Ok got it. I tried with the updated syntax and still have the weird log behaviour. But it seems that the flow isn't re-executed, judging by the speed at which the logs print
    Anna Geller
    5 months ago
    can you say more about the weird log behavior? can you share the logs that look weird to you?
    which log level did you set? is it actually a DEBUG log message? you can adjust that:
    export PREFECT__LOGGING__LEVEL=INFO
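    If it helps, the same setting can be sketched in Python for a quick local test, assuming the config is read when prefect is first imported:
    import os

    # equivalent to the shell export; set it before importing prefect
    os.environ["PREFECT__LOGGING__LEVEL"] = "INFO"

    import prefect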
    Florian Guily
    5 months ago
    Here are the full logs. Reading them, it seems that the whole flow is re-executed between retries, but I assume it is not.
    Anna Geller
    5 months ago
    this log message means that the retry fired - it only tells you that your task run failed and will be retried as defined by your retry delay, which seems like a totally useful log message
    Florian Guily
    5 months ago
    yes, this one is ok. The one I don't understand is, for example, the one on line 134, stating that the whole flow run will be re-executed
    I'm sorry if this is not very clear
    Anna Geller
    5 months ago
    me neither! 😄 why is a new flow run triggered?! can you perhaps follow the standard Prefect convention, move the code block from main to the top-level scope, and attach the executor this way?
    with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
        nb_record = Parameter("nb_record", default=4)
        limit_per_page = Parameter("limit_per_page", default=2)
        publisher_id = Parameter("publisher_id", default='data-gov')
    
        starz_api_key = PrefectSecret("STARZ_API_KEY")
        publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
        starz_url = PrefectSecret("STARZ_URL")
        cond = need_all_records(nb_record)
        with case(cond, True):
            response = extract_data(0, publisher_url, 0)
            nb_record1 = get_total_count(response)
        nb_record = merge(nb_record1, nb_record)
        pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output : [list]
        response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]
        formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]
        data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]
        retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))
    
    if __name__ == '__main__':
        flow.run()
    Florian Guily
    5 months ago
    yep, still having the same behaviour... I also want to point out that the logs don't behave like this when executing from Prefect Cloud
    Anna Geller
    5 months ago
    what if you trigger the flow via CLI?
    prefect run -p your_local_flow_file.py
    Florian Guily
    5 months ago
    Same. But it's printed so fast that it can't be re-executed, imo. I was just wondering if it was normal to get this block printed multiple times.
    └── 14:34:18 | INFO    | Flow run RUNNING: terminal tasks are incomplete.
    └── 14:34:18 | INFO    | Waiting for next available Task run at 2022-04-27T12:34:27.863268+00:00
    └── 14:34:27 | INFO    | Beginning Flow run for 'data_gov_cat_extraction'
    └── 14:34:27 | INFO    | Task 'DATA_GOV_URL': Starting task run...
    └── 14:34:27 | INFO    | Task 'DATA_GOV_URL': Finished task run for task with final state: 'Success'
    └── 14:34:27 | INFO    | Task 'extract_data': Starting task run...
    └── 14:34:27 | INFO    | Task 'extract_data': Finished task run for task with final state: 'Mapped'
    └── 14:34:27 | INFO    | Task 'transform': Starting task run...
    └── 14:34:27 | INFO    | Task 'transform': Finished task run for task with final state: 'Mapped'
    └── 14:34:27 | INFO    | Task 'STARZ_API_KEY': Starting task run...
    └── 14:34:27 | INFO    | Task 'STARZ_URL': Starting task run...
    └── 14:34:27 | INFO    | Task 'STARZ_API_KEY': Finished task run for task with final state: 'Success'
    └── 14:34:27 | INFO    | Task 'STARZ_URL': Finished task run for task with final state: 'Success'
    └── 14:34:27 | INFO    | Task 'load_data': Starting task run...
    └── 14:34:27 | INFO    | Task 'load_data': Finished task run for task with final state: 'Mapped'
    └── 14:34:27 | INFO    | Task 'retry_post': Starting task run...
    └── 14:34:27 | INFO    | Task 'retry_post': Finished task run for task with final state: 'Mapped'
    └── 14:34:27 | INFO    | Task 'extract_data[0]': Starting task run...
    └── 14:34:27 | INFO    | Task 'extract_data[0]': Finished task run for task with final state: 'Success'
    └── 14:34:27 | INFO    | Task 'extract_data[1]': Starting task run...
    └── 14:34:27 | INFO    | Task 'transform[0]': Starting task run...
    └── 14:34:27 | INFO    | Task 'extract_data[1]': Finished task run for task with final state: 'Success'
    └── 14:34:27 | INFO    | Task 'transform[0]': Finished task run for task with final state: 'Success'
    └── 14:34:27 | INFO    | Task 'transform[1]': Starting task run...
    └── 14:34:27 | INFO    | Task 'transform[1]': Finished task run for task with final state: 'Success'
    └── 14:34:27 | INFO    | Task 'load_data[0]': Starting task run...
    └── 14:34:27 | INFO    | Task 'load_data[0]': Finished task run for task with final state: 'Success'
    └── 14:34:27 | INFO    | Task 'load_data[1]': Starting task run...
    └── 14:34:27 | INFO    | Task 'load_data[1]': Finished task run for task with final state: 'Success'
    must be something in my code if I'm the only one 😅
    Anna Geller
    5 months ago
    do you happen to have
    flow.run()
    in your Flow constructor? can you remove any flow.run() from your code and just run it from the CLI:
    prefect run -p flow.py
    Florian Guily
    5 months ago
    no, I made the changes as you said and put the
    flow.run()
    in the
    if __name__ == '__main__':
    with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
        nb_record = Parameter("nb_record", default=4)
        limit_per_page = Parameter("limit_per_page", default=2)
        publisher_id = Parameter("publisher_id", default='data-gov')
        starz_api_key = PrefectSecret("STARZ_API_KEY")
        publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
        starz_url = PrefectSecret("STARZ_URL")
        cond = need_all_records(nb_record)
        with case(cond, True):
            response = extract_data(0, publisher_url, 0)
            nb_record1 = get_total_count(response)
        nb_record = merge(nb_record1, nb_record)
        pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output : [list]
        response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]
        formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]
        data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]
        retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))
    
    if __name__ == "__main__":
        flow.run()
    Anna Geller
    5 months ago
    I don't know why your flow run gets executed multiple times locally. Did you try upgrading Prefect and trying it all again from a new virtual environment? Also, can you share your full flow code? You can share via DM if you don't want to share publicly; perhaps there is something wrong in some other part of the flow definition
    Florian Guily
    5 months ago
    ok, let me comment it a bit for you and I'll send it to you
    Kevin Kho
    5 months ago
    can you add
    run_on_schedule=False
    to the
    flow.run()
    and see if the behavior is still there?
    Florian Guily
    4 months ago
    Behaviour still there with
    flow.run(run_on_schedule=False)
    from CLI
    Kevin Kho
    4 months ago
    That is really, really weird. Yeah, hard to tell from the current code up there
    Florian Guily
    4 months ago
    yeah, I sent the whole code to Anna
    Kevin Kho
    4 months ago
    Ah ok
    Anna Geller
    4 months ago
    not sure if this matters, but could you rename nb_record as shown here?
    with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
        nb_record_param = Parameter("nb_record", default=4)
        limit_per_page = Parameter("limit_per_page", default=2)
        publisher_id = Parameter("publisher_id", default="data-gov")
    
        starz_api_key = PrefectSecret("STARZ_API_KEY")
        publisher_url = PrefectSecret(
            "DATA_GOV_URL"
        )()  # The second brackets force execution, avoiding automatic dependencies
        starz_url = PrefectSecret("STARZ_URL")
    
        cond = need_all_records(nb_record_param)
    
        with case(cond, True):
            response = extract_data(0, publisher_url, 0)
            nb_record1 = get_total_count(response)
    
        nb_record = merge(nb_record1, nb_record_param)
    the reason: you previously had two tasks with the same name, nb_record - perhaps that led to some issue?
    Florian Guily
    4 months ago
    Doesn't change anything 😕
    I feel so bad taking up so much of your time...
    Kevin Kho
    4 months ago
    You can DM me the flow too, but if Anna can’t solve it, I likely can’t πŸ˜†
    Anna Geller
    4 months ago
    in that case it must be due to the event loop somehow not being closed when using async tasks. This will be much easier to do in 2.0
    Florian Guily
    4 months ago
    Yes, I'm really looking forward to 2.0. And it doesn't seem to actually execute the flow every time; it just shows up in the logs, so I was a bit scared at first. Also, this behaviour does not show up in Prefect Cloud.
    Kevin Kho
    4 months ago
    Yeah I read the code. I don’t have any suggestions beyond what Anna mentioned
    Florian Guily
    4 months ago
    Yeah, no problem, you guys did a lot, thanks!
    Just as a last piece of info: I updated Prefect to 1.2 and the issue is still there