# prefect-community

Florian Guily

04/27/2022, 10:08 AM
Hello guys, quick question about retry behaviour. I have a terminal mapped task that has a retry policy. The retries seem to work well, but I'm a bit worried about the logs. The delay between retries is 10 seconds and, for testing purposes, it is a tiny mapping (tried with 6 child tasks), with the flow launched locally from the terminal. Between retries there are moments with no task to execute, so Prefect enters a "waiting" state according to the log (`Waiting for next available Task run at 2022-04-27T10:00:35.571259+00:00`). After a few seconds, a new log message states `Beginning Flow run for 'flow_name'` and I get all the log messages of the previous tasks again, as if the flow were re-executed. Is this normal? Is the flow really re-executed? When executing this flow from Prefect Cloud, I don't see those logs, so I'm a bit confused.

Anna Geller

04/27/2022, 10:31 AM
Can you share the flow code?

Florian Guily

04/27/2022, 11:44 AM
```python
def main():
    with Flow("data_gov_cat_extraction") as flow:
        logger = prefect.context.get("logger")
        nb_record = Parameter("nb_record", default=4)
        limit_per_page = Parameter("limit_per_page", default=2)
        publisher_id = Parameter("publisher_id", default='data-gov')

        starz_api_key = PrefectSecret("STARZ_API_KEY")
        publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
        starz_url = PrefectSecret("STARZ_URL")

        cond = need_all_records(nb_record)

        with case(cond, True):
            response = extract_data(0, publisher_url, 0)
            nb_record1 = get_total_count(response)

        nb_record = merge(nb_record1, nb_record)

        pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output: [list]
        response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]

        formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]

        data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]

        retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

    flow.executor = LocalDaskExecutor()
    # flow.register(project_name="Tutorial")
    flow.run(nb_record = 4, limit_per_page = 2, publisher_id = 'data-gov')
    # flow.visualize()
```

Anna Geller

04/27/2022, 11:50 AM
you can't do that in Prefect 1.0 - logging is only supported from tasks, so you would need to move any logs from flow to tasks:
```python
logger = prefect.context.get("logger")
```
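For reference, a minimal sketch of that Prefect 1.0 pattern, with a hypothetical task name:
```python
import prefect
from prefect import task

@task
def my_task():  # hypothetical task name
    # In Prefect 1.0, the context logger is only populated during a task run,
    # so this lookup must happen inside the task, not in the Flow block
    logger = prefect.context.get("logger")
    logger.info("This message is logged from inside a task run")
```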
you are simultaneously mapping over a list `nb_record_per_page` and over a list `pages_index` - is that intended? Regarding:
```python
flow.run(nb_record = 4, limit_per_page = 2, publisher_id = 'data-gov')
```
the syntax is meant to be a dictionary:
```python
flow.run(parameters=dict(nb_record=4, ...))
```
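For example, filled in with the parameter values from the original call, that would read:
```python
flow.run(parameters=dict(nb_record=4, limit_per_page=2, publisher_id="data-gov"))
```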

Florian Guily

04/27/2022, 11:54 AM
• Oh ok, I didn't know that, thanks - I don't log inside the flow anyway, so this line is useless
• Yes, the mapping over two lists is intended - it's not a good practice? Maybe I should map over one list of tuples?
• Ok, got it, I will update

Anna Geller

04/27/2022, 12:07 PM
> it's not a good practice?
it's totally supported, just not that common, and some people just forget the `unmapped` keyword - I wanted to confirm it was intentional. LMK if using the updated syntax and removing the logger fixed the issue for you
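For illustration, a minimal sketch of mapping over two lists with an unmapped constant - the task and values here are hypothetical:
```python
from prefect import Flow, task, unmapped

@task
def combine(x, y, constant):  # hypothetical task
    return x + y + constant

with Flow("map-two-lists-demo") as demo_flow:
    # x and y are zipped element-wise across the two lists;
    # unmapped() passes the same constant to every child task
    results = combine.map([1, 2, 3], [10, 20, 30], unmapped(100))
```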

Florian Guily

04/27/2022, 12:10 PM
Ok, got it. I tried with the updated syntax and still have the weird log behaviour. But it seems that the flow isn't re-executed, judging by the "printing" speed of the logs

Anna Geller

04/27/2022, 12:13 PM
can you say more about the weird log behavior? can you share the logs that look weird to you?
which log level did you set? is it actually a DEBUG log message? you can adjust that:
```bash
export PREFECT__LOGGING__LEVEL=INFO
```

Florian Guily

04/27/2022, 12:18 PM
Here are the full logs. Reading them, it seems the whole flow is re-executed between retries, but I assume it is not.

Anna Geller

04/27/2022, 12:18 PM
this log message means that the retry fired - it only tells you that your task run failed and will retry as defined by your retry delay, which seems like a totally useful log message
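For context, the 10-second retry delay Florian describes would typically be declared on the task like this - a sketch, where the retry count and task body are assumptions:
```python
from datetime import timedelta
from prefect import task

@task(max_retries=3, retry_delay=timedelta(seconds=10))  # retry count assumed
def retry_post(record, api_key, url):  # signature assumed from the flow above
    # On failure, Prefect 1.0 reschedules this task run after the retry delay,
    # which produces the "Waiting for next available Task run at ..." log line
    ...
```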

Florian Guily

04/27/2022, 12:21 PM
yes, this one is ok. The one I don't understand is the one at line 134, for example, stating that the whole flow run will be re-executed
I'm sorry if this is not very clear

Anna Geller

04/27/2022, 12:25 PM
me neither! 😄 Why is a new flow run triggered?! Can you perhaps follow the standard Prefect convention and move the code block from main to the top-level scope, attaching the executor this way?
```python
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default='data-gov')

    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")
    cond = need_all_records(nb_record)
    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)
    nb_record = merge(nb_record1, nb_record)
    pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output: [list]
    response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]
    formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]
    data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]
    retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

if __name__ == '__main__':
    flow.run()
```

Florian Guily

04/27/2022, 12:29 PM
yep, still having the same behaviour... I also want to point out that the logs don't behave like this when executing from Prefect Cloud

Anna Geller

04/27/2022, 12:32 PM
what if you trigger the flow via CLI?
```bash
prefect run -p your_local_flow_file.py
```

Florian Guily

04/27/2022, 12:37 PM
Same. But it's printed so fast that it can't be re-executed, imo. I was just wondering if it is normal to get this block printed multiple times:
```
└── 14:34:18 | INFO    | Flow run RUNNING: terminal tasks are incomplete.
└── 14:34:18 | INFO    | Waiting for next available Task run at 2022-04-27T12:34:27.863268+00:00
└── 14:34:27 | INFO    | Beginning Flow run for 'data_gov_cat_extraction'
└── 14:34:27 | INFO    | Task 'DATA_GOV_URL': Starting task run...
└── 14:34:27 | INFO    | Task 'DATA_GOV_URL': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'extract_data': Starting task run...
└── 14:34:27 | INFO    | Task 'extract_data': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO    | Task 'transform': Starting task run...
└── 14:34:27 | INFO    | Task 'transform': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO    | Task 'STARZ_API_KEY': Starting task run...
└── 14:34:27 | INFO    | Task 'STARZ_URL': Starting task run...
└── 14:34:27 | INFO    | Task 'STARZ_API_KEY': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'STARZ_URL': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'load_data': Starting task run...
└── 14:34:27 | INFO    | Task 'load_data': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO    | Task 'retry_post': Starting task run...
└── 14:34:27 | INFO    | Task 'retry_post': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO    | Task 'extract_data[0]': Starting task run...
└── 14:34:27 | INFO    | Task 'extract_data[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'extract_data[1]': Starting task run...
└── 14:34:27 | INFO    | Task 'transform[0]': Starting task run...
└── 14:34:27 | INFO    | Task 'extract_data[1]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'transform[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'transform[1]': Starting task run...
└── 14:34:27 | INFO    | Task 'transform[1]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'load_data[0]': Starting task run...
└── 14:34:27 | INFO    | Task 'load_data[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO    | Task 'load_data[1]': Starting task run...
└── 14:34:27 | INFO    | Task 'load_data[1]': Finished task run for task with final state: 'Success'
```
must be something in my code if I'm the only one 😅

Anna Geller

04/27/2022, 12:46 PM
do you happen to have `flow.run()` in your Flow constructor? can you remove any `flow.run()` from your code and just run it from the CLI:
```bash
prefect run -p flow.py
```

Florian Guily

04/27/2022, 12:53 PM
no i did the changes as you said and putted the
flow.run()
in the
if __name__ == '__main__':
```python
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default='data-gov')
    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret("DATA_GOV_URL")() # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")
    cond = need_all_records(nb_record)
    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)
    nb_record = merge(nb_record1, nb_record)
    pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page) # output: [list]
    response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index) # [[list], [list], ..., [list]]
    formatted_datas = transform.map(response, unmapped(publisher_id)) # [[list], [list], ..., [list]]
    data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url)) # [[smaller list], ..., [smaller list]]
    retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

if __name__ == "__main__":
    flow.run()
```

Anna Geller

04/27/2022, 1:14 PM
I don't know why your flow run gets executed multiple times locally. Did you try upgrading Prefect and trying it all again from a new virtual environment? also can you share your full flow code? you can share via DM if you don't want to share publicly, perhaps there is something wrong in some other part of the flow definition

Florian Guily

04/27/2022, 2:29 PM
ok, let me comment it a bit for you and I'll send you this
πŸ‘ 1

Kevin Kho

04/27/2022, 2:45 PM
can you add `run_on_schedule=False` to the `flow.run()` call and see if the behavior is still there?

Florian Guily

04/27/2022, 2:55 PM
Behaviour still there with `flow.run(run_on_schedule=False)` from the CLI

Kevin Kho

04/27/2022, 2:58 PM
That is really really weird. Yeah hard to tell with the current code up there

Florian Guily

04/27/2022, 2:59 PM
yeah, I sent the whole code to Anna

Kevin Kho

04/27/2022, 2:59 PM
Ah ok

Anna Geller

04/27/2022, 3:11 PM
not sure if this matters, but could you rename `nb_record` as shown here?
```python
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record_param = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default="data-gov")

    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret(
        "DATA_GOV_URL"
    )()  # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")

    cond = need_all_records(nb_record_param)

    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)

    nb_record = merge(nb_record1, nb_record_param)
```
the reason: you previously had two tasks with the same name `nb_record` - perhaps that led to some issue?

Florian Guily

04/27/2022, 3:16 PM
Doesn't change anything 😕
I'm feeling so bad taking up so much of your time...

Kevin Kho

04/27/2022, 3:18 PM
You can DM me the flow too, but if Anna can’t solve it, I likely can’t πŸ˜†

Anna Geller

04/27/2022, 3:18 PM
in that case it must be due to the event loop somehow not being closed when using async tasks. This will be much easier to do in 2.0

Florian Guily

04/27/2022, 3:21 PM
Yes, I'm really looking forward to 2.0. And it doesn't seem to actually re-execute the flow every time - it just shows up in the logs, so I was a bit scared at first. Also, this behaviour doesn't show up in Prefect Cloud.
πŸ‘ 1

Kevin Kho

04/27/2022, 3:22 PM
Yeah I read the code. I don’t have any suggestions beyond what Anna mentioned

Florian Guily

04/27/2022, 3:24 PM
Yeah, no problem, you guys did a lot, thanks!
Just as a last piece of info: I updated Prefect to 1.2 and the issue is still there
πŸ‘ 1