Stephen Lloyd
04/08/2022, 12:43 PM
import awswrangler
import pandas
from prefect import Flow, task

@task
def get_table() -> object:
    # ... query table
    result: pandas.DataFrame = cursor.fetch_dataframe()
    return result

@task
def load_to_s3(result: object) -> None:
    awswrangler.s3.to_csv(
        df=result,
        path='s3://bucket/folder/table.csv',
    )

with Flow(...) as flow:
    get_data = get_table()
    save_data = load_to_s3(get_data)
    ...
I’d now like to extend this so that I can pass a list of tables from the same database, then extract each one and load it to S3.
Is this possible given this simple template?
Sylvain Hazard
04/08/2022, 12:46 PM
with Flow(...) as flow:
    tables = get_tables()  # Change the query accordingly
    save_data = load_to_s3.map(tables)
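As a rough illustration of what the mapped call fans out to, here is a plain-Python sketch with no Prefect involved: the function is called once per element of the list. It assumes tables holds table names. The helpers s3_path_for, fake_load_to_s3 and map_over are hypothetical, as is the per-table path, which is added so that each table gets its own file rather than every run writing to table.csv.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def s3_path_for(table: str) -> str:
    """Build a distinct (hypothetical) S3 key for each table name."""
    return f"s3://bucket/folder/{table}.csv"


def fake_load_to_s3(table: str) -> str:
    """Stand-in for load_to_s3: returns the path it would write to."""
    return s3_path_for(table)


def map_over(fn: Callable[[T], R], items: List[T]) -> List[R]:
    """Call fn once per item, roughly what a mapped task fans out to."""
    return [fn(item) for item in items]


tables = ["customers", "orders"]  # assumed table names, for illustration only
written_paths = map_over(fake_load_to_s3, tables)
```

In the real flow, Prefect creates a separate task run per element instead of looping, and those runs may execute in parallel depending on the executor.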
The load_to_s3.map(tables) call will create a load_to_s3 task for each table in tables. More information on this mechanism is in the docs.
Stephen Lloyd
04/13/2022, 10:16 AM