What is the best way of dealing with loops inside a flow?
I wrote a program that has to loop through a list of file names, scrape their XML, process each one, and write the results to a database.
This does not work (all functions called are tasks):
```python
from prefect import Flow

with Flow("epex_scraping", schedule=schedule) as flow:
    """Main definition of all Data pipeline steps"""
    report_names = scrape_for_file_names()
    # This loop runs while the flow is being built, when report_names
    # is still a Task rather than a list of names.
    for report_name in report_names:
        # extract
        report_xml = get_xml_files(report_name)
        report_json = get_xml_jsons(report_xml)
        # transform
        public_trades_collection = generate_public_trades(report_json)
        # load
        write_to_public_trades_db(public_trades_collection)

flow.register('project_x')
flow.run()
```
A further explanation:
Using the functional API, you’re declaring your flow’s structure, which is evaluated at build time. At flow run time, the tasks in that structure are executed. So, in your example, `report_names` is not a list; it’s a `Task`’s result.
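To make the build-time vs. run-time distinction concrete, here is a toy model in plain Python. It is not Prefect's implementation: the `Node` class and the stand-in `scrape_for_file_names` are hypothetical, and only illustrate why a deferred call cannot be looped over while the graph is being built.

```python
from typing import Any, Callable, List


class Node:
    """A deferred call: records a function and its inputs instead of running it.

    Toy stand-in for a task in a flow; not Prefect's actual Task class.
    """

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn = fn
        self.args = args

    def run(self) -> Any:
        # Resolve upstream nodes first, then call the function on real values.
        values = [a.run() if isinstance(a, Node) else a for a in self.args]
        return self.fn(*values)


def scrape_for_file_names() -> List[str]:
    # Hypothetical stand-in for the asker's scraping task.
    return ["report_a.xml", "report_b.xml"]


# "Build time": creating the node only records the call; nothing runs yet.
report_names = Node(scrape_for_file_names)
print(isinstance(report_names, list))  # False: it is a Node, not a list

# Looping over the placeholder fails, because Node is not iterable.
try:
    for name in report_names:
        pass
except TypeError as exc:
    print("cannot loop at build time:", exc)

# "Run time": executing the node produces the actual list.
names = report_names.run()
print(names)
```

The loop in the question has the same problem: it asks for the elements of something that, at build time, is only a description of a future result.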
Dylan
10/30/2020, 5:41 PM
The `list` becomes available at runtime.
Dylan
10/30/2020, 5:42 PM
Mapping (`.map()`) is the way we use iterables to generate tasks at runtime.
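The idea behind mapping can be sketched with another plain-Python toy model (again not Prefect's implementation; `Node`, `MappedNode`, and the stand-in task functions are hypothetical). A mapped step is wired up once at build time, and only at run time, once the upstream list exists, does it fan out into one call per element.

```python
from typing import Any, Callable, List


class Node:
    """Deferred call of fn on the results of upstream nodes (toy task)."""

    def __init__(self, fn: Callable[..., Any], *upstream: "Node") -> None:
        self.fn = fn
        self.upstream = upstream

    def run(self) -> Any:
        return self.fn(*(u.run() for u in self.upstream))


class MappedNode(Node):
    """Deferred map: one call of fn per element of the upstream result."""

    def run(self) -> List[Any]:
        (items,) = [u.run() for u in self.upstream]
        # The number of calls is only known here, at run time.
        return [self.fn(item) for item in items]


# Hypothetical stand-ins for the asker's tasks.
def scrape_for_file_names() -> List[str]:
    return ["report_a", "report_b", "report_c"]


def get_xml_files(name: str) -> str:
    return f"<report name='{name}'/>"


def get_xml_jsons(xml: str) -> dict:
    return {"raw": xml}


# Build time: wire up the graph; no function has run yet.
report_names = Node(scrape_for_file_names)
report_xml = MappedNode(get_xml_files, report_names)
report_json = MappedNode(get_xml_jsons, report_xml)

# Run time: each mapped step fans out over however many names were scraped.
results = report_json.run()
print(len(results))  # 3
```

In Prefect 1.x itself, the corresponding change to the question's flow is to drop the `for` loop and call the tasks with `.map`, for example `report_xml = get_xml_files.map(report_names)`, passing each mapped result into the next `.map` call.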