Michael Law
04/11/2022, 8:23 AM
def run(self, flow: Flow, scheduler="threads", num_workers=8):
    env = {
        "DATABRICKS_CLUSTER_ID": self.cluster_id,
        "MOUNT": self.mount,
        "CODEMOUNT": self.codeMount,
        "ENVIRONMENT": self.environment,
        "RELEASE_VERSION": self.release_version,
        "APP_IMAGE": self.kubernetes_job_image,
        "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
        "PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": f"'{self.databricks_connection}'",
        "PREFECT_PROJECT": self.prefect_project,
        "DATABRICKS_INSTANCE_POOL_ID": self.instance_pool_id,
        "SQL_SERVER_JDBC_CONNECTION_STRING": self.sql_jdbc_connection_string,
    }
    flow.executor = LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers)
    # 'DEBUG' is not set on the K8s cluster, so use .get() to avoid a KeyError there
    if os.environ.get("DEBUG") == "1":
        flow.run()
    elif os.environ.get("DEBUG") == "2":
        flow.visualize()
    else:
        flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)
        flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)
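The DEBUG branching can be sketched as a small, testable helper (choose_mode is a hypothetical name, not part of the original code; environ.get avoids a KeyError when DEBUG is unset, as it is on the cluster):

```python
def choose_mode(environ: dict) -> str:
    """Return which branch run() would take for a given environment.

    Hypothetical helper: .get() makes a missing DEBUG key fall through
    to the Kubernetes branch instead of raising KeyError.
    """
    debug = environ.get("DEBUG")
    if debug == "1":
        return "local_run"   # flow.run()
    if debug == "2":
        return "visualize"   # flow.visualize()
    return "kubernetes"      # KubernetesRun + Azure storage

print(choose_mode({"DEBUG": "1"}))  # local_run
print(choose_mode({}))              # kubernetes
```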
Anna Geller
04/11/2022, 9:22 AM
with Flow("your_flow", executor=LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers)) as flow:
This may help since the executor is retrieved from storage at runtime.
Michael Law
04/11/2022, 10:16 AM
Anna Geller
04/11/2022, 10:30 AM
export PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"
flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=dict(PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"))
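That format string is a standard Python %-style logging format, so its output can be previewed with the stdlib alone (a sketch; no Prefect required, and the record fields here are made up for illustration):

```python
import logging

# Same %-style format string passed via PREFECT__LOGGING__FORMAT above
FMT = "%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"

formatter = logging.Formatter(FMT)
record = logging.LogRecord(
    name="prefect.flow", level=logging.INFO, pathname="flow.py",
    lineno=1, msg="task finished", args=None, exc_info=None,
)
line = formatter.format(record)
print(line)  # e.g. INFO - prefect.flow - 1234 - 140123456789 | task finished
```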
Michael Law
04/11/2022, 10:32 AM
Anna Geller
04/11/2022, 2:09 PM