# prefect-community
h
I'm reaching the point of frustration with what should be a simple flow. I would like to be able to loop through an API until I get a result that has no data in it. However, Prefect's mix of build time and run time has left me very confused about what I can and can't do.
with Flow('event-export', schedule) as flow:

    api_key = EnvVarSecret('SOME_API_KEY')

    offset = get_last_offset()
    event_data = extract_email_event_data(api_key, offset)

    transformed_data = transform(event_data)
    send_to_firehose(transformed_data)
    update_last_offset(offset=offset+100)
I would like to be able to call the extract task for as long as necessary, incrementing the offset on each occasion, and I would like to transform and send each extracted set of events in one swoop. However, I can't seem to figure out how to put this into a while loop with the result of the extract task as my end condition, i.e., if no events are left, then stop the flow. I've already tried `LOOP`, but this is no use to me because I don't want to wait until I get all the results back, which could take hours.
c
Hi @Hawkar Mahmod! Yes, writing deferred code can have a steep learning curve - the code you’ve provided here looks perfectly valid and normal, what exactly are you running into?
h
Sorry @Chris White I pressed Enter too quickly and have since updated it.
c
Prefect currently has a strict requirement that a task must finish before a downstream dependency can begin. Moreover, as with all workflow tools, Prefect is currently based on a DAG model, which means that a task cannot alter its state based on something downstream (i.e., there can be no dependency loops). [Note: I say “currently” because we have some ideas on unique ways to relax these assumptions.] So, based on the context you’ve provided here, I think you have three options:
1.) Put the while loop within the `extract_email_event_data` task and have it return all data.
2.) Combine the `extract_email_event_data` and `transform` tasks into one task so that each chunk can be extracted and processed the moment it’s ready.
3.) If you are orchestrating this flow via a Prefect backend, you can use the `FlowRunTask` in the task library to have each iteration of the run kick off a new run with the required `offset` as a parameter (defaults to 0 for the first run, and each subsequent run kicks off the next with a new offset value).
If it were me, I’d prefer #3 so that each offset batch has its own run ID and can be restarted / rerun independently.
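For reference, option 2 might look roughly like the sketch below. It is plain Python with no Prefect imports, so it only shows the loop that would live inside the single combined task. The names `extract_and_process`, `fetch_page`, and `fake_fetch_page` are hypothetical stand-ins, and the batch size of 100 is taken from the `offset+100` step in the flow above.

```python
from typing import Callable, Dict, List

BATCH_SIZE = 100  # assumed page size, matching the offset step in the flow above


def extract_and_process(
    fetch_page: Callable[[int], List[Dict]],
    transform: Callable[[List[Dict]], List[Dict]],
    send: Callable[[List[Dict]], None],
    start_offset: int = 0,
) -> int:
    """Fetch, transform, and send one batch at a time until a page comes back empty.

    Returns the offset to persist for the next run.
    """
    offset = start_offset
    while True:
        events = fetch_page(offset)
        if not events:
            # An empty page is the end condition: nothing left to export.
            return offset
        # Each batch is shipped as soon as it is ready, not after all pages arrive.
        send(transform(events))
        offset += BATCH_SIZE


# Hypothetical stand-in for the real API: 250 events served in pages of 100.
_FAKE_EVENTS = [{"id": i} for i in range(250)]


def fake_fetch_page(offset: int) -> List[Dict]:
    return _FAKE_EVENTS[offset:offset + BATCH_SIZE]


sent_batches: List[List[Dict]] = []
final_offset = extract_and_process(
    fake_fetch_page,
    lambda events: [dict(e, transformed=True) for e in events],
    sent_batches.append,
)
```

In a real flow the body of `extract_and_process` would sit inside one task, at the cost of that task no longer being an atomic unit of work.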
h
OK well that's actually quite reassuring. Means I wasn't completely losing my mind! 😄 I like the simplicity of 2. and it was probably my natural last choice. I was hesitant to go for it because it feels like it goes against the Prefect spirit of having each task do as atomic a unit of work as possible. But I understand given the current limitations it's one of the only ways round this. With 3. I presume this means abstracting this whole flow out and basically looping the flow as part of another Prefect process? Certainly a good suggestion. I might give that a go. Thank you for that guidance. I'll give those a try and see how I fare. Will keep you updated!
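One reading of option 3, sketched in plain Python: each run handles exactly one batch and then asks for the next run to be started with the new offset, so no outer process has to loop or wait. The `start_next_run` callback is a hypothetical placeholder for whatever actually launches a flow run (for example the `FlowRunTask` mentioned above); here it is simulated with a simple queue.

```python
from collections import deque
from typing import Callable, Deque, Dict, List

BATCH_SIZE = 100  # assumed page size, matching the offset step in the flow above


def run_once(
    offset: int,
    fetch_page: Callable[[int], List[Dict]],
    process: Callable[[List[Dict]], None],
    start_next_run: Callable[[int], None],
) -> bool:
    """Handle a single batch; schedule the next run only if data was found."""
    events = fetch_page(offset)
    if not events:
        return False  # empty page: the chain of runs ends here
    process(events)
    start_next_run(offset + BATCH_SIZE)  # hand off to a fresh run, then finish
    return True


# Hypothetical stand-ins: a fake API and a queue that plays the role of the backend.
_FAKE_EVENTS = [{"id": i} for i in range(250)]
pending_runs: Deque[int] = deque([0])  # the first run starts at offset 0
offsets_run: List[int] = []
processed: List[List[Dict]] = []

while pending_runs:
    current_offset = pending_runs.popleft()
    offsets_run.append(current_offset)
    run_once(
        current_offset,
        lambda off: _FAKE_EVENTS[off:off + BATCH_SIZE],
        processed.append,
        pending_runs.append,
    )
```

Because every batch is its own run, a failed batch could be retried from its offset without repeating the earlier ones.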
c
Sounds good!
s
@Chris White @Hawkar Mahmod I was wondering how to implement this in general: https://github.com/PrefectHQ/prefect/discussions/3074. What I settled on was for the “loop controller” task to trigger a subflow (going for something like Chris’ #3). This seems to work, at least in small tests. The problem with this is that the loop controller actively waits for the subtask to complete. I wonder if this is going to cause resource bottlenecks in more complex cases. A better solution would be a “coroutine” style task which could be “pending” in a loop and could be triggered by an event (like the subtask results being available), but I don’t know how to implement this with the current code.
c
Interesting yea; I can’t guarantee any timelines yet but complex run patterns like this are definitely on our radar