Florian Guily
04/27/2022, 10:08 AM
Waiting for next available Task run at 2022-04-27T10:00:35.571259+00:00
After a few seconds, a new log message states: Beginning Flow run for 'flow_name'
and I have all of the log messages of the previous tasks, as if the flow was re-executed. Is this normal? Is the flow really re-executed? When executing this flow from Prefect Cloud, I don't see those logs, so I'm a bit confused.
Anna Geller
04/27/2022, 10:31 AM
Florian Guily
04/27/2022, 11:44 AM
import prefect
from prefect import Flow, Parameter, case, flatten, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.tasks.control_flow import merge
from prefect.tasks.secrets import PrefectSecret

def main():
    with Flow("data_gov_cat_extraction") as flow:
        logger = prefect.context.get("logger")
        nb_record = Parameter("nb_record", default=4)
        limit_per_page = Parameter("limit_per_page", default=2)
        publisher_id = Parameter("publisher_id", default='data-gov')
        starz_api_key = PrefectSecret("STARZ_API_KEY")
        publisher_url = PrefectSecret("DATA_GOV_URL")()  # The second brackets force execution, avoiding automatic dependencies
        starz_url = PrefectSecret("STARZ_URL")
        cond = need_all_records(nb_record)
        with case(cond, True):
            response = extract_data(0, publisher_url, 0)
            nb_record1 = get_total_count(response)
        nb_record = merge(nb_record1, nb_record)
        pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page)  # output: [list]
        response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index)  # [[list], [list], ..., [list]]
        formatted_datas = transform.map(response, unmapped(publisher_id))  # [[list], [list], ..., [list]]
        data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url))  # [[smaller list], ..., [smaller list]]
        retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

    flow.executor = LocalDaskExecutor()
    # flow.register(project_name="Tutorial")
    flow.run(nb_record = 4, limit_per_page = 2, publisher_id = 'data-gov')
    # flow.visualize()
Anna Geller
04/27/2022, 11:50 AM
logger = prefect.context.get("logger")
you are simultaneously mapping over a list nb_record_per_page and over a list pages_index - is that intended?
Regarding:
flow.run(nb_record = 4, limit_per_page = 2, publisher_id = 'data-gov')
the syntax is meant to be a dictionary:
flow.run(parameters=dict(nb_record=4, ...))
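(Aside: a minimal runnable sketch of both points above, with made-up task and flow names. prefect.context is only populated at run time, so the logger belongs inside a task rather than in the Flow constructor block, and parameters are handed to flow.run() as one dictionary.)

import prefect
from prefect import Flow, Parameter, task

@task
def report(n):
    # context is populated only while the task runs,
    # so the logger is fetched here, not in the Flow block
    logger = prefect.context.get("logger")
    logger.info("got n=%s", n)

with Flow("sketch") as flow:
    n = Parameter("n", default=1)
    report(n)

# parameters go in as a single dict, not as keyword arguments
flow.run(parameters=dict(n=4))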
Florian Guily
04/27/2022, 11:54 AM
Anna Geller
04/27/2022, 12:07 PM
it's not a good practice?
it's totally supported, just not that common, and some people just forget the unmapped keyword - wanted to just confirm it was intentional. LMK if using the updated syntax and removing the logger fixed the issue for you
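(Aside: a short sketch of the pattern being discussed, with hypothetical tasks. Task.map() zips all mapped iterables element-wise, so mapping over two lists at once requires them to be the same length; unmapped() marks an argument as a constant shared by every mapped run.)

from prefect import Flow, task, unmapped

@task
def build_request(count, base_url, page):
    return f"{base_url}?count={count}&page={page}"

with Flow("map-sketch") as flow:
    # counts and pages are zipped pairwise into three task runs;
    # the base URL is passed whole to each run via unmapped()
    build_request.map([10, 10, 5], unmapped("https://example.com"), [0, 1, 2])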
Florian Guily
04/27/2022, 12:10 PM
Anna Geller
04/27/2022, 12:13 PM
export PREFECT__LOGGING__LEVEL=INFO
Florian Guily
04/27/2022, 12:18 PM
Anna Geller
04/27/2022, 12:18 PM
Florian Guily
04/27/2022, 12:21 PM
Anna Geller
04/27/2022, 12:25 PM
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default='data-gov')
    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret("DATA_GOV_URL")()  # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")
    cond = need_all_records(nb_record)
    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)
    nb_record = merge(nb_record1, nb_record)
    pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page)  # output: [list]
    response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index)  # [[list], [list], ..., [list]]
    formatted_datas = transform.map(response, unmapped(publisher_id))  # [[list], [list], ..., [list]]
    data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url))  # [[smaller list], ..., [smaller list]]
    retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

if __name__ == '__main__':
    flow.run()
Florian Guily
04/27/2022, 12:29 PM
Anna Geller
04/27/2022, 12:32 PM
prefect run -p your_local_flow_file.py
Florian Guily
04/27/2022, 12:37 PM
└── 14:34:18 | INFO | Flow run RUNNING: terminal tasks are incomplete.
└── 14:34:18 | INFO | Waiting for next available Task run at 2022-04-27T12:34:27.863268+00:00
└── 14:34:27 | INFO | Beginning Flow run for 'data_gov_cat_extraction'
└── 14:34:27 | INFO | Task 'DATA_GOV_URL': Starting task run...
└── 14:34:27 | INFO | Task 'DATA_GOV_URL': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO | Task 'extract_data': Starting task run...
└── 14:34:27 | INFO | Task 'extract_data': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO | Task 'transform': Starting task run...
└── 14:34:27 | INFO | Task 'transform': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO | Task 'STARZ_API_KEY': Starting task run...
└── 14:34:27 | INFO | Task 'STARZ_URL': Starting task run...
└── 14:34:27 | INFO | Task 'STARZ_API_KEY': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO | Task 'STARZ_URL': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO | Task 'load_data': Starting task run...
└── 14:34:27 | INFO | Task 'load_data': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO | Task 'retry_post': Starting task run...
└── 14:34:27 | INFO | Task 'retry_post': Finished task run for task with final state: 'Mapped'
└── 14:34:27 | INFO | Task 'extract_data[0]': Starting task run...
└── 14:34:27 | INFO | Task 'extract_data[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO | Task 'extract_data[1]': Starting task run...
└── 14:34:27 | INFO | Task 'transform[0]': Starting task run...
└── 14:34:27 | INFO | Task 'extract_data[1]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO | Task 'transform[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO | Task 'transform[1]': Starting task run...
└── 14:34:27 | INFO | Task 'transform[1]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO | Task 'load_data[0]': Starting task run...
└── 14:34:27 | INFO | Task 'load_data[0]': Finished task run for task with final state: 'Success'
└── 14:34:27 | INFO | Task 'load_data[1]': Starting task run...
└── 14:34:27 | INFO | Task 'load_data[1]': Finished task run for task with final state: 'Success'
Anna Geller
04/27/2022, 12:46 PM
do you still have
flow.run()
in your Flow constructor? can you remove any flow.run() from your code and just run it from the CLI:
prefect run -p flow.py
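(Aside on why this matters, assuming the CLI loads the flow by importing the file: any flow.run() sitting at module level then executes during that import, which can produce confusing duplicated runs. A minimal sketch of the usual guard:)

from prefect import Flow, task

@task
def say_hi():
    print("hi")

with Flow("guard-sketch") as flow:
    say_hi()

# runs only via `python flow.py`, not when the Prefect CLI
# or an agent imports the module to extract the flow object
if __name__ == "__main__":
    flow.run()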
Florian Guily
04/27/2022, 12:53 PM
I have
flow.run()
in the if __name__ == '__main__':
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default='data-gov')
    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret("DATA_GOV_URL")()  # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")
    cond = need_all_records(nb_record)
    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)
    nb_record = merge(nb_record1, nb_record)
    pages_index, nb_record_per_page = get_pages_index(nb_record, limit_per_page)  # output: [list]
    response = extract_data.map(nb_record_per_page, unmapped(publisher_url), pages_index)  # [[list], [list], ..., [list]]
    formatted_datas = transform.map(response, unmapped(publisher_id))  # [[list], [list], ..., [list]]
    data_to_retry = load_data.map(formatted_datas, unmapped(starz_api_key), unmapped(starz_url))  # [[smaller list], ..., [smaller list]]
    retry_post.map(flatten(data_to_retry), unmapped(starz_api_key), unmapped(starz_url))

if __name__ == "__main__":
    flow.run()
Anna Geller
04/27/2022, 1:14 PM
Florian Guily
04/27/2022, 2:29 PM
Kevin Kho
04/27/2022, 2:45 PM
can you add
run_on_schedule=False
to the flow.run()
and see if the behavior is still there?
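(Context for this suggestion, sketched with a hypothetical schedule: when a flow has a schedule attached, flow.run() waits for and executes every scheduled run, which is exactly what repeated 'Waiting for next available Task run' / 'Beginning Flow run' log lines look like; run_on_schedule=False forces one immediate run instead.)

from datetime import timedelta

from prefect import Flow, task
from prefect.schedules import IntervalSchedule

@task
def noop():
    pass

schedule = IntervalSchedule(interval=timedelta(minutes=10))

with Flow("schedule-sketch", schedule=schedule) as flow:
    noop()

flow.run()                       # loops, waiting for each scheduled run
flow.run(run_on_schedule=False)  # ignores the schedule and runs exactly once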
Florian Guily
04/27/2022, 2:55 PM
flow.run(run_on_schedule=False)
from CLI
Kevin Kho
04/27/2022, 2:58 PM
Florian Guily
04/27/2022, 2:59 PM
Kevin Kho
04/27/2022, 2:59 PM
Anna Geller
04/27/2022, 3:11 PM
with Flow("data_gov_cat_extraction", executor=LocalDaskExecutor()) as flow:
    nb_record_param = Parameter("nb_record", default=4)
    limit_per_page = Parameter("limit_per_page", default=2)
    publisher_id = Parameter("publisher_id", default="data-gov")
    starz_api_key = PrefectSecret("STARZ_API_KEY")
    publisher_url = PrefectSecret(
        "DATA_GOV_URL"
    )()  # The second brackets force execution, avoiding automatic dependencies
    starz_url = PrefectSecret("STARZ_URL")
    cond = need_all_records(nb_record_param)
    with case(cond, True):
        response = extract_data(0, publisher_url, 0)
        nb_record1 = get_total_count(response)
    nb_record = merge(nb_record1, nb_record_param)
the reason: you previously had two tasks with the same name nb_record - perhaps it led to some issue?
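(Aside: a minimal sketch of the renaming pattern, with placeholder tasks standing in for the real API calls. Keeping the Parameter and the merge result in separate Python variables means the Parameter task is never shadowed while the graph is still being built, and merge() picks whichever branch actually ran.)

from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

@task
def need_all_records(n):
    return n == 0  # hypothetical condition: 0 means "fetch everything"

@task
def get_total_count():
    return 100  # placeholder standing in for the real API call

with Flow("merge-sketch") as flow:
    nb_record_param = Parameter("nb_record", default=4)
    with case(need_all_records(nb_record_param), True):
        nb_record_from_api = get_total_count()
    # merge returns the result of whichever branch ran
    nb_record = merge(nb_record_from_api, nb_record_param)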
Florian Guily
04/27/2022, 3:16 PM
Kevin Kho
04/27/2022, 3:18 PM
Anna Geller
04/27/2022, 3:18 PM
Florian Guily
04/27/2022, 3:21 PM
Kevin Kho
04/27/2022, 3:22 PM
Florian Guily
04/27/2022, 3:24 PM