Bradley Hurley
03/11/2022, 9:25 PM
@dataclass
class OperatingLocation:
    bucket: str
    prefix: str
    uri: str

class OperatingMetaData(Task):
    def __init__(
        self,
        bucket_name: Optional[str] = None,
        **kwargs: Any
    ):
        self.bucket_name = bucket_name
        super().__init__(**kwargs)

    @defaults_from_attrs("bucket_name")
    def run(self, bucket_name: str) -> OperatingLocation:
        prefix = "some/thing/here"
        return OperatingLocation(
            bucket=bucket_name,
            prefix=prefix,
            uri=f"s3://{bucket_name}/{prefix}"
        )

omd_task = OperatingMetaData()

def create_flow():
    with Flow() as flow:
        omd_task_results = omd_task(bucket_name="TestBucket")
        other_task_results = some_other_task(
            bucket=omd_task_results.bucket_name,
            prefix=omd_task_results.prefix,
            uri=omd_task_results.uri
        )
    return flow
other_task_results seems to be the task and not the returned object.
Kevin Kho
03/11/2022, 9:32 PM
Bradley Hurley
03/11/2022, 9:33 PM
Kevin Kho
03/11/2022, 9:34 PM
Bradley Hurley
03/11/2022, 9:34 PM
Flow as flow:
    omd_results = omd(bucket_name="Brad")
    a = hello_world(input_str=omd_results.uri, upstream_tasks=[omd_results])
    b = hello_world(input_str=omd_results.prefix, upstream_tasks=[a])
    c = hello_world(input_str=omd_results.uri, upstream_tasks=[b])
flow.run_config = k8s_run_config
return flow
hello_world
@task(name="hello-world", log_stdout=True)
def hello_world(input_str: str):
    print(f"Hello, world! - {input_str}")
poetry run prefect register --project hermana -p tests/integration/samples/hello_world.py
  File "/Users/bradley/repos/nuna-atlas/tests/integration/samples/hello_world.py", line 106, in create_flow
    a = hello_world(input_str=omd_results.uri, upstream_tasks=[omd_results])
AttributeError: 'OperatingMetaData' object has no attribute 'uri'
hello_world to be a class without a decorator and was able to return a string and a tuple without issues, but when I try to return a dataclass things seem to break down for me.
Kevin Kho
03/11/2022, 10:37 PM
omd_results.uri because it doesn’t exist during Flow build time because it’s still of type Task
Bradley Hurley
03/11/2022, 10:38 PM
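
The point above about build time versus run time can be illustrated with a small, library-free sketch. This is a toy model, not Prefect's implementation: ToyTask, make_location, and get_uri are hypothetical names introduced only for illustration. It shows why reading .uri on a not-yet-run task fails, and how moving the attribute access into its own task defers it until the dataclass actually exists.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class OperatingLocation:
    bucket: str
    prefix: str
    uri: str


class ToyTask:
    """Hypothetical stand-in for a task: creating it only records work to do."""

    def __init__(self, fn: Callable[..., Any], **inputs: Any):
        self.fn = fn
        self.inputs = inputs  # values may themselves be ToyTask objects

    def resolve(self) -> Any:
        """Run upstream tasks first, then this one (the "run time" step)."""
        kwargs = {
            key: value.resolve() if isinstance(value, ToyTask) else value
            for key, value in self.inputs.items()
        }
        return self.fn(**kwargs)


def make_location(bucket_name: str) -> OperatingLocation:
    prefix = "some/thing/here"
    return OperatingLocation(
        bucket=bucket_name,
        prefix=prefix,
        uri=f"s3://{bucket_name}/{prefix}",
    )


def get_uri(location: OperatingLocation) -> str:
    # The attribute access happens here, once the dataclass exists.
    return location.uri


def hello_world(input_str: str) -> str:
    return f"Hello, world! - {input_str}"


# "Build time": nothing has run, so omd_results is a ToyTask, not a dataclass.
omd_results = ToyTask(make_location, bucket_name="Brad")
build_time_has_uri = hasattr(omd_results, "uri")

# Defer the attribute access by wrapping it in its own task.
uri = ToyTask(get_uri, location=omd_results)
greeting = ToyTask(hello_world, input_str=uri)

# "Run time": resolving the graph produces the real dataclass and its uri.
message = greeting.resolve()
```

In the flow from this thread, the analogous change would presumably be a small @task that accepts the OperatingLocation and returns its uri (and another for prefix), with that task's result passed to hello_world instead of omd_results.uri. Treat that as an assumption to check against the installed Prefect version.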