# ask-community
f
Hello guys, quick question about retry behaviour. I have a terminal mapped task that has a retry policy. The retries seem to work well, but I'm a bit worried about the logs. The delay between retries is 10 seconds and, for testing purposes, it is a tiny mapping (tried with 6 child tasks) and the flow is launched locally from the terminal. So, between retries, there is a period where there is no task to execute, so Prefect enters a "waiting" state according to the log (
Waiting for next available Task run at 2022-04-27T10:00:35.571259+00:00
). After a few seconds, a new log message states:
Beginning Flow run for 'flow_name'
and I get all of the log messages of the previous tasks again, as if the flow were re-executed. Is that normal? Is the flow really re-executed? When executing this flow from Prefect Cloud, I don't see those logs, so I'm a bit confused.
a
Can you share the flow code?
f
Copy code
def main():
    with Flow("data_gov_cat_extraction") as flow:
        logger = prefect.context.get("logger")
        nb_record = Parameter("nb_record", default=4)
        limit_per_page = Parameter("limit_per_page", default=2)
        publisher_id = Parameter("publisher_id", default='data-gov')

        starz_api_key = PrefectSecret("STARZ_API_KEY")
        publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
        starz_url = PrefectSecret("STARZ_URL")

        cond = need_all_records(nb_record)

        with case(cond, True):
            response = extract_data(0, publisher_url, 0)
            nb_record1 = get_total_count(response)

        nb_record = merge(nb_record1, nb_record)

        pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output : [list]
        response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]
        
        formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]
        
        data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]

        retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

    flow.executor = LocalDaskExecutor()
    # flow.register(project_name="Tutorial")
    flow.run(nb_record = 4, limit_per_page = 2, publisher_id = 'data-gov')
    # flow.visualize()
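(The task definitions aren't included here - roughly, the retried terminal task is declared like this; a simplified sketch, the real task bodies differ and max_retries=3 is just an example value:)
Copy code
from datetime import timedelta

import prefect
from prefect import task

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def retry_post(record, api_key, url):
    # Terminal mapped task: each child re-posts a single failed record
    # and is retried with a 10-second delay when it raises
    logger = prefect.context.get("logger")
    logger.info("Re-posting record %s to %s", record, url)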
a
you can't do that in Prefect 1.0 - logging is only supported from tasks, so you would need to move any logs from flow to tasks:
Copy code
logger = prefect.context.get("logger")
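i.e. fetch the logger inside a task body instead - a minimal sketch with a hypothetical task name:
Copy code
import prefect
from prefect import task

@task
def log_record_count(records):
    # prefect.context is only populated at runtime, so the logger must be
    # fetched inside the task body, not while the Flow block is being built
    logger = prefect.context.get("logger")
    logger.info("Received %s records", len(records))
    return records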
you are simultaneously mapping over the list nb_record_per_page and over the list pages_index - is that intended? Regarding:
Copy code
flow.run(nb_record = 4, limit_per_page = 2, publisher_id = 'data-gov')
the syntax is meant to be a dictionary:
Copy code
flow.run(parameters=dict(nb_record=4, ...))
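i.e., with your current defaults filled in, it would be something like:
Copy code
flow.run(parameters=dict(nb_record=4, limit_per_page=2, publisher_id="data-gov"))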
f
• Oh ok, I didn't know that, thanks - I don't log inside the flow anyway, so this line is useless.
• Yes, the mapping over two lists is intended. It's not a good practice? Maybe I should map over one list of tuples instead?
• Ok, got it, I will update.
a
it's not a good practice?
it's totally supported, just not that common, and some people simply forget the unmapped keyword - I just wanted to confirm it was intentional. LMK if using the updated syntax and removing the logger fixed the issue for you
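And just for reference on the two-list mapping: both lists are iterated pairwise while the constant argument is wrapped in unmapped - a toy sketch with made-up names and values:
Copy code
from prefect import Flow, task, unmapped

@task
def fetch_page(limit, url, page):
    # toy stand-in: pretend to fetch one page of results
    return f"{url}?limit={limit}&page={page}"

with Flow("two-list-mapping") as flow:
    # limit and page are mapped over together, element by element;
    # the URL is the same for every child, hence unmapped
    fetch_page.map([2, 2, 1], unmapped("https://example.com"), [0, 1, 2])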
f
Ok, got it. I tried with the updated syntax and still have the weird log behaviour. But it seems that the flow isn't re-executed, judging by the "printing" speed of the logs.
a
can you say more about the weird log behavior? can you share the logs that look weird to you?
which log level did you set? is it actually a DEBUG log message? you can adjust that:
Copy code
export PREFECT__LOGGING__LEVEL=INFO
f
Here are the full logs. Reading them, it seems that the whole flow is re-executed between retries, but I assume it is not.
a
this log message means that a retry fired - it only tells you that your task run failed and will be retried after your configured retry delay, which seems like a totally useful log message
f
Yes, this one is ok. The one I don't understand is the one at line 134, for example, which states that the whole flow run will be re-executed.
I'm sorry if this is not very clear
a
me neither! 😄 why is a new flow run triggered?! Can you perhaps follow the standard Prefect convention, move the code block from main to the top-level scope, and attach the executor this way?
Copy code
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default='data-gov')

    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")
    cond = need_all_records(nb_record)
    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)
    nb_record = merge(nb_record1, nb_record)
    pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output : [list]
    response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]
    formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]
    data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]
    retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

if __name__ == '__main__':
    flow.run()
f
Yep, still having the same behaviour... I also want to point out that the logs don't behave like this when executing the flow from Prefect Cloud.
a
what if you trigger the flow via CLI?
Copy code
prefect run -p your_local_flow_file.py
f
Same. But it's printed so fast that it can't be re-executed imo. I was just wondering if it was normal to get this block printed multiple times.
Copy code
└── 14:34:18 | INFO    | Flow run RUNNING: terminal tasks are incomplete.
└── 14:34:18 | INFO    | Waiting for next available Task run at 2022-04-27T12:34:27.863268+00:00
└── 14:34:27 | INFO    | Beginning Flow run for 'data_gov_cat_extraction'
└── 14:34:27 | INFO    | Task 'DATA_GOV_URL': Starting task run...
└── 14:34:27 | INFO    | Task 'DATA_GOV_URL': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'extract_data': Starting task run...
└── 14:34:27 | INFO    | Task 'extract_data': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO    | Task 'transform': Starting task run...
└── 14:34:27 | INFO    | Task 'transform': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO    | Task 'STARZ_API_KEY': Starting task run...
└── 14:34:27 | INFO    | Task 'STARZ_URL': Starting task run...
└── 14:34:27 | INFO    | Task 'STARZ_API_KEY': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'STARZ_URL': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'load_data': Starting task run...
└── 14:34:27 | INFO    | Task 'load_data': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO    | Task 'retry_post': Starting task run...
└── 14:34:27 | INFO    | Task 'retry_post': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO    | Task 'extract_data[0]': Starting task run...
└── 14:34:27 | INFO    | Task 'extract_data[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'extract_data[1]': Starting task run...
└── 14:34:27 | INFO    | Task 'transform[0]': Starting task run...
└── 14:34:27 | INFO    | Task 'extract_data[1]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'transform[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'transform[1]': Starting task run...
└── 14:34:27 | INFO    | Task 'transform[1]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'load_data[0]': Starting task run...
└── 14:34:27 | INFO    | Task 'load_data[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'load_data[1]': Starting task run...
└── 14:34:27 | INFO    | Task 'load_data[1]': Finished task run for task with final state: 'Success'
Must be something in my code if I'm the only one 😅
a
do you happen to have flow.run() in your Flow constructor? Can you remove any flow.run() from your code and just run it from the CLI:
Copy code
prefect run -p flow.py
f
No, I made the changes as you said and put the flow.run() under if __name__ == '__main__':
Copy code
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default='data-gov')
    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")
    cond = need_all_records(nb_record)
    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)
    nb_record = merge(nb_record1, nb_record)
    pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output : [list]
    response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]
    formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]
    data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]
    retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

if __name__ == "__main__":
    flow.run()
a
I don't know why your flow run gets executed multiple times locally. Did you try upgrading Prefect and running it all again from a new virtual environment? Also, can you share your full flow code? You can share it via DM if you don't want to share it publicly; perhaps there is something wrong in some other part of the flow definition.
f
Ok, let me comment it a bit for you and I'll send it to you
πŸ‘ 1
k
can you add run_on_schedule=False to the flow.run() call and see if the behavior is still there?
f
Behaviour still there with flow.run(run_on_schedule=False) from the CLI
k
That is really, really weird. Yeah, hard to tell with the current code up there
f
Yeah, I sent the whole code to Anna
k
Ah ok
a
not sure if this matters, but could you rename nb_record as shown here?
Copy code
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record_param = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default="data-gov")

    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret(
        "DATA_GOV_URL"
    )()  # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")

    cond = need_all_records(nb_record_param)

    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)

    nb_record = merge(nb_record1, nb_record_param)
the reason: you previously had two tasks with the same name nb_record - perhaps it led to some issue?
f
Doesn't change anything πŸ˜•
I'm feeling so bad taking so much of your time...
k
You can DM me the flow too, but if Anna can’t solve it, I likely can’t πŸ˜†
a
in that case it must be due to the event loop somehow not being closed when using async tasks. This will be much easier to do in 2.0
f
Yes, I'm really looking forward to 2.0. And it doesn't seem to really execute the flow every time - it just shows up in the logs, so I was a bit scared at first. Also, this behaviour does not show up in Prefect Cloud.
πŸ‘ 1
k
Yeah I read the code. I don’t have any suggestions beyond what Anna mentioned
f
Yeah, no problem, you guys did a lot, thanks!
Just as a last piece of info: I updated Prefect to 1.2 and the issue is still there
πŸ‘ 1