Florian Guily
05/12/2022, 2:47 PM
[2022-05-12 16:44:57+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'data_gov_cat_extraction'
[2022-05-12 16:44:57+0200] INFO - prefect.TaskRunner | Task 'is_incremental': Starting task run...
[2022-05-12 16:44:57+0200] INFO - prefect.TaskRunner | Task 'is_incremental': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:57+0200] INFO - prefect.TaskRunner | Task 'DATA_GOV_URL': Starting task run...
[2022-05-12 16:44:57+0200] INFO - prefect.TaskRunner | Task 'nb_record': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'limit_per_page': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'DATA_GOV_URL': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'nb_record': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(False)': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'limit_per_page': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(False)': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'need_all_records': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'get_last_date': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'need_all_records': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'extract_data': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'extract_data': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'get_total_count': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'get_total_count': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'get_last_date': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_last_date': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'Merge': Starting task run...
INFO:prefect.TaskRunner:Task 'Merge': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'extract_data': Starting task run...
INFO:prefect.TaskRunner:Task 'extract_data': Starting task run...
[2022-05-12 16:44:58+0200] INFO - prefect.TaskRunner | Task 'Merge': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'Merge': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:58+0200] INFO - prefect.extract_data | url : <https://catalog.data.gov/api/3/action/package_search?fq=metadata_created:[2022-05-03T00:00:00Z%20TO%20NOW]&rows=0&start=0>
INFO:prefect.extract_data:url : <https://catalog.data.gov/api/3/action/package_search?fq=metadata_created:[2022-05-03T00:00:00Z%20TO%20NOW]&rows=0&start=0>
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'extract_data': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'extract_data': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_total_count': Starting task run...
INFO:prefect.TaskRunner:Task 'get_total_count': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.get_total_count | Total record count matching this query is 501
INFO:prefect.get_total_count:Total record count matching this query is 501
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_total_count': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_total_count': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'Merge': Starting task run...
INFO:prefect.TaskRunner:Task 'Merge': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'Merge': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'Merge': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index': Starting task run...
INFO:prefect.TaskRunner:Task 'get_pages_index': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.get_pages_index | Number of page with 50 elements : 10
INFO:prefect.get_pages_index:Number of page with 50 elements : 10
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_pages_index': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index[0]': Starting task run...
INFO:prefect.TaskRunner:Task 'get_pages_index[0]': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index[1]': Starting task run...
INFO:prefect.TaskRunner:Task 'get_pages_index[1]': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index[0]': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_pages_index[0]': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'get_pages_index[1]': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'get_pages_index[1]': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'extract_data': Starting task run...
INFO:prefect.TaskRunner:Task 'extract_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'extract_data': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'extract_data': Finished task run for task with final state: 'Mapped'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'publisher_id': Starting task run...
INFO:prefect.TaskRunner:Task 'publisher_id': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'publisher_id': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'publisher_id': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
INFO:prefect.TaskRunner:Task 'transform': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'transform': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'transform': Finished task run for task with final state: 'Mapped'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'STARZ_API_KEY': Starting task run...
INFO:prefect.TaskRunner:Task 'STARZ_API_KEY': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'STARZ_URL': Starting task run...
INFO:prefect.TaskRunner:Task 'STARZ_URL': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'STARZ_API_KEY': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'STARZ_API_KEY': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'STARZ_URL': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'STARZ_URL': Finished task run for task with final state: 'Success'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'load_data': Starting task run...
INFO:prefect.TaskRunner:Task 'load_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'load_data': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'load_data': Finished task run for task with final state: 'Mapped'
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'log_data': Starting task run...
INFO:prefect.TaskRunner:Task 'log_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'log_data': Starting task run...
INFO:prefect.TaskRunner:Task 'log_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'log_data': Starting task run...
INFO:prefect.TaskRunner:Task 'log_data': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.TaskRunner | Task 'set_new_date': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.log_data | data is : 2022-05-03
INFO:prefect.TaskRunner:Task 'set_new_date': Starting task run...
[2022-05-12 16:44:59+0200] INFO - prefect.log_data | data is : 501
[2022-05-12 16:44:59+0200] INFO - prefect.log_data | data is : 2022-05-03
INFO:prefect.log_data:data is : 2022-05-03
INFO:prefect.log_data:data is : 501
INFO:prefect.log_data:data is : 2022-05-03
get_last_date (which contains the MySqlFetch task) gets executed, the logs are duplicated.
[2022-05-12 16:36:22+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'data_gov_cat_extraction'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'is_incremental': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'is_incremental': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'DATA_GOV_URL': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'limit_per_page': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'limit_per_page': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'nb_record': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'nb_record': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'DATA_GOV_URL': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(False)': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(False)': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'get_last_date': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'get_last_date': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'need_all_records': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'need_all_records': Finished task run for task with final state: 'Skipped'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'Merge': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'extract_data': Starting task run...
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'Merge': Finished task run for task with final state: 'Success'
[2022-05-12 16:36:23+0200] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
MySqlFetch in the task get_last_date
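A note on the shape of the duplicated lines in the first log: the second copy of each message (`INFO:prefect.TaskRunner:...`) matches the default format of Python's standard `logging` module, `%(levelname)s:%(name)s:%(message)s`. One plausible reading, not confirmed anywhere in this thread, is that something running inside get_last_date attaches a handler to the root logger, so every later record is printed once by the framework's handler and once by the root handler. The sketch below reproduces that effect with the standard library only; the logger name `demo.TaskRunner` and the function name are hypothetical stand-ins, and no Prefect code is involved.

```python
import io
import logging


def emit_with_and_without_root_handler():
    """Log one message before and one after a root handler is attached."""
    stream = io.StringIO()

    # Stand-in for a framework logger that ships its own handler and format.
    logger = logging.getLogger("demo.TaskRunner")  # hypothetical name
    logger.setLevel(logging.INFO)
    own_handler = logging.StreamHandler(stream)
    own_handler.setFormatter(
        logging.Formatter("%(levelname)s - %(name)s | %(message)s")
    )
    logger.addHandler(own_handler)

    logger.info("Task 'get_last_date': Starting task run...")
    first = stream.getvalue()
    before = first.splitlines()

    # Assumption: some code run by the task attaches a handler to the root
    # logger. logging.basicConfig() does this when the root has no handlers,
    # using logging.BASIC_FORMAT as the format.
    root = logging.getLogger()
    root_handler = logging.StreamHandler(stream)
    root_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
    root.addHandler(root_handler)

    # The record is handled by own_handler, then propagates to the root handler.
    logger.info("Task 'Merge': Starting task run...")
    after = stream.getvalue()[len(first):].splitlines()

    # Remove the handlers so the demo leaves global logging state untouched.
    root.removeHandler(root_handler)
    logger.removeHandler(own_handler)
    return before, after


if __name__ == "__main__":
    before, after = emit_with_and_without_root_handler()
    print("before root handler:", *before, sep="\n  ")
    print("after root handler:", *after, sep="\n  ")
```

If this is what is happening, removing the extra root handler or setting `propagate = False` on the framework logger would be the usual ways to stop the second copy with plain `logging`; whether either fits this flow is not something the thread establishes.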
Anna Geller
Florian Guily
05/12/2022, 3:05 PM
MySqlFetch in a task like:
@task
def get_last_date():
    last_date = MySqlFetch(args).run()
    return last_date
So this could be the issue?
Anna Geller
last_date = MySqlFetch(args)
with Flow() as flow:
    last_date()
• you initialize the task (the task class) at the module scope
• then you call this initialized task (here last_date) in the Flow constructor - this then calls the .run() method automatically
Florian Guily
05/13/2022, 10:40 AM
Anna Geller
flow.run() called from the flow itself
you can also try reregistering your flow - LMK if this is still an open issue
Florian Guily
05/13/2022, 1:34 PM