Robert Holmes
04/07/2022, 1:38 PM
def prefect_flow():
    with Flow('cloud_reporting_etl') as flow:
        for column, value in jobs_df.iterrows():
            job_name = value['Job Name']
            query = value['Query']
            filename = value['Filename']
            extracted = extract_data(f"""{query}""")
            load_data_to_s3(extracted, filename)
    return flow
Kevin Kho
Is load_data_to_s3 a task?
The first issue is that more of these steps need to be converted to tasks. The end result will look something like this:
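from prefect import Flow, unmapped

def prefect_flow():
    with Flow('cloud_reporting_etl') as flow:
        # some_task_to_return_rows needs @task(nout=2) so its result can be unpacked
        col, val = some_task_to_return_rows(jobs_df)
        # unmapped() passes the field name as a constant instead of mapping over its characters
        job_name = retrieve_val.map(val, unmapped('Job Name'))
        query = retrieve_val.map(val, unmapped('Query'))
        filename = retrieve_val.map(val, unmapped('Filename'))
        extracted = extract_data.map(query)  # can't use an f-string on a task result
        load_data_to_s3.map(extracted, filename)
    return flow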
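To see why the constant field name matters, here is a plain-Python approximation of mapped vs. unmapped arguments (no Prefect involved, so this is not Prefect's own implementation). In Prefect 1.x every argument to .map is mapped over unless wrapped in unmapped(), and a string is iterable, so a bare 'Filename' would be split into characters. The rows, emulate_map and naive_map below are hypothetical, for illustration only.

```python
from typing import Any, Callable, List


def emulate_map(fn: Callable[..., Any], mapped: List[Any], constant: Any) -> List[Any]:
    """Call fn once per element of `mapped`, passing `constant` unchanged each time.

    Roughly what retrieve_val.map(val, unmapped('Filename')) asks for:
    one child call per row, with the field name held fixed.
    """
    return [fn(item, constant) for item in mapped]


def naive_map(fn: Callable[..., Any], *iterables: Any) -> List[Any]:
    """Pair every argument element by element, as happens when the string is
    mapped too: it gets iterated character by character."""
    return [fn(*args) for args in zip(*iterables)]


def retrieve_val(value: dict, field: str) -> Any:
    # Same body as the retrieve_val task, minus the @task decorator
    return value[field]


# Hypothetical rows standing in for jobs_df
rows = [
    {"Job Name": "daily", "Query": "SELECT 1", "Filename": "daily.csv"},
    {"Job Name": "weekly", "Query": "SELECT 2", "Filename": "weekly.csv"},
]

print(emulate_map(retrieve_val, rows, "Filename"))

try:
    naive_map(retrieve_val, rows, "Filename")
except KeyError as exc:
    # The first row is paired with 'F', which is not a column name
    print("KeyError:", exc)
```

The first call returns one filename per row; the second fails on the first row because it looks up the key 'F'.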
where retrieve_val is something like:
from prefect import task

@task
def retrieve_val(value, field):
    # Pull a single field out of one row (dict-style lookup by column name)
    return value[field]
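A row from DataFrame.iterrows() is a pandas Series, which supports the same value[field] lookup as a dict, so retrieve_val works on either. The chained .map calls rely on every mapped result having one entry per row, in row order, so that load_data_to_s3.map(extracted, filename) pairs each extract with its own filename. The plain-Python sketch below shows that alignment, assuming rows are dicts; fake_extract and plan_loads are hypothetical stand-ins, not part of the original flow.

```python
from typing import Dict, List, Tuple

Row = Dict[str, str]


def retrieve_val(value: Row, field: str) -> str:
    # Same body as the task above, minus the @task decorator
    return value[field]


def fake_extract(query: str) -> str:
    # Hypothetical stand-in for extract_data: just labels the query
    return f"result of {query}"


def plan_loads(rows: List[Row]) -> List[Tuple[str, str]]:
    # Each "mapped" list has one entry per row, in the same order...
    queries = [retrieve_val(r, "Query") for r in rows]
    filenames = [retrieve_val(r, "Filename") for r in rows]
    extracted = [fake_extract(q) for q in queries]
    # ...so pairing by position keeps each result with its own filename
    return list(zip(extracted, filenames))


rows = [
    {"Job Name": "daily", "Query": "SELECT 1", "Filename": "daily.csv"},
    {"Job Name": "weekly", "Query": "SELECT 2", "Filename": "weekly.csv"},
]

for result, target in plan_loads(rows):
    print(target, "<-", result)
```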
Robert Holmes
04/07/2022, 3:25 PM