Hugo Slepicka

    11 months ago
    Hi, I have a very basic question about objects returned by a task when composing a Flow. I have a couple of procedures that I want to execute, and they return objects. I would like to pass to the next procedure a parameter taken from an attribute of the object returned by the first procedure, something like the code below:
    import random
    from prefect import task, Flow
    
    
    class Generator:
        def __init__(self, data):
            self.particle = data
    
    
    @task
    def run_generator():
        return Generator(random.random())
    
    
    @task
    def compute(particle):
        print(particle**2)
    
    
    with Flow('flow1') as flow:
        gen = run_generator()
        compute(gen.particle)
    When I try to create the flow I get the following error:
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    /var/folders/7g/2_wk8ddx7h5_4r12flp9031m0000gn/T/ipykernel_3073/2988601640.py in <module>
          1 with Flow('flow1') as flow:
          2     gen = run_generator()
    ----> 3     val = compute(gen.particle)
    
    AttributeError: 'FunctionTask' object has no attribute 'particle'
    Is there any way to defer evaluating that parameter until the flow is executed? I know that changing compute to receive the whole object and access the attribute inside it works, but I can't always change the function, and in my case it would be bad to make users of my library write many wrappers.
    Kevin Kho

    11 months ago
    Hey Hugo, you need to do something like this:
    import random
    from prefect import task, Flow
    
    
    class Generator:
        def __init__(self, data):
            self.particle = data
    
    
    @task
    def run_generator():
        return Generator(random.random())
    
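    @task
    def compute(x: Generator):
        # x is a real Generator here: the task body only runs at flow run time
        print(x.particle**2)
    
    
    with Flow('flow1') as flow:
        gen = run_generator()
        # Pass the whole task result; the attribute is read inside compute
        compute(gen)
    
    flow.run()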
    This works because gen is not a Generator until runtime, when its value is filled in. Because execution is deferred, while the flow is being built gen is still of type FunctionTask, which is why you get the error that it has no attribute particle. Does that make sense?
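    To make the deferred-execution point concrete, here is a toy model in plain Python. It is not how Prefect is implemented: Deferred and the task decorator below are hypothetical stand-ins that record a call while the flow is being built and only evaluate it when run() is called. The generator uses a fixed value instead of random.random() so the output is predictable.

```python
class Deferred:
    """Toy placeholder for a call recorded at build time (hypothetical, not Prefect's API)."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args

    def run(self):
        # Evaluate upstream placeholders first, then call the wrapped function.
        values = [a.run() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*values)


def task(fn):
    # Toy decorator: calling the task records the call instead of executing fn.
    def record(*args):
        return Deferred(fn, args)
    return record


class Generator:
    def __init__(self, data):
        self.particle = data


@task
def run_generator():
    return Generator(0.5)  # fixed value instead of random.random()


@task
def compute(x):
    # Attribute access happens here, when x is a real Generator.
    return x.particle ** 2


gen = run_generator()
print(type(gen).__name__)        # Deferred
print(hasattr(gen, "particle"))  # False: nothing has run yet
result = compute(gen).run()      # pass the whole object, read the attribute inside
print(result)                    # 0.25
```

    In this toy model gen is a Deferred placeholder with no particle attribute, much as gen in the real flow is a FunctionTask; compute only sees a Generator once run() resolves its input. The model does no caching, so a placeholder consumed by two downstream calls would run twice; it is only meant to show when values become available.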
    Hugo Slepicka

    11 months ago
    @Kevin Kho that makes sense… so I will need to write wrappers for all my functions that previously took just the value, so that they receive the object instead of the attribute?
    Is there a way around it, with a parameter or something similar?
    If I add a return type annotation to run_generator, would that let me bypass the check?
    Kevin Kho

    11 months ago
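    Yes to the first question, something like this:
    @task
    def get_particle(gen):
        return gen.particle  # evaluated at run time, when gen is a real Generator
    
    with Flow('flow1') as flow:
        gen = run_generator()
        particle = get_particle(gen)  # still a deferred task result here
        compute(particle)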
    There is no way around it: the object does not exist until run time, so you can't get its attribute while the flow is being built. Type annotations are not read by Prefect (yet!).
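    On avoiding a wrapper per function: one option is a single generic attribute-reading task reused for every attribute, plus wrapping existing library functions by calling the decorator directly (task(compute)) instead of editing them. The sketch below shows the idea in a toy deferred-execution model in plain Python; Deferred, task and get_attr are hypothetical stand-ins, not Prefect's API. In Prefect itself the same pattern should translate to one @task-decorated get_attr function, but treat that as an assumption to check against your Prefect version.

```python
class Deferred:
    """Toy placeholder for a call recorded at build time (hypothetical, not Prefect's API)."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args

    def run(self):
        # Evaluate upstream placeholders first, then call the wrapped function.
        values = [a.run() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*values)


def task(fn):
    # Toy decorator: calling the task returns a Deferred instead of running fn.
    return lambda *args: Deferred(fn, args)


class Generator:
    def __init__(self, data):
        self.particle = data


@task
def run_generator():
    return Generator(3.0)  # fixed value instead of random.random()


@task
def get_attr(obj, name):
    # Hypothetical generic helper: one task serves every attribute you need.
    return getattr(obj, name)


def compute(particle):
    # An unmodified library function that expects the raw value.
    return particle ** 2


compute_task = task(compute)  # wrap the existing function once, without editing it

gen = run_generator()
particle = get_attr(gen, "particle")  # still a Deferred at build time
result = compute_task(particle).run()
print(result)  # 9.0
```

    The trade-off is that get_attr takes the attribute name as a string, so a typo only surfaces when the flow runs rather than when it is built.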
    Hugo Slepicka

    11 months ago
    Sounds good! Thanks for the help!