
Bradley Hurley

03/11/2022, 9:25 PM
Hi Folks - I have a question about using the return value from a task as input in another task. I have searched around and am not sure if I am missing something. 🧵
I have a dataclass and task:
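from dataclasses import dataclass
from typing import Any, Optional

from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs


@dataclass
class OperatingLocation:
    bucket: str
    prefix: str
    uri: str


class OperatingMetaData(Task):

    def __init__(
            self,
            bucket_name: Optional[str] = None,
            **kwargs: Any
    ):
        self.bucket_name = bucket_name
        super().__init__(**kwargs)

    # Falls back to self.bucket_name when bucket_name is not passed at call time
    @defaults_from_attrs("bucket_name")
    def run(self, bucket_name: Optional[str] = None) -> OperatingLocation:
        prefix = "some/thing/here"
        return OperatingLocation(bucket=bucket_name,
                                 prefix=prefix,
                                 uri=f"s3://{bucket_name}/{prefix}"
                                 )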
Then I attempt to create a flow like this:
from prefect import Flow

omd_task = OperatingMetaData()

def create_flow():
    with Flow("operating-metadata") as flow:  # flow name is a placeholder
        omd_task_results = omd_task(bucket_name="TestBucket")

        other_task_results = some_other_task(
            bucket=omd_task_results.bucket,
            prefix=omd_task_results.prefix,
            uri=omd_task_results.uri
        )

    return flow
I'm not sure if it's just my linter, but other_task_results seems to be the task and not the returned object.

Kevin Kho

03/11/2022, 9:32 PM
Ah, they are still Tasks at flow build time, because the returned object is only created during the actual flow run. The linter is showing you the build-time type. Does that make sense?
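To make the build-time vs. run-time distinction concrete, here is a toy model in plain Python. The Deferred class is hypothetical and is not Prefect's API; it only mimics the idea behind a Prefect 1.x flow, where calling a task inside the with Flow(...) block records a node in the graph and the real return value exists only once the flow runs.

```python
from dataclasses import dataclass
from typing import Any, Callable


class Deferred:
    """Hypothetical placeholder for a task call; NOT part of Prefect's API."""

    def __init__(self, fn: Callable[..., Any], **kwargs: Any) -> None:
        self.fn = fn          # function to call at "run time"
        self.kwargs = kwargs  # may contain other Deferred placeholders

    def run(self) -> Any:
        # Resolve upstream placeholders first, then call the wrapped function.
        resolved = {
            key: value.run() if isinstance(value, Deferred) else value
            for key, value in self.kwargs.items()
        }
        return self.fn(**resolved)


@dataclass
class OperatingLocation:
    bucket: str
    prefix: str
    uri: str


def operating_metadata(bucket_name: str) -> OperatingLocation:
    prefix = "some/thing/here"
    return OperatingLocation(
        bucket=bucket_name, prefix=prefix, uri=f"s3://{bucket_name}/{prefix}"
    )


# "Build time": the call is only recorded, so this is a placeholder.
omd_task_results = Deferred(operating_metadata, bucket_name="TestBucket")
print(type(omd_task_results).__name__)   # Deferred
print(hasattr(omd_task_results, "uri"))  # False: no dataclass exists yet

# "Run time": executing the graph produces the actual OperatingLocation.
location = omd_task_results.run()
print(location.uri)  # s3://TestBucket/some/thing/here
```

The linter is in the same position as the first two print calls: all it can see is the placeholder type, not the OperatingLocation that only appears at run time.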

Bradley Hurley

03/11/2022, 9:33 PM
That does.
I was going to attempt to deploy and verify before I asked, but I have a setup using microk8s and was having some other issues.

Kevin Kho

03/11/2022, 9:34 PM
What issues do you have?

Bradley Hurley

03/11/2022, 9:34 PM
Just environmental stuff. Hopefully almost resolved.
I was able to get my local instance running, but am still having issues. Maybe I misunderstood your comment, or I still have something wrong.
    with Flow("hello-world") as flow:  # flow name is a placeholder
        omd_results = omd(bucket_name="Brad")
        a = hello_world(input_str=omd_results.uri, upstream_tasks=[omd_results])
        b = hello_world(input_str=omd_results.prefix, upstream_tasks=[a])
        c = hello_world(input_str=omd_results.uri, upstream_tasks=[b])

    flow.run_config = k8s_run_config
    return flow
Here is my hello_world task:
from prefect import task

@task(name="hello-world", log_stdout=True)
def hello_world(input_str: str):
    print(f"Hello, world! - {input_str}")
When I run
poetry run prefect register --project hermana -p tests/integration/samples/hello_world.py
I get:
  File "/Users/bradley/repos/nuna-atlas/tests/integration/samples/hello_world.py", line 106, in create_flow
    a = hello_world(input_str=omd_results.uri, upstream_tasks=[omd_results])
AttributeError: 'OperatingMetaData' object has no attribute 'uri'
@Kevin Kho - I feel like I am messing up something easy. I modified hello_world to be a class without a decorator and was able to return a string and a tuple without issues, but when I try to return a dataclass, things seem to break down for me.

Kevin Kho

03/11/2022, 10:37 PM
You can’t do stuff like omd_results.uri, because the attribute doesn’t exist at flow build time: omd_results is still of type Task.
You need to access the attributes inside tasks, so that this access is deferred to run time as well.
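One way to do that refactor, sketched with the same kind of toy model (Deferred, get_uri and get_prefix are hypothetical names, not Prefect's API): keep passing the task result itself downstream, and read .uri or .prefix inside a function that only runs once the upstream value exists. In Prefect 1.x terms, get_uri would be a small @task, and the flow would call hello_world(input_str=get_uri(omd_results)) instead of hello_world(input_str=omd_results.uri).

```python
from dataclasses import dataclass
from typing import Any, Callable


class Deferred:
    """Hypothetical placeholder for a task call; NOT part of Prefect's API."""

    def __init__(self, fn: Callable[..., Any], **kwargs: Any) -> None:
        self.fn = fn
        self.kwargs = kwargs

    def run(self) -> Any:
        # Upstream placeholders are resolved before the function is called.
        resolved = {
            key: value.run() if isinstance(value, Deferred) else value
            for key, value in self.kwargs.items()
        }
        return self.fn(**resolved)


@dataclass
class OperatingLocation:
    bucket: str
    prefix: str
    uri: str


def operating_metadata(bucket_name: str) -> OperatingLocation:
    prefix = "some/thing/here"
    return OperatingLocation(
        bucket=bucket_name, prefix=prefix, uri=f"s3://{bucket_name}/{prefix}"
    )


# Hypothetical extraction steps: attribute access happens here, at run time,
# when `location` is already a real OperatingLocation.
def get_uri(location: OperatingLocation) -> str:
    return location.uri


def get_prefix(location: OperatingLocation) -> str:
    return location.prefix


def hello_world(input_str: str) -> str:
    message = f"Hello, world! - {input_str}"
    print(message)
    return message  # also returned so the result can be checked


# Build the graph by passing placeholders around, never omd_results.uri.
omd_results = Deferred(operating_metadata, bucket_name="Brad")
a = Deferred(hello_world, input_str=Deferred(get_uri, location=omd_results))
b = Deferred(hello_world, input_str=Deferred(get_prefix, location=omd_results))

result_a = a.run()  # prints: Hello, world! - s3://Brad/some/thing/here
result_b = b.run()  # prints: Hello, world! - some/thing/here
```

Prefect 1.x also overloads indexing on tasks (it builds a GetItem task), so returning a dict from run and writing omd_results["uri"] in the flow should also work at build time; plain attribute access has no such overload, which is why omd_results.uri fails during registration.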

Bradley Hurley

03/11/2022, 10:38 PM
ok, thanks
I will refactor