Hi, in prefect 2.0 - is there a way to perform 'yields'...
# prefect-community
m
Hi, in prefect 2.0 - is there a way to perform 'yields'? We have some (unfortunately) long-running tasks where we stream some content, which takes a while (30+ min, very large zipped JSON files) - and I can't figure out whether I can yield partial result futures from my task incrementally and start processing them in downstream tasks. Currently, the workflow is:
• Stream-read the big annoying file into small chunks, save them in S3 incrementally in a separate thread inside the task, and return the list of all the chunked file URIs as a single result (a single PrefectFuture[list[str]] object)
  ◦ this is where I would like to yield
• Run some downstream processing on each chunk
  ◦ Here we use future.result() to loop over and start each task - this works 100%, but if we could use yield in the first task, we could begin processing immediately
Is something like this supported / going to be supported? I tried making it work with contextmanager and tasks, but ran into nothing but trouble
something like (pseudo-code):
Copy code
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed

from prefect import flow, task
from ... import load_file  # pseudo-code: loads the raw zipped file

@task
@contextmanager
def stream_big_file():
    file_bytes = load_file()
    futures = []
    with ThreadPoolExecutor() as executor:
        with mystreamer(file_bytes) as stream:
            for chunked_content in stream:
                # submit() returns a Future that resolves to the chunk's URI
                future = executor.submit(persist_func, chunked_content, ...)
                futures.append(future)
        # yield each chunk URI as soon as its upload completes
        for future in as_completed(futures):
            yield future.result()


@task
def process_chunk(chunk_uri: str):
    ...


@flow
def yield_flow():
    uri_iterator = stream_big_file()
    process_chunk.map(uri_iterator)  # should not be blocking
On a related note, is anything like as_completed from concurrent.futures supported for PrefectFutures?
a
re: as_completed, check out this example from Michael: https://discourse.prefect.io/t/how-to-get-finished-futures-as-soon-as-they-complete-in-a-non-blocking-way/790
How are you processing this file? E.g. pandas has a way of reading files in chunks, so you wouldn't have to rely on async or any Prefect functionality for that
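For reference, a rough sketch of what that chunked reading could look like with pandas (the path, chunk size, and compression below are placeholder assumptions for a gzipped, newline-delimited JSON file):
Copy code
import pandas as pd

# Read a large gzipped, newline-delimited JSON file in fixed-size chunks
# instead of loading everything into memory at once.
reader = pd.read_json(
    "s3://bucket/big_file.json.gz",  # placeholder path
    lines=True,                      # required when chunksize is used
    compression="gzip",
    chunksize=100_000,               # rows per chunk, tune to available memory
)
for chunk in reader:
    ...  # persist or hand each chunk to downstream processing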
and mapping is not there yet in 2.0, so you can't use this syntax yet:
Copy code
process_chunk.map(uri_iterator)
you would need a for loop
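For completeness, a minimal sketch of that for-loop version (assuming a Prefect 2 release where tasks expose submit(); chunked_flow and chunk_uris are illustrative names, not from the thread):
Copy code
from prefect import flow, task

@task
def process_chunk(chunk_uri: str):
    ...

@flow
def chunked_flow(chunk_uris: list[str]):
    # submit one process_chunk task run per chunk URI and collect the futures
    futures = [process_chunk.submit(uri) for uri in chunk_uris]
    # block until every downstream task run has finished
    return [future.result() for future in futures]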
m
We read multiple 160 GB zipped JSON files - we can't change this logic, as the data format comes from third parties. No pandas at all - once we chunk the content out, we persist it in raw data lake storage. The current limitation of our implementation is that we need to stream all 160 GB first, and only then can we begin processing chunks in parallel (by looping over list_of_chunks.result() and passing the chunks to the next task) - I was hoping there was a way to get around this by yielding incremental future chunks from a task 🙂 I don't want to get around the async functionality from Prefect - I really like the future setup of 2.0
And thanks for the discourse link! That will do for now, until something equivalent is included in Prefect 😃
a
Gotcha. Thanks for this great and detailed explanation! And a really tough problem! I think you're right that the example from Michael seems like the right approach for now, but once we have mapping in 2.0, this will get easier
m
It is the right approach for as_completed - but as for my initial inquiry (a single future of an iterable, where yield would make a big difference) it unfortunately does not help - it is equivalent to my current implementation of for chunk_result in future_list.result():
The current implementation works; it would just be nice to cut an hour of processing off each flow run to save cost (it adds up when you run many sizeable workers in k8s). Map would also not solve it, I think - what I map over has to be an iterable of futures, but currently a task can only deliver a single future (to my knowledge). Being able to yield would be a really powerful feature, though maybe not so easy to implement (or maybe it is implemented and I just can't figure out how to do it!). Sorry for the confusion with two questions in a single thread! 😅
Is GitHub Issues the place for feature requests, or is the Slack community the preferred way? I don't mind writing up this issue, and how something like yield would help me, in more detail for evaluation as a feature
a
Great idea! If you could submit a feature request with a description of the problem that yield would solve for you, that would be awesome! You can do that here - just pick 2.0. Not sure if this is relevant, but you need to consider that task runs are stateless - while you may pass data from one task to downstream tasks, they shouldn't share stateful information with each other (which could be relevant when you build such logic to process one giant file)
🙌 1
m
Thank you, I will do so šŸ™‚
šŸ‘ 1
šŸ™ 1