Joël Luijmes

01/15/2021, 10:33 AM
I’m running Prefect in Kubernetes. I have a flow that spawns 15 CPU-intensive Kubernetes jobs. To get some parallelism, I configured a DaskExecutor on the flow (6 workers, 1 thread each). What I see is that the prefect-job created by the Kubernetes agent uses quite a lot of resources (400m CPU, ~850 MiB). Is there a better way to deploy this that uses fewer resources? The alternative is a static Dask cluster, but since multiple flows can run at the same time, that doesn’t seem better either. I’m open to suggestions 🙂
It kind of looks like there is a memory leak. It has now been running for 40 minutes, and memory has grown to 1.27 GiB.
Note: I’m talking about the job created by the Prefect agent. The job actually executing the code only uses 188 MiB.
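One way to check whether a Python process keeps accumulating memory is to compare `tracemalloc` snapshots taken across repeated runs of the same work. Below is a minimal sketch; `leaky_step` is a hypothetical workload that leaks on purpose. Note that `tracemalloc` only sees allocations in the process it runs in, so any Dask worker subprocesses in the same pod would have to be checked separately.

```python
import tracemalloc


def top_growth(workload, rounds: int = 3, limit: int = 5):
    """Run `workload` repeatedly and return the source lines whose
    allocations grew the most between the first and last snapshot."""
    tracemalloc.start()
    try:
        workload()  # warm-up, so one-time imports/caches are not counted as growth
        before = tracemalloc.take_snapshot()
        for _ in range(rounds):
            workload()
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # compare_to() sorts by absolute size difference, largest first
    stats = after.compare_to(before, "lineno")
    return [s for s in stats if s.size_diff > 0][:limit]


# Hypothetical workload that leaks: it keeps appending to a module-level list.
_cache = []


def leaky_step():
    _cache.append(bytearray(100_000))


for stat in top_growth(leaky_step):
    print(stat)
```

A line whose `size_diff` keeps rising with more rounds is a leak candidate; steady memory that plateaus after warm-up usually is not.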

nicholas

01/15/2021, 3:13 PM
Hi @Joël Luijmes - could you share the code you're using to reproduce this?

Joël Luijmes

01/15/2021, 3:15 PM
Uhm, let me try to get the relevant parts. The flow itself is built dynamically when the script runs, based on a config file 😅
I think this contains the most important bits:
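# Imports assume Prefect 0.14.x. `cron`, `prefect_job_template`, `job_spec_template`,
# `CloudSQLProxyManager`, `CreateSyncJob` and `cleanup_on_failure` come from the rest of the script.
from datetime import datetime

from prefect import Flow
from prefect.engine import state
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.schedules import CronSchedule
from prefect.storage import Docker
from prefect.utilities.notifications import slack_notifier

executor = DaskExecutor(cluster_kwargs={"n_workers": 6, "threads_per_worker": 1, "memory_limit": "1GB"})

schedule = CronSchedule(cron, start_date=datetime.utcnow())
handler = slack_notifier(only_states=[state.Failed])
storage_gcr = Docker(
    base_image="prefecthq/prefect:0.14.1-python3.8",
    registry_url="...",
    python_dependencies=["pandas"],
)

with Flow(
    f"CloudSQL to BigQuery ({cron})",
    storage=storage_gcr,  # must match the Docker storage object defined above
    executor=executor,
    schedule=schedule,
    run_config=KubernetesRun(
        # prefect_job_template contains a job spec describing resource limits, secrets, nodeSelector, ...
        job_template=prefect_job_template
    ),
) as flow:
    # Class with @resource_manager which spawns a pod and service to run the CloudSQL Proxy
    with CloudSQLProxyManager(...) as service_name:
        # Task whose run() calls RunNamespacedJob.run()
        job = CreateSyncJob(
            job_spec_template,
            state_handlers=[cleanup_on_failure, handler],
        )

        # Make the task depend on the CloudSQLProxyManager resource
        task = job(service_name)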
This is one of the generated flows

nicholas

01/15/2021, 3:25 PM
Hm, gotcha. I think this might be best moved to a GitHub ticket; the Core team will be better able to diagnose what's happening (and whether the leak is coming from the Prefect engine or not!)
Would you mind opening a ticket with this info?

Joël Luijmes

01/15/2021, 3:30 PM
Sure, don’t you have a bot to do that? Or does that lose some of the details?

nicholas

01/15/2021, 3:31 PM
We do; Marvin isn't so good at keeping participants notified yet 🤫
but I'm happy to use the bot if you'd prefer 🙂

Joël Luijmes

01/15/2021, 3:32 PM
Haha okay, I’ll make an issue then; I’d like to stay notified 🙂

nicholas

01/15/2021, 3:33 PM
Ok thank you!!

Joël Luijmes

01/15/2021, 3:40 PM

nicholas

01/15/2021, 3:41 PM
Brilliant, thanks 🙂