Florian Guily

05/12/2022, 2:47 PM
Hello, I'm seeing weird logging behaviour when using a MySqlFetch task in a local environment (tried with and without the CLI). I have a task that runs an SQL query to retrieve a parameter value. As soon as this task is executed, every subsequent log line is duplicated. I ran the task with and without the call to MySqlFetch inside it, and the logs are not duplicated when I don't use it. Sample logs are below:
[2022-05-12 16:44:57+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'data_gov_cat_extraction'
[2022-05-12 16:44:57+0200] INFO - prefect.TaskRunner | Task 'is_incremental': Starting task run...
[2022-05-12 16:44:57+0200] INFO - prefect.TaskRunner | Task 'is_incremental': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:57+0200] INFO - prefect.TaskRunner | Task 'DATA_GOV_URL': Starting task run...
[2022-05-12 16:44:57+0200] INFO - prefect.TaskRunner | Task 'nb_record': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'limit_per_page': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'DATA_GOV_URL': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'nb_record': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(False)': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'limit_per_page': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(False)': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'need_all_records': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'get_last_date': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'need_all_records': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'extract_data': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'extract_data': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'get_total_count': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'get_total_count': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'get_last_date': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_last_date': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'Merge': Starting task run...
INFO:prefect.TaskRunner:Task 'Merge': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'extract_data': Starting task run...
INFO:prefect.TaskRunner:Task 'extract_data': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'Merge': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'Merge': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.extract_data | url : <https://catalog.data.gov/api/3/action/package_search?fq=metadata_created:[2022-05-03T00:00:00Z%20TO%20NOW]&rows=0&start=0>
INFO:prefect.extract_data:url : <https://catalog.data.gov/api/3/action/package_search?fq=metadata_created:[2022-05-03T00:00:00Z%20TO%20NOW]&rows=0&start=0>
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'extract_data': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'extract_data': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_total_count': Starting task run...
INFO:prefect.TaskRunner:Task 'get_total_count': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.get_total_count | Total record count matching this query is 501
INFO:prefect.get_total_count:Total record count matching this query is 501
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_total_count': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_total_count': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'Merge': Starting task run...
INFO:prefect.TaskRunner:Task 'Merge': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'Merge': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'Merge': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index': Starting task run...
INFO:prefect.TaskRunner:Task 'get_pages_index': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.get_pages_index | Number of page with 50 elements : 10
INFO:prefect.get_pages_index:Number of page with 50 elements : 10
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_pages_index': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index[0]': Starting task run...
INFO:prefect.TaskRunner:Task 'get_pages_index[0]': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index[1]': Starting task run...
INFO:prefect.TaskRunner:Task 'get_pages_index[1]': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index[0]': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_pages_index[0]': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index[1]': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_pages_index[1]': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'extract_data': Starting task run...
INFO:prefect.TaskRunner:Task 'extract_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'extract_data': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'extract_data': Finished task run for task with final state: 'Mapped'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'publisher_id': Starting task run...
INFO:prefect.TaskRunner:Task 'publisher_id': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'publisher_id': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'publisher_id': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
INFO:prefect.TaskRunner:Task 'transform': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'transform': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'transform': Finished task run for task with final state: 'Mapped'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'STARZ_API_KEY': Starting task run...
INFO:prefect.TaskRunner:Task 'STARZ_API_KEY': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'STARZ_URL': Starting task run...
INFO:prefect.TaskRunner:Task 'STARZ_URL': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'STARZ_API_KEY': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'STARZ_API_KEY': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'STARZ_URL': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'STARZ_URL': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'load_data': Starting task run...
INFO:prefect.TaskRunner:Task 'load_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'load_data': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'load_data': Finished task run for task with final state: 'Mapped'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'log_data': Starting task run...
INFO:prefect.TaskRunner:Task 'log_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'log_data': Starting task run...
INFO:prefect.TaskRunner:Task 'log_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'log_data': Starting task run...
INFO:prefect.TaskRunner:Task 'log_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'set_new_date': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.log_data | data is : 2022-05-03
INFO:prefect.TaskRunner:Task 'set_new_date': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.log_data | data is : 501
[2022-05-12 16:44:59+0200] INFO - prefect.log_data | data is : 2022-05-03
INFO:prefect.log_data:data is : 2022-05-03
INFO:prefect.log_data:data is : 501
INFO:prefect.log_data:data is : 2022-05-03
Notice that as soon as the task get_last_date (which contains the MySqlFetch task) gets executed, the logs are duplicated.
[2022-05-12 16:36:22+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'data_gov_cat_extraction'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'is_incremental': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'is_incremental': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'DATA_GOV_URL': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'limit_per_page': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'limit_per_page': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'nb_record': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'nb_record': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'DATA_GOV_URL': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(False)': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(False)': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'get_last_date': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'get_last_date': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'need_all_records': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'need_all_records': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'Merge': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'extract_data': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'Merge': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
without the call to MySqlFetch in the task get_last_date
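
The duplicated lines use the LEVEL:name:message layout, which is the default format of Python's logging.basicConfig(). One plausible explanation, an assumption that is not confirmed anywhere in this thread, is that something run by the task attaches a handler to the root logger, so each record from the prefect loggers is printed once by their own handler and once more after propagating to the root. A minimal stdlib-only sketch of that mechanism follows; the logger name demo.TaskRunner and the [ts] prefix are placeholders, and no Prefect code is involved:

```python
import io
import logging


def run_demo():
    """Log before and after a handler appears on the root logger."""
    own_stream, root_stream = io.StringIO(), io.StringIO()

    # Stand-in for a framework logger with its own handler and format.
    logger = logging.getLogger("demo.TaskRunner")  # hypothetical name
    logger.setLevel(logging.INFO)
    own_handler = logging.StreamHandler(own_stream)
    own_handler.setFormatter(
        logging.Formatter("[ts] %(levelname)s - %(name)s | %(message)s")
    )
    logger.addHandler(own_handler)

    logger.info("before root handler")

    # Stand-in for what logging.basicConfig() does: it attaches a handler
    # with logging.BASIC_FORMAT to the root logger.
    root = logging.getLogger()
    root_handler = logging.StreamHandler(root_stream)
    root_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
    root.addHandler(root_handler)

    # This record reaches both handlers because propagate defaults to True.
    logger.info("after root handler")

    # Clean up so the demo does not affect other logging in the process.
    root.removeHandler(root_handler)
    logger.removeHandler(own_handler)
    return own_stream.getvalue(), root_stream.getvalue()


own_out, root_out = run_demo()
print(own_out, end="")   # two lines in the "[ts] INFO - name | msg" format
print(root_out, end="")  # INFO:demo.TaskRunner:after root handler
```

Only the second message shows up in the root handler's output, which matches the pattern in the logs above: single lines first, then every line twice.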

Anna Geller

05/12/2022, 3:01 PM
duplicated logs often occur when you have flow.run() somewhere in your code - can you share a full flow code example? ideally a simple example we could use to reproduce the issue, no sensitive info

Florian Guily

05/12/2022, 3:05 PM
yes, I do call run() when using MySqlFetch in a task, like:
@task
def get_last_date():
    last_date = MySqlFetch(args).run()
    return last_date
So could this be the issue?
I did this because the task was not executed unless I called run() on it

Anna Geller

05/12/2022, 6:34 PM
thanks for sharing more - usually the syntax is:
last_date = MySqlFetch(args)

with Flow() as flow:
    last_date()
• you initialize the task (the task class) at the module scope
• then you call this initialized task (here last_date) in the Flow constructor - this then calls the .run() method automatically

Florian Guily

05/13/2022, 10:40 AM
Tried the usual syntax but I still get duplicated logs once the task ends. The duplication always starts after the log message saying that the task succeeded, as in the sample logs I provided

Anna Geller

05/13/2022, 11:09 AM
is this solved now? I saw your message here
usually, when logs occur twice, this is some misconfiguration in the flow, either in the logger or flow.run() being called from the flow itself. you can also try reregistering your flow - LMK if this is still an open issue
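
If the logger does turn out to be the cause, one possible workaround is to stop records from propagating to the root logger. The sketch below uses only the standard library; demo_flow is a hypothetical logger name standing in for prefect, and whether turning propagation off suits a given Prefect setup is an assumption to verify:

```python
import io
import logging


def silence_duplicates(logger_name: str) -> None:
    """Stop records from `logger_name` reaching handlers on the root logger."""
    logging.getLogger(logger_name).propagate = False


# Simulate a root handler that some other code has installed.
root_stream = io.StringIO()
root_handler = logging.StreamHandler(root_stream)
logging.getLogger().addHandler(root_handler)

# Hypothetical application logger with its own handler.
demo = logging.getLogger("demo_flow")
demo.setLevel(logging.INFO)
demo.addHandler(logging.StreamHandler(io.StringIO()))

demo.info("seen twice")          # handled by demo's handler and by the root handler
silence_duplicates("demo_flow")
demo.info("seen once")           # handled by demo's handler only

leaked = root_stream.getvalue()  # what the root handler received
logging.getLogger().removeHandler(root_handler)
print(leaked, end="")
```

To check whether a root handler is present in the first place, inspecting logging.getLogger().handlers after the suspect task has run is a quick diagnostic.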

Florian Guily

05/13/2022, 1:34 PM
yeah no duplication in cloud. Not a big deal in the end.
👍 1