
Edison A

10/30/2020, 5:34 PM
What is the best way of dealing with loops inside a flow? I wrote a program which has to loop through a list of file names, scrape their XML, process each one, and write the results to a database. The version below does not work. (All functions called are tasks.)
with Flow("epex_scraping", schedule=schedule) as flow:
    """Main definition of all Data pipeline steps"""
    report_names = scrape_for_file_names()
    for report_name in report_names:
        # extract
        report_xml = get_xml_files(report_name)
        report_json = get_xml_jsons(report_xml)
        # transform
        public_trades_collection = generate_public_trades(report_json)
        # load
        write_to_public_trades_db(public_trades_collection)

flow.register('project_x')
flow.run()

Dylan

10/30/2020, 5:39 PM
Hi @Edison A, welcome to the Prefect Community!
Prefect can generate dynamic tasks at runtime using mapping: https://docs.prefect.io/core/concepts/mapping.html
A further explanation: using the functional API, you’re declaring your flow’s structure, which is evaluated at build time. At flow run time, that structure is executed. So, in your example, `report_names` is not a list, it’s a `Task`’s result. The list only becomes available at runtime, so a plain `for` loop inside the `with Flow(...)` block has nothing to iterate over.
Mapping is the way we use iterables to generate tasks at runtime

Edison A

10/31/2020, 8:43 AM
Thanks @Dylan. This is very helpful