
Hugo Slepicka

09/27/2021, 11:09 PM
Hi, I have a very newbie question related to objects returned by a task when composing a Flow. I have a couple of procedures that I want to execute and they return objects. I would like to pass to the subsequent procedure a parameter based on a property which is a member of the object returned by the first procedure. Something like the code below:
import random
from prefect import task, Flow


class Generator:
    def __init__(self, data):
        self.particle = data


@task
def run_generator():
    return Generator(random.random())


@task
def compute(particle):
    print(particle**2)


with Flow('flow1') as flow:
    gen = run_generator()
    compute(gen.particle)
When I try to create the flow I get the following error:
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/var/folders/7g/2_wk8ddx7h5_4r12flp9031m0000gn/T/ipykernel_3073/2988601640.py in <module>
      1 with Flow('flow1') as flow:
      2     gen = run_generator()
----> 3     val = compute(gen.particle)

AttributeError: 'FunctionTask' object has no attribute 'particle'
Is there any way to defer the attribute access until the flow is executed? I know that changing compute to receive the object and accessing the data there works, but I don't always have the option to change the function, and in my case it would be bad to create many wrappers for users of my library.

Kevin Kho

09/28/2021, 1:47 AM
Hey Hugo, you need to do something like this:
import random
from prefect import task, Flow


class Generator:
    def __init__(self, data):
        self.particle = data


@task
def run_generator():
    return Generator(random.random())

@task
def compute(x: Generator):
    print(x.particle**2)


with Flow('flow1') as flow:
    gen = run_generator()
    compute(gen)

flow.run()
because gen is not a Generator until runtime, when the value is filled in. Because of the deferred execution, while the flow is being built gen is still a FunctionTask, which is why you get the error that it has no attribute particle. Does that make sense?
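To make the deferred-execution point concrete, here is a toy model in plain Python. It is only an illustration: Node, ToyFlow, get_particle and square are hypothetical names, not Prefect's API. Calls made while building the "flow" only record a placeholder Node; real values exist only after run(). Reading .particle off the placeholder at build time fails for the same reason it fails on a FunctionTask, while inserting a getter step that runs later works.

```python
import random
from typing import Any, Callable, Dict, List


class Node:
    """Toy stand-in for a deferred task result: a placeholder, not the value."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn = fn
        self.args = args


class ToyFlow:
    """Records calls at build time and only evaluates them in run()."""

    def __init__(self) -> None:
        self.nodes: List[Node] = []

    def call(self, fn: Callable[..., Any], *args: Any) -> Node:
        node = Node(fn, *args)
        self.nodes.append(node)
        return node

    def run(self) -> Dict[Node, Any]:
        results: Dict[Node, Any] = {}
        for node in self.nodes:  # nodes were recorded in dependency order
            # Swap placeholder arguments for the values computed earlier.
            args = [results[a] if isinstance(a, Node) else a for a in node.args]
            results[node] = node.fn(*args)
        return results


class Generator:
    def __init__(self, data):
        self.particle = data


def run_generator():
    return Generator(random.random())


def get_particle(gen):
    return gen.particle  # only called during run(), when gen is a real Generator


def square(particle):
    return particle ** 2


flow = ToyFlow()
gen = flow.call(run_generator)
# Here gen is a Node, so gen.particle would raise AttributeError,
# just like the FunctionTask in the question.
particle = flow.call(get_particle, gen)
result = flow.call(square, particle)
results = flow.run()
```

The design mirrors the thread's explanation: the build phase wires placeholders together, and attribute access has to live inside a step that executes in the run phase.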

Hugo Slepicka

09/28/2021, 1:57 AM
@Kevin Kho it makes sense… so I will need to write wrappers for all my functions that previously took just the value, so that they receive the object instead of the attribute?
Is there a way around it with a parameter or something?
If I add a return type annotation to run_generator, would it be possible to bypass the check?

Kevin Kho

09/28/2021, 2:15 AM
Yes to the first question, like this:
@task
def get_particle(gen):
    return gen.particle  # attribute is read at runtime, inside the task

with Flow('flow1') as flow:
    gen = run_generator()
    particle = get_particle(gen)
    compute(particle)
There's no way around it: the object does not exist until runtime, so you can't get the attribute while the flow is being built. Type annotations are not read by Prefect (yet!).
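On the worry about writing many wrappers: one generic getter factory can stand in for all the per-attribute wrappers, so the library functions themselves stay unchanged. This is a sketch under assumptions: make_getter is a hypothetical helper name, the runnable part is plain Python, and the commented Prefect lines assume the Prefect 1.x task()/Flow API used in this thread and are not executed here.

```python
class Generator:
    def __init__(self, data):
        self.particle = data


def make_getter(attr_name):
    """Hypothetical helper: return a one-argument function that reads attr_name."""

    def getter(obj):
        # The attribute is only read when getter is called, i.e. at run time.
        return getattr(obj, attr_name)

    # A descriptive name helps tools that label steps by function name.
    getter.__name__ = f"get_{attr_name}"
    return getter


def compute(particle):
    # Unchanged library function that still expects the bare value.
    return particle ** 2


# One factory covers every attribute; no per-function wrapper is needed.
get_particle = make_getter("particle")
value = compute(get_particle(Generator(3.0)))

# Assumed Prefect 1.x usage (not run here), mirroring the answer above:
#   from prefect import task, Flow
#   get_particle_task = task(make_getter("particle"))
#   compute_task = task(compute)
#   with Flow("flow1") as flow:
#       gen = run_generator()  # a @task, as in the question
#       compute_task(get_particle_task(gen))
```

This keeps the extra step Kevin describes (the attribute still has to be read inside something that runs at flow time) but reduces it to one line per attribute rather than one hand-written wrapper per function.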

Hugo Slepicka

09/28/2021, 4:29 AM
Sounds good! Thanks for the help!