Hi, I have a beginner question about objects returned by a task when composing a Flow.
I have a couple of functions that I want to run as tasks, and each of them returns an object.
I would like to pass to the second task a parameter taken from an attribute of the object returned by the first task.
Something like the code below:
import random
from prefect import task, Flow

class Generator:
    def __init__(self, data):
        self.particle = data

@task
def run_generator():
    return Generator(random.random())

@task
def compute(particle):
    print(particle**2)

with Flow('flow1') as flow:
    gen = run_generator()
    compute(gen.particle)
When I try to create the flow I get the following error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/var/folders/7g/2_wk8ddx7h5_4r12flp9031m0000gn/T/ipykernel_3073/2988601640.py in <module>
      1 with Flow('flow1') as flow:
      2     gen = run_generator()
----> 3     val = compute(gen.particle)
AttributeError: 'FunctionTask' object has no attribute 'particle'
Is there a way to defer the evaluation of gen.particle until the flow is actually executed?
I know that changing compute to receive the whole object and access the attribute there works, but I cannot always change the function, and in my case it would be bad to create many wrappers for users of my library.
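One possible way to avoid per-function wrappers, as a minimal sketch only: a single generic helper that reads an attribute by name at the moment it is called. The name get_attr is hypothetical and not part of Prefect. The code is plain Python so it runs without Prefect installed, and compute returns its value here instead of printing it so the result can be checked. The assumption is that in a flow get_attr would be decorated with @task and used as compute(get_attr(gen, "particle")), so the lookup happens when the flow runs rather than when it is built.

```python
import random


class Generator:
    def __init__(self, data):
        self.particle = data


def run_generator():
    return Generator(random.random())


def get_attr(obj, name):
    # Hypothetical helper: the attribute is looked up only when this is called.
    # In a flow it would carry @task, so `obj` would be the real task result.
    return getattr(obj, name)


def compute(particle):
    # Unchanged signature: still receives the bare value, not the object.
    return particle ** 2


# Plain-Python stand-in for a flow run: each step executes in order.
gen = run_generator()
result = compute(get_attr(gen, "particle"))
print(result == gen.particle ** 2)
```

With this shape compute stays untouched, and the one helper can be reused for any attribute of any returned object.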