https://prefect.io logo
#prefect-community
Title
# prefect-community
s

Stephen Lloyd

04/08/2022, 12:43 PM
I wrote a naive flow that looks like:
Copy code
@task
def get_table() -> object:
  ... query table
  result: pandas.DataFrame = cursor.fetch_dataframe()
  return result

def load_to_s3(result: object) -> None:
  awswrangler.s3.to_csv(
    df=result,
    path='<s3://bucket/folder/table.csv>
  )

with Flow(...) as flow:
  get_data = get_table()
  save_data = load_to_s3(get_data)
  ...
I’d like to now extend this to somehow pass a list of tables from the same database to extract and load to s3. Is this possible given this simple template?
s

Sylvain Hazard

04/08/2022, 12:46 PM
Yes of course ! Off the top of my head, something like this would work I believe :
Copy code
with Flow() as flow:
   tables = get_tables() # Change the query accordingly
   save_data = load_to_s3.map(tables)
This will create a
load_to_s3
task for each table in
tables
. More information on this mechanism in the doc.
upvote 1
s

Stephen Lloyd

04/13/2022, 10:16 AM
Thanks!
6 Views