Is there a way to interact with futures inside of ...
# ask-community
b
Is there a way to interact with futures inside of a task without them automatically resolving to their respective results?
E.g.:
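from prefect import flow, task

@task
def collapse_data(data_future_a, data_future_b):
    # Do the expensive work first, then block on each upstream future only when needed
    base_data = big_expensive_process()
    data_a = data_future_a.result()
    data_b = data_future_b.result()

    return base_data.join(data_a).join(data_b)

@flow
def main_flow():
    data_future_a = data_query.submit(QUERY_A)
    data_future_b = data_query.submit(QUERY_B)
    # no_resolve is hypothetical: the "pass the future through unresolved" behavior being asked about
    data_table = collapse_data.submit(no_resolve(data_future_a), no_resolve(data_future_b))

    save_data(data_table)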
n
like this?
In [1]: from prefect import flow, task

In [2]: from prefect.utilities.annotations import quote

In [3]: @task
   ...: def identity(x):
   ...:     return x
   ...:

In [4]: @task
   ...: def foo(future):
   ...:     print(type(future))
   ...:     return future
   ...:

In [5]: @flow
   ...: def test():
   ...:     fut = identity.submit(42)
   ...:     foo(quote(fut))
   ...:

In [6]: test()
14:09:52.148 | INFO    | prefect.engine - Created flow run 'smiling-collie' for flow 'test'
14:09:53.169 | INFO    | Flow run 'smiling-collie' - Created task run 'foo-0' for task 'foo'
14:09:53.171 | INFO    | Flow run 'smiling-collie' - Executing 'foo-0' immediately...
14:09:53.187 | INFO    | Flow run 'smiling-collie' - Created task run 'identity-0' for task 'identity'
14:09:53.188 | INFO    | Flow run 'smiling-collie' - Submitted task run 'identity-0' for execution.
<class 'prefect.futures.PrefectFuture'>
14:09:53.519 | INFO    | Task run 'foo-0' - Finished in state Completed()
14:09:53.531 | INFO    | Task run 'identity-0' - Finished in state Completed()
14:09:53.768 | INFO    | Flow run 'smiling-collie' - Finished in state Completed('All states completed.')
I'm not totally sure whether that would cause weirdness if you had wait_for or downstream tasks involved here. What's your motivation?
b
The argument is a list of futures stemming from a .map, each of which contains a decent-size DataFrame. Instead of having all the results in memory at once, I'd rather concatenate them into a single DataFrame as the parent tasks complete.
n
gotcha, yeah you should be able to call .result() on that future from inside the downstream task (since those futures are tracked in the FlowRunContext), so that sounds like it should work fine, I think
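A rough sketch of the "fold results in as they finish" pattern, for illustration only. It uses the standard library's concurrent.futures as a stand-in rather than Prefect's own futures, and plain lists of rows rather than DataFrames; load_chunk and collapse are made-up names, not anything from Prefect:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def load_chunk(chunk_id):
    # Hypothetical stand-in for one mapped query task that returns a table.
    return [{"chunk": chunk_id, "row": i} for i in range(3)]


def collapse(futures):
    # Call .result() inside the consumer, one future at a time, and fold each
    # result into the combined table as soon as that future has finished.
    combined = []
    for future in as_completed(futures):
        combined.extend(future.result())
    return combined


with ThreadPoolExecutor(max_workers=4) as pool:
    # The futures themselves (not their results) are handed to the consumer.
    futures = [pool.submit(load_chunk, i) for i in range(5)]
    table = collapse(futures)

print(len(table))
```

With DataFrames, the extend step would presumably become a concat; whether Prefect futures behave the same way inside a task depends on the task runner in use.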
b
Based on the docs, it does seem like quote should be the answer, but it's not playing well with the DaskTaskRunner:
Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
...
'Could not serialize object of type HighLevelGraph'