I am trying to use a data class as a parameter to ...
# ask-community
m
I am trying to use a data class as a parameter to my task but am getting the below error while registering the flow. I am using Prefect version 0.14.6. Edited: code in thread.
Error:
Copy code
Beginning health checks...
System Version check: OK
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 151, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
    flows.append(cloudpickle.loads(flow_bytes))
AttributeError: Can't get attribute 'WriteConfig' on <module 'pypline.prefect.tasks.hudi_writer_eks' from '/usr/local/lib/python3.8/site-packages/pypline/prefect/tasks/hudi_writer_eks.py'>
k
Hi @mithalee mohapatra, I will double check, but I think this is because Parameters need to be serializable, since they're stored in our database. It might be better to parameterize the location of the config to load instead. Could you also move the code to the thread when you get the chance so we don't crowd the main channel?
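For example, something like this might work — keeping the dataclass out of the Parameter and passing only a JSON-serializable config path (the file name here is just a placeholder):
Copy code
import json
from dataclasses import dataclass

from prefect import task, Flow, Parameter


@dataclass
class WriteConfig:
    target_base_path: str


@task
def load_configs(config_path: str) -> WriteConfig:
    # Build the dataclass inside the task so only the plain string
    # path travels through the Parameter and the Prefect database.
    with open(config_path) as f:
        return WriteConfig(**json.load(f))


with Flow("Write Table") as flow:
    config_path = Parameter("config_path", default="write_config.json")
    load_configs(config_path)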
m
Where should I move the code? Sorry, I don't follow you.
k
Here in this thread where you replied. You can just say something like “Edited: code in thread”
m
Edited Code:
Copy code
import boto3
import prefect
from dataclasses import dataclass

from kubernetes import client, config
from kubernetes.client.rest import ApiException
from prefect import task, Flow, Parameter
from prefect.engine.signals import FAIL, SKIP
from pypline.lib.s3 import S3URI


@dataclass
class WriteConfig:
    target_base_path: str
    target_table_name: str
    pk_column: str
    partition_column: str
    pre_combine_column: str
    source_path: str

    def validate(self):
        if not self.target_base_path:
            raise FAIL("'target_base_path' is required for hudi_options.")
        # TODO: validate that the required params are not empty


@task
def load_configs(executor_instances, hudi_options: WriteConfig):
    hudiparms = WriteConfig(target_base_path=hudi_options[0],
                            target_table_name=hudi_options[1],
                            pk_column=hudi_options[2],
                            partition_column=hudi_options[3],
                            pre_combine_column=hudi_options[4],
                            source_path=hudi_options[5])
    print(hudiparms)


# get_dask_executor, for_k8 and mmm_version are defined elsewhere in our codebase
with Flow('Write Table', executor=get_dask_executor(for_k8, 4, 25, 'mmm/mmm-phase-0:' + mmm_version)) as flow:
    executor_instances = Parameter('executor_instance', default=1)
    hudi_options = Parameter('hudi_options', default=["test_write",
                                                      "test_table",
                                                      "id",
                                                      "one",
                                                      "two",
                                                      "userdata1.parquet"])
    load_configs(executor_instances, hudi_options)
k
Thank you! Just need to edit the original post to trim it.
m
Parameterize the location of the config to load?
k
Ah sorry I misread the code. Let me re-read
Is this error happening when you register or when you run?
m
Register
k
I made a simpler version of your script and it’s working.
Copy code
from prefect import task, Flow, Parameter
from prefect.engine.signals import FAIL, SKIP
from dataclasses import dataclass

@dataclass
class WriteConfig:
    target_base_path: str
    def validate(self):
        if not self.target_base_path:
            raise FAIL("'target_base_path' is required for hudi_options.")

@task
def load_configs(executor_instances, hudi_options: WriteConfig):
    hudiparms = WriteConfig(target_base_path=hudi_options[0])
    print(hudiparms)


with Flow('Write Table') as flow:
    executor_instances = Parameter('executor_instance', default=1)
    hudi_options = Parameter('hudi_options', default=["test_write"])
    load_configs(executor_instances, hudi_options)

flow.register("dsdc")
Could you tell me what the
Copy code
with Flow('Write Table', executor=get_dask_executor(for_k8, 4, 25, 'mmm/mmm-phase-0:' + mmm_version)) as flow:
is doing? I think it’s related to this. Maybe you can try registering without the executor to verify?
m
It creates a Docker image. The flow runs on Kubernetes with Dask executors. I will try without the executor.
Where can I find healthcheck.py?
k
Here, but I don’t think you need to go there. Basically the error is that the flow can’t be serialized/deserialized with cloudpickle.
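If you want to check this before building the image, the same round trip healthcheck.py performs can be reproduced locally — roughly:
Copy code
import cloudpickle

# Serialize and immediately deserialize the flow, which is roughly what
# /opt/prefect/healthcheck.py does; the AttributeError surfaces here too.
flow_bytes = cloudpickle.dumps(flow)
cloudpickle.loads(flow_bytes)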
Maybe you can try attaching storage and an executor to this Flow?
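For example, in 0.14 they can be set as attributes before registering — the registry URL and image name below are placeholders:
Copy code
from prefect.storage import Docker
from prefect.executors import DaskExecutor

# Placeholder registry/image; swap in your own values.
flow.storage = Docker(registry_url="my-registry", image_name="write-table")
flow.executor = DaskExecutor()
flow.register("dsdc")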