Malthe Karbo
04/22/2022, 12:30 PM
• Stream a big file in chunks from one task (today this task returns a single prefectfuture[list[str]] object)
  ◦ this is where I would like to yield
• Run some downstream processing on each chunk
  ◦ Here we use future.result() to loop over and start each task - this works 100%, but if we could use yield in the first task, we could begin processing immediately
Is something like this supported / going to be supported? I tried making it work with contextmanager and tasks, but ran into nothing but trouble.
Malthe Karbo
04/22/2022, 12:38 PM
from prefect import flow, task
from ... import load_file
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed

@task
@contextmanager
def stream_big_file():
    file_bytes = load_file()
    futures = []
    with ThreadPoolExecutor() as executor:
        with mystreamer(file_bytes) as stream:
            # Persist each chunk in a background thread as it is streamed
            for chunked_content in stream:
                future = executor.submit(persist_func, chunked_content, ...)
                futures.append(future)
        # Yield each chunk URI as soon as its persist call has finished
        for future in as_completed(futures):
            yield future.result()

@task
def process_chunk(chunk_uri):
    ...

@flow
def yield_flow():
    uri_iterator = stream_big_file()
    process_chunk.map(uri_iterator)  # should not be blocking
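One possible workaround, not confirmed anywhere in this thread: keep the chunking and persisting as plain Python in the flow body, so each chunk URI can be handed to process_chunk as soon as its persist call finishes rather than after the whole file is done. A rough sketch, reusing the placeholder names load_file, mystreamer and persist_func from the snippet above, and assuming that calling process_chunk inside the flow (as in the Prefect 2.0 beta discussed here) submits it to the task runner and returns without waiting for it:

from prefect import flow, task
from concurrent.futures import ThreadPoolExecutor, as_completed

@task
def process_chunk(chunk_uri: str):
    ...

@flow
def streaming_flow():
    # Plain Python in the flow body: chunk and persist in background threads
    file_bytes = load_file()
    persist_futures = []
    with ThreadPoolExecutor() as executor:
        with mystreamer(file_bytes) as stream:
            for chunked_content in stream:
                persist_futures.append(executor.submit(persist_func, chunked_content))
        # Kick off downstream processing as soon as each chunk is persisted,
        # instead of waiting for the complete list of chunk URIs
        for persist_future in as_completed(persist_futures):
            process_chunk(persist_future.result())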
Malthe Karbo
04/22/2022, 12:54 PM
Is as_completed from concurrent.futures supported for prefectfutures?
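For reference, a minimal stdlib-only sketch of the as_completed pattern the question refers to (the work function is just a placeholder); whether the same pattern applies to Prefect futures is exactly the open question here:

from concurrent.futures import ThreadPoolExecutor, as_completed

def work(n):
    return n * n

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(work, n) for n in range(10)]
    # as_completed yields each future as soon as it finishes,
    # regardless of submission order
    for future in as_completed(futures):
        print(future.result())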
Anna Geller
pandas has a way of reading files in chunks so that you wouldn't have to rely on async or any Prefect functionality for that
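For reference, a minimal sketch of the chunked reading pandas offers, assuming a CSV input; the file name, chunk size and handle_chunk function are placeholders:

import pandas as pd

def handle_chunk(df):
    # placeholder for whatever per-chunk processing is needed
    print(len(df))

# With chunksize, read_csv returns an iterator of DataFrames instead of
# loading the whole file into memory at once
for chunk in pd.read_csv("big_file.csv", chunksize=100_000):
    handle_chunk(chunk)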
Anna Geller
process_chunk.map(uri_iterator) - you would need a for loop
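A minimal sketch of that for-loop version, assuming stream_big_file is a plain task that returns the full list of chunk URIs (not the contextmanager variant above) and that calling process_chunk inside the flow submits one task run per URI. This is the blocking variant: nothing downstream starts until the whole list is available.

from prefect import flow, task

@task
def stream_big_file() -> list[str]:
    ...  # chunk, persist, and return the list of chunk URIs

@task
def process_chunk(chunk_uri: str):
    ...

@flow
def chunked_flow():
    chunk_uris = stream_big_file()
    # Blocks until the full list of chunk URIs is available,
    # then kicks off one process_chunk run per URI
    for chunk_uri in chunk_uris.result():
        process_chunk(chunk_uri)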
Malthe Karbo
04/22/2022, 1:44 PM
Currently I am looping over list_of_chunks.result() and passing the chunks to the next task - I was hoping there was a way to get around this by yielding incremental future chunks from a task
I don't want to get around the async functionality from prefect - I really like the future setup of 2.0
Malthe Karbo
04/22/2022, 1:52 PM
Anna Geller
Malthe Karbo
04/22/2022, 3:59 PM
as_completed - but as for my initial inquiry (a single future of an iterable, where yield would make a big difference) it does not help unfortunately - it is equivalent to my current implementation of for chunk_result in future_list.result():
The current implementation works, it would just be nice to cut off 1 hour of processing per flow run to save cost (it adds up when you run many sizeable workers in k8s)
Map would also not solve it I think - what I map over has to be an iterable of futures, but currently a task can only deliver a single future (to my knowledge) - this would be a really powerful feature if yielding were possible, but maybe it is not so easy to implement (or maybe it is implemented and I just cannot figure out how to do it!)
Sorry for the confusion with two questions in a single thread!
Malthe Karbo
04/22/2022, 4:00 PM
Anna Geller
Malthe Karbo
04/22/2022, 6:04 PM
Malthe Karbo
04/22/2022, 7:16 PM