Michael Law

04/11/2022, 8:23 AM
Hey! I have been using Prefect for a while now on AKS in Azure to host our agents. On the whole this works well: it creates K8s jobs for our flow runs. But regardless of how we trigger them, a flow run never seems to execute more concurrent tasks than there are virtual or physical CPUs on the cluster. I was under the impression that if we set the scheduler to "threads" and num_workers to 2-4x the number of CPUs, the flow would create that many threads to run its tasks? I am comfortable scaling these up, because we are simply submitting jobs to a Databricks cluster, so no CPU-bound work happens on our cluster at all; it is purely IO monitoring. I have attached a sample of how we trigger our flow in the thread. Thanks in advance for any help which could be offered.
import os

from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Azure


# Method excerpt from our deployment helper class (the rest of the class is not shown)
def run(self, flow: Flow, scheduler: str = "threads", num_workers: int = 8):
    env = {
        "DATABRICKS_CLUSTER_ID": self.cluster_id,
        "MOUNT": self.mount,
        "CODEMOUNT": self.codeMount,
        "ENVIRONMENT": self.environment,
        "RELEASE_VERSION": self.release_version,
        "APP_IMAGE": self.kubernetes_job_image,
        "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
        "PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": f"'{self.databricks_connection}'",
        "PREFECT_PROJECT": self.prefect_project,
        "DATABRICKS_INSTANCE_POOL_ID": self.instance_pool_id,
        "SQL_SERVER_JDBC_CONNECTION_STRING": self.sql_jdbc_connection_string,
    }

    # Threaded local Dask executor; num_workers is meant to exceed the CPU count for IO-bound tasks
    flow.executor = LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers)

    # 'DEBUG' is not set on the K8s cluster, so read it with .get() to avoid a KeyError
    debug = os.environ.get("DEBUG")

    if debug == "1":
        flow.run()
    elif debug == "2":
        flow.visualize()
    else:
        flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)
        flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)
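The expectation in the question is that a thread-based pool is bounded by its worker count, not by the CPU count, when the tasks mostly wait on IO. The sketch below checks that principle with the standard library's `ThreadPoolExecutor` as a stand-in for `LocalDaskExecutor(scheduler="threads")`; it does not exercise Prefect or Dask, and `peak_concurrency` and `fake_io_task` are hypothetical helpers. `time.sleep` stands in for polling a Databricks job.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_workers: int, n_tasks: int, io_seconds: float = 0.2) -> int:
    """Run n_tasks simulated IO waits on a thread pool; return the peak number running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def fake_io_task(_: int) -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        # Simulated IO wait (hypothetical stand-in for monitoring a remote job); sleep releases the GIL
        time.sleep(io_seconds)
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(fake_io_task, range(n_tasks)))
    return peak


if __name__ == "__main__":
    cpus = os.cpu_count() or 1
    workers = 2 * cpus  # "2-4x the number of CPUs" from the question
    print(f"{cpus} CPUs, {workers} workers -> peak {peak_concurrency(workers, workers)} concurrent tasks")
```

If the peak here tracks the worker count while a flow run still caps out at the CPU count, that would suggest (an assumption, not confirmed in this thread) that the flow run is not using the executor configured above.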

Anna Geller

04/11/2022, 9:22 AM
How did you assess how many threads and processes your flow run was using? One option that may help here is attaching the executor directly to your Flow object when it gets created:
with Flow("your_flow", executor=LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers)) as flow:
    ...  # define your tasks here
This may help because the executor is not stored with the flow's metadata in the backend; it is retrieved from the flow's storage at runtime.

Michael Law

04/11/2022, 10:16 AM
I can see from the timeline that it never runs more than 4 tasks at once, which matches the number of processors on the machine.
Thanks for getting back to me
I'll try that suggestion too
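Reading the peak off a timeline by eye is easy to get slightly wrong. If the task start and end times can be pulled out of the run timeline (how to export them is not covered in this thread, and the intervals in the test are made up), a short sweep over them gives the exact peak number of overlapping tasks. `peak_overlap` is a hypothetical helper, not part of Prefect.

```python
from typing import Iterable, List, Tuple


def peak_overlap(intervals: Iterable[Tuple[float, float]]) -> int:
    """Return the maximum number of (start, end) task intervals that overlap at any instant."""
    events: List[Tuple[float, int]] = []
    for start, end in intervals:
        events.append((start, 1))   # a task starts
        events.append((end, -1))    # a task finishes
    # At equal timestamps, -1 sorts before +1, so back-to-back tasks are not counted as overlapping
    events.sort()
    running = peak = 0
    for _, delta in events:
        running += delta
        peak = max(peak, running)
    return peak
```

A result equal to the CPU count across several runs would back up the timeline observation with numbers.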

Anna Geller

04/11/2022, 10:30 AM
Gotcha. What may also help is to log the process and thread IDs within your flow. You could do that by adding them to your log format:
export PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"
This can be set on the agent or passed through your run config, AFAIK:
flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=dict(PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"))
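The format string above uses standard Python `logging` record attributes (`%(process)s`, `%(thread)s`). The sketch below applies the same string with plain `logging` and a thread pool, which stands in for a threaded executor, to show what the output distinguishes. It does not configure Prefect itself. The logger name `flow_run_demo` and the helpers `RecordCollector` and `run_tasks_with_logging` are hypothetical.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

# Same format string as PREFECT__LOGGING__FORMAT above
LOG_FORMAT = "%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"


class RecordCollector(logging.Handler):
    """Keeps emitted records in memory so their process/thread IDs can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def run_tasks_with_logging(num_workers: int = 4, n_tasks: int = 4) -> RecordCollector:
    logger = logging.getLogger("flow_run_demo")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.propagate = False
    stream = logging.StreamHandler()
    stream.setFormatter(logging.Formatter(LOG_FORMAT))
    collector = RecordCollector()
    logger.addHandler(stream)
    logger.addHandler(collector)

    def task(i: int) -> None:
        logger.info("task %d started", i)
        time.sleep(0.1)  # simulated IO wait keeps all workers busy at the same time

    try:
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            list(pool.map(task, range(n_tasks)))
    finally:
        logger.removeHandler(stream)
        logger.removeHandler(collector)
    return collector
```

Calling `run_tasks_with_logging()` prints four lines that share one process ID but carry four different thread IDs. In a real flow run's logs, seeing no more distinct thread IDs than CPUs would hint that a smaller pool than intended is in use.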

Michael Law

04/11/2022, 10:32 AM
Ah yeah, great shout @Anna Geller! I'll add that now and trigger some of these jobs.
Success!!
TYVM

Anna Geller

04/11/2022, 2:09 PM
Glad to hear it works now! 🙌