# ask-community
m
Hey! I have been using Prefect for a bit now on AKS in Azure for hosting our agents. On the whole this works well and it creates K8s jobs for our flow runs, but regardless of how we trigger them, the number of concurrent tasks in a flow run never exceeds the number of virtual or physical CPUs on the cluster. I was under the impression that if we set the scheduler to "threads" and num_workers to 2-4x the number of CPUs, the flow would create that many threads to run its tasks? I am comfortable scaling like this because we are simply submitting jobs to a Databricks cluster, so there is no CPU-bound work happening on our cluster at all, it is purely IO monitoring. Any help would be appreciated; I have attached a sample of how we trigger our flow in the thread. Thanks in advance.
Copy code
import os

from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Azure


def run(self, flow: Flow, scheduler="threads", num_workers=8):
    env = {
        "DATABRICKS_CLUSTER_ID": self.cluster_id,
        "MOUNT": self.mount,
        "CODEMOUNT": self.codeMount,
        "ENVIRONMENT": self.environment,
        "RELEASE_VERSION": self.release_version,
        "APP_IMAGE": self.kubernetes_job_image,
        "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
        "PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": f"'{self.databricks_connection}'",
        "PREFECT_PROJECT": self.prefect_project,
        "DATABRICKS_INSTANCE_POOL_ID": self.instance_pool_id,
        "SQL_SERVER_JDBC_CONNECTION_STRING": self.sql_jdbc_connection_string,
    }

    flow.executor = LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers)

    # 'DEBUG' is not set on the K8s cluster, so use .get() to avoid a KeyError there
    if os.environ.get("DEBUG") == "1":
        flow.run()
    elif os.environ.get("DEBUG") == "2":
        flow.visualize()
    else:
        flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)
        flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)
a
How did you assess how many threads and processes your flow run was using? One option that may help here is attaching the executor directly to your Flow object when it gets created:
Copy code
with Flow("your_flow", executor=LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers)) as flow:
This may help since the executor is retrieved from storage at runtime
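A minimal sketch of that pattern, assuming an illustrative flow name and a hypothetical monitoring task (neither is from the original snippet):
Copy code
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task
def monitor_databricks_job(job_id):
    # hypothetical placeholder for the IO-bound monitoring work
    ...


# attaching the executor here means it is part of the flow object that gets
# retrieved from storage at runtime, rather than being set only in the run() method
with Flow(
    "databricks-monitoring",
    executor=LocalDaskExecutor(scheduler="threads", num_workers=8),
) as flow:
    # mapped task runs are what the LocalDaskExecutor executes concurrently
    monitor_databricks_job.map(job_id=["job-1", "job-2", "job-3"])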
m
I can see from the timeline that it never runs more than 4 at once, which matches the number of processors on the machine
Thanks for getting back to me
I'll try that suggestion too
a
Gotcha. What may also help is to log the thread and process IDs within your flow. You could do that by adding them to your log format:
Copy code
export PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"
This could be set on the agent or attached to your run config, afaik:
Copy code
flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=dict(PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"))
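A sketch of that second option, reusing the env dict and run config already built in the run() method above (rather than passing a new dict):
Copy code
# add the logging format to the existing env dict so every task-run log line
# shows which process and thread it ran on
env["PREFECT__LOGGING__FORMAT"] = (
    "%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"
)
flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)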
m
Ah yeah great shout @Anna Geller I'll add that now and trigger some of these jobs
👍 1
Success!!
TYVM
a
Glad to hear it works now! 🙌