Hi, I have a very newbie question related to objects returned by a task when composing a Flow. I hav...
h

Hugo Slepicka

almost 4 years ago
Hi, I have a very newbie question related to objects returned by a task when composing a Flow. I have a couple of procedures that I want to execute and they return objects. I would like to pass to the subsequent procedure a parameter based on a property which is a member of the object returned by the first procedure. Something like the code below:
import random
from prefect import task, Flow


class Generator:
    def __init__(self, data):
        self.particle = data


@task
def run_generator():
    return Generator(random.random())


@task
def compute(particle):
    print(particle**2)


with Flow('flow1') as flow:
    gen = run_generator()
    compute(gen.particle)
When I try to create the flow I get the following error:
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/var/folders/7g/2_wk8ddx7h5_4r12flp9031m0000gn/T/ipykernel_3073/2988601640.py in <module>
      1 with Flow('flow1') as flow:
      2     gen = run_generator()
----> 3     val = compute(gen.particle)

AttributeError: 'FunctionTask' object has no attribute 'particle'
Is there any chance I can have the delayed execution of the parameter until the flow is executed? I know that changing
compute
to receive the object and handling the data access there works but not always I have the possibility to change the function and it would be bad in my case to create many wrappers for users of my library.