Hugo Slepicka
09/27/2021, 11:09 PM
import random
from prefect import task, Flow

class Generator:
    def __init__(self, data):
        self.particle = data

@task
def run_generator():
    return Generator(random.random())

@task
def compute(particle):
    print(particle**2)

with Flow('flow1') as flow:
    gen = run_generator()
    compute(gen.particle)
When I try to create the flow I get the following error:
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/var/folders/7g/2_wk8ddx7h5_4r12flp9031m0000gn/T/ipykernel_3073/2988601640.py in <module>
      1 with Flow('flow1') as flow:
      2     gen = run_generator()
----> 3     val = compute(gen.particle)

AttributeError: 'FunctionTask' object has no attribute 'particle'
Is there any way to defer evaluating the parameter until the flow is executed?
I know that changing compute to receive the object and handle the attribute access there works, but I can't always change the function, and in my case it would be bad to create many wrappers for users of my library.
Kevin Kho
import random
from prefect import task, Flow

class Generator:
    def __init__(self, data):
        self.particle = data

@task
def run_generator():
    return Generator(random.random())

@task
def compute(x: Generator):
    print(x.particle**2)

with Flow('flow1') as flow:
    gen = run_generator()
    compute(gen)

flow.run()
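A toy model may help show why passing gen itself works while gen.particle fails. The sketch below is plain Python and is not Prefect's implementation; Placeholder, resolve, and square are hypothetical names used only for illustration, and the random value is replaced by a fixed 0.5 so the result is predictable.

```python
class Placeholder:
    """Toy stand-in for what a task call returns while a flow is being built."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def resolve(self):
        # Resolve upstream placeholders first, then call the real function.
        values = [a.resolve() if isinstance(a, Placeholder) else a for a in self.args]
        return self.fn(*values)


class Generator:
    def __init__(self, data):
        self.particle = data


def run_generator():
    # Fixed value instead of random.random() to keep the example deterministic.
    return Generator(0.5)


def square(generator):
    # Attribute access happens here, once a real Generator has been produced.
    return generator.particle ** 2


# "Build time": gen is only a placeholder, so it has no particle attribute yet.
gen = Placeholder(run_generator)
has_particle_at_build_time = hasattr(gen, "particle")

# "Run time": the placeholder is resolved into a Generator before square runs.
result = Placeholder(square, gen).resolve()
```

In this model, asking the placeholder for particle fails for the same reason as in the traceback above: the Generator does not exist until the graph is resolved.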
The original fails because gen is not a Generator until runtime, when the value is filled in. Because of the deferred execution, it is still of type FunctionTask while the flow is being built, which is why you get the error that there is no particle attribute. Does that make sense?
Hugo Slepicka
09/28/2021, 1:57 AM
run_generator would it be possible to bypass the check?
Kevin Kho
with Flow('flow1') as flow:
    gen = run_generator()
    particle = get_particle(gen)
    compute(particle)
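get_particle is not defined anywhere in the thread. A minimal sketch, assuming it is nothing more than an attribute accessor: make_getter is a hypothetical helper that builds such accessors by name, so a library would not need one handwritten wrapper per attribute. The block is plain Python; the assumption is that, under the Prefect 1.x API used in the thread, the resulting function would be wrapped with task(...) before being called inside the Flow block.

```python
def make_getter(name):
    """Build a plain function that reads attribute `name` from its argument."""

    def getter(obj):
        return getattr(obj, name)

    # Give the function a readable name, e.g. "get_particle".
    getter.__name__ = f"get_{name}"
    return getter


# Hypothetical accessor matching the name used in the snippet above.
# Assumption: with Prefect 1.x it would be wrapped as task(get_particle)
# so that the attribute lookup runs at flow run time, not at build time.
get_particle = make_getter("particle")


class Generator:
    def __init__(self, data):
        self.particle = data


# Called directly on a real object, the accessor simply returns the attribute.
value = get_particle(Generator(0.5))
```

The attribute lookup is still deferred to a separate step, as the reply describes; the helper only reduces how much wrapper code has to be written by hand.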
There is no way around it: the object does not exist until runtime, so you can’t get the attribute while the flow is being built. Type annotations are not read in Prefect (yet!).
Hugo Slepicka
09/28/2021, 4:29 AM