mithalee mohapatra
08/17/2021, 11:30 PM
Kevin Kho
mithalee mohapatra
08/18/2021, 12:34 AM
Kevin Kho
mithalee mohapatra
08/18/2021, 12:36 AM
Kevin Kho
mithalee mohapatra
08/18/2021, 12:38 AM
Kevin Kho
Kevin Kho
mithalee mohapatra
08/18/2021, 12:50 AM
Kevin Kho
from dataclasses import dataclass

from prefect import task, Flow, Parameter
from prefect.engine.signals import FAIL, SKIP


@dataclass
class WriteConfig:
    target_base_path: str

    def validate(self):
        if not self.target_base_path:
            raise FAIL("'target_base_path' is required for hudi_options.")


@task
def load_configs(executor_instances, hudi_options: WriteConfig):
    hudiparms = WriteConfig(target_base_path=hudi_options[0])
    print(hudiparms)


with Flow('Write Table') as flow:
    executor_instances = Parameter('executor_instance', default=1)
    hudi_options = Parameter('hudi_options', default=["test_write"])
    load_configs(executor_instances, hudi_options)

flow.register("dsdc")
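To check the config logic on its own, outside of registration, here is a minimal sketch that drops the Prefect dependency entirely. It is an illustration under assumptions, not the code from the thread: `build_write_config` is a hypothetical helper, and `ValueError` stands in for Prefect's `FAIL` signal so the snippet runs without Prefect installed.

```python
from dataclasses import dataclass


@dataclass
class WriteConfig:
    target_base_path: str

    def validate(self) -> None:
        # ValueError stands in for prefect.engine.signals.FAIL (assumption for this sketch).
        if not self.target_base_path:
            raise ValueError("'target_base_path' is required for hudi_options.")


def build_write_config(hudi_options: list) -> WriteConfig:
    """Hypothetical helper: build a WriteConfig from the list-style parameter and validate it."""
    # An empty list is treated as a missing path so validate() can reject it.
    base_path = hudi_options[0] if hudi_options else ""
    config = WriteConfig(target_base_path=base_path)
    config.validate()
    return config


print(build_write_config(["test_write"]))
```

If this runs cleanly but registration still fails, the dataclass is probably not the culprit, and the flow-level settings are the next thing to look at.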
Kevin Kho
with Flow('Write Table',executor=get_dask_executor(for_k8, 4, 25,'mmm/mmm-phase-0:' + mmm_version)) as hudi_flow:) as flow:
is doing? I think it’s related to this. Maybe you can try registering without the executor to verify?
mithalee mohapatra
08/18/2021, 12:56 AM
mithalee mohapatra
08/18/2021, 1:21 AM
Kevin Kho
Kevin Kho