Bennett Rand
11/30/2023, 7:55 PM
Bennett Rand
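11/30/2023, 7:56 PM
```python
from prefect import flow, task

# big_expensive_process, data_query, save_data, QUERY_A and QUERY_B are defined elsewhere.
# no_resolve is not a Prefect function; it is a placeholder for
# "pass this future through without resolving it".

@task
def collapse_data(data_future_a, data_future_b):
    # Start the expensive work while the queries are still running...
    base_data = big_expensive_process()
    # ...and only block on the upstream futures once their data is needed
    data_a = data_future_a.result()
    data_b = data_future_b.result()
    return base_data.join(data_a).join(data_b)

@flow
def main_flow():
    data_future_a = data_query.submit(QUERY_A)
    data_future_b = data_query.submit(QUERY_B)
    data_table = collapse_data.submit(no_resolve(data_future_a), no_resolve(data_future_b))
    save_data(data_table)
```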
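A minimal, Prefect-free sketch of the pattern in `collapse_data`, using the standard library's `concurrent.futures` as a stand-in for Prefect futures: the downstream function receives the futures themselves, starts its own expensive work, and only then blocks on `.result()`. `slow_query` and `big_expensive_process` here are hypothetical placeholders that sleep briefly and return small lists instead of tables.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def slow_query(name: str) -> list[str]:
    # Hypothetical stand-in for data_query: pretend to run a slow query
    time.sleep(0.2)
    return [f"{name}-row"]


def big_expensive_process() -> list[str]:
    # Hypothetical stand-in for the expensive base computation
    time.sleep(0.2)
    return ["base-row"]


def collapse_data(future_a: Future, future_b: Future) -> list[str]:
    # Do the expensive work first; the queries keep running in the pool meanwhile
    base = big_expensive_process()
    # Block on the upstream futures only once their results are actually needed
    return base + future_a.result() + future_b.result()


def main_flow() -> list[str]:
    with ThreadPoolExecutor(max_workers=3) as pool:
        future_a = pool.submit(slow_query, "a")
        future_b = pool.submit(slow_query, "b")
        # Pass the futures themselves, not their results, to the downstream step
        combined = pool.submit(collapse_data, future_a, future_b)
        return combined.result()


if __name__ == "__main__":
    start = time.perf_counter()
    print(main_flow())
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Because the queries and the base computation overlap, the run should take roughly one sleep interval rather than two, though exact timing depends on the machine.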
Nate
11/30/2023, 8:10 PM
```
In [1]: from prefect import flow, task

In [2]: from prefect.utilities.annotations import quote

In [3]: @task
   ...: def identity(x):
   ...:     return x
   ...:

In [4]: @task
   ...: def foo(future):
   ...:     print(type(future))
   ...:     return future
   ...:

In [5]: @flow
   ...: def test():
   ...:     fut = identity.submit(42)
   ...:     foo(quote(fut))
   ...:

In [6]: test()
14:09:52.148 | INFO | prefect.engine - Created flow run 'smiling-collie' for flow 'test'
14:09:53.169 | INFO | Flow run 'smiling-collie' - Created task run 'foo-0' for task 'foo'
14:09:53.171 | INFO | Flow run 'smiling-collie' - Executing 'foo-0' immediately...
14:09:53.187 | INFO | Flow run 'smiling-collie' - Created task run 'identity-0' for task 'identity'
14:09:53.188 | INFO | Flow run 'smiling-collie' - Submitted task run 'identity-0' for execution.
<class 'prefect.futures.PrefectFuture'>
14:09:53.519 | INFO | Task run 'foo-0' - Finished in state Completed()
14:09:53.531 | INFO | Task run 'identity-0' - Finished in state Completed()
14:09:53.768 | INFO | Flow run 'smiling-collie' - Finished in state Completed('All states completed.')
```
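To see why `foo` receives a `PrefectFuture` rather than `42` in the session above, here is a toy model of the idea, not Prefect's actual implementation: before calling a task, the engine replaces future arguments with their results, unless an argument is wrapped in a marker. `Quote`, `resolve`, and `call_task` are hypothetical names chosen for this sketch. (Prefect also looks inside collections; this toy only checks top-level arguments.)

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any


@dataclass
class Quote:
    # Hypothetical marker: "hand this value to the task exactly as-is"
    value: Any


def resolve(arg: Any) -> Any:
    # Quoted values are unwrapped but never resolved
    if isinstance(arg, Quote):
        return arg.value
    # Plain futures are replaced by their results (this blocks until ready)
    if isinstance(arg, Future):
        return arg.result()
    return arg


def call_task(fn, *args: Any) -> Any:
    # Toy engine step: resolve each positional argument, then call the function
    return fn(*(resolve(a) for a in args))


def foo(x: Any) -> str:
    # Report what the "task" actually received
    return type(x).__name__


if __name__ == "__main__":
    fut: Future = Future()
    fut.set_result(42)
    print(call_task(foo, fut))         # int: the future was resolved
    print(call_task(foo, Quote(fut)))  # Future: the marker kept it intact
```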
Nate
11/30/2023, 8:11 PM
Bennett Rand
11/30/2023, 8:20 PM
`.map` that each contain a decent-sized DataFrame. Instead of holding all the results in memory at once, I'd rather concatenate them into a single DataFrame as the parent tasks complete.
Nate
11/30/2023, 8:34 PM
`.result()` on that future from inside the downstream task (since those futures are tracked in the `FlowRunContext`), so that sounds like it should work fine, I think.
Bennett Rand
11/30/2023, 8:43 PM
`quote` should be the answer, but it's not playing well with the `DaskTaskRunner`:
```
Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
...
'Could not serialize object of type HighLevelGraph'
```
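For Bennett's 8:20 PM goal of folding each mapped result into one table as soon as its future finishes, rather than waiting for all of them, `concurrent.futures.as_completed` shows the shape of the loop. This is a standard-library stand-in, not a Prefect or Dask API: `load_chunk` is a hypothetical placeholder that returns a few rows in place of a DataFrame, and `combined.extend` stands in for `pd.concat`. The accumulated table still grows to full size; what this avoids is waiting for every result before merging starts.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def load_chunk(i: int) -> list[tuple[int, int]]:
    # Hypothetical stand-in for one mapped task: sleep a random amount, return rows
    time.sleep(random.uniform(0.01, 0.05))
    return [(i, j) for j in range(3)]


def collect_incrementally(n_chunks: int) -> list[tuple[int, int]]:
    combined: list[tuple[int, int]] = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(load_chunk, i) for i in range(n_chunks)]
        # as_completed yields each future as soon as it finishes, in completion order
        for fut in as_completed(futures):
            combined.extend(fut.result())
    return combined


if __name__ == "__main__":
    rows = collect_incrementally(5)
    print(len(rows))  # 15
```

Rows arrive in completion order, so sort or re-index afterwards if the original chunk order matters.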