Hawkar Mahmod
09/02/2020, 5:09 PM
with Flow('event-export', schedule) as flow:
    api_key = EnvVarSecret('SOME_API_KEY')
    offset = get_last_offset()
    event_data = extract_email_event_data(api_key, offset)
    transformed_data = transform(event_data)
    send_to_firehose(transformed_data)
    update_last_offset(offset=offset+100)
I would like to call the extract task for as long as necessary, incrementing the offset each time, and to transform and send each extracted set of events in one swoop. However, I can't figure out how to put this into a while loop with the result of the extract task as my end condition, i.e. if no events are left, stop the flow.
I've already tried LOOP, but this is no use to me because I don't want to wait until I get all the results back, which could take hours.
Chris White
09/02/2020, 5:11 PM
Hawkar Mahmod
09/02/2020, 5:12 PM
Chris White
09/02/2020, 5:20 PM
1.) [...] extract_email_event_data task and have it return all data
2.) Combine the extract_email_event_data and transform tasks into one task so that each chunk can be extracted and processed the moment it’s ready
3.) If you are orchestrating this flow via a Prefect backend, you can use the FlowRunTask in the task library to have each iteration of the run kick off a new run with the required offset as a parameter (defaults to 0 for the first run, and each subsequent run kicks off the next with a new offset value)
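Option 2 could look roughly like the following plain-Python sketch. It leaves Prefect out entirely and only shows the control flow: fetch a page, process it immediately, and stop when a page comes back empty. Inside a flow, the body of `extract_and_process` would live in a single task. The names `iter_event_pages`, `extract_and_process`, `fetch_page`, `transform`, and `send` are hypothetical stand-ins, not part of the original flow or of any Prefect API, and the page size of 100 is assumed from the `offset+100` step in the question.

```python
from typing import Any, Callable, Iterator, List, Tuple

PAGE_SIZE = 100  # assumed from the offset+100 step in the original flow

Events = List[Any]


def iter_event_pages(
    fetch_page: Callable[[int, int], Events],
    start_offset: int,
    page_size: int = PAGE_SIZE,
) -> Iterator[Tuple[int, Events]]:
    """Yield (offset, events) pages until the source returns an empty page."""
    offset = start_offset
    while True:
        events = fetch_page(offset, page_size)
        if not events:
            return  # no events left: this is the end condition
        yield offset, events
        offset += len(events)  # advance by what was actually received


def extract_and_process(
    fetch_page: Callable[[int, int], Events],
    transform: Callable[[Events], Any],
    send: Callable[[Any], None],
    start_offset: int = 0,
    page_size: int = PAGE_SIZE,
) -> int:
    """Transform and send each page as soon as it arrives; return the next offset."""
    next_offset = start_offset
    for offset, events in iter_event_pages(fetch_page, start_offset, page_size):
        send(transform(events))
        next_offset = offset + len(events)
    return next_offset


if __name__ == "__main__":
    # Stand-in data source: 250 fake events served in pages.
    fake_events = [{"id": i} for i in range(250)]
    sent_batches = []
    final_offset = extract_and_process(
        fetch_page=lambda offset, size: fake_events[offset:offset + size],
        transform=lambda events: [e["id"] for e in events],
        send=sent_batches.append,
    )
    print(final_offset, [len(batch) for batch in sent_batches])
```

The returned offset is what a follow-up step would persist as the last offset. One trade-off of this shape is that the whole loop is a single unit of work, so a failure partway through cannot be retried per batch.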
If it were me, I’d prefer #3 so that each offset batch has its own run ID and can be restarted / rerun independently
Hawkar Mahmod
09/02/2020, 5:37 PM
Chris White
09/02/2020, 5:40 PM
Shaun Cutts
09/02/2020, 9:17 PM
Chris White
09/02/2020, 9:20 PM