
Amir

11/02/2022, 7:34 PM
Xpost from the community page: the agent is on aks, and I'm having issue with resource management. Any thoughts? I'm currently aimlessly messing around with dask hoping something will stick.

Nate

11/02/2022, 8:36 PM
Hi @Amir, can you access the state / logs of the pod?
kubectl get pods # to find the problem pod
kubectl logs problem-pod-name

Amir

11/03/2022, 3:19 PM
Hey @Nate, sorry for the late reply - got off early yesterday. I'll spin it up and grab the logs. I'm assuming you want me to generate the logs for when it stalls, correct?

Nate

11/03/2022, 3:27 PM
Hey @Amir - no problem. On second thought, I'm not sure that'll be super helpful. If the problem you're having is with your cluster distributing its own resources, I don't think we'll find anything incriminating in the pod logs. This feels more like a cluster configuration issue somehow - are you on Prefect 1 or 2?

Amir

11/03/2022, 4:08 PM
Prefect 2. I've attached a current snapshot of the pod's CPU usage: it's currently stable and running, but will drop to ~0 in a few hours. We deploy the agent using the Prefect Helm chart. There are two charts, with the second one patching (and taking priority over) the first. The .yaml files for these are as follows: 1.
apiVersion: helm.toolkit.fluxcd.io/v2beta1
kind: HelmRelease
metadata:
  name: agent
  namespace: prefect
spec:
  interval: 30m
  chart:
    spec:
      chart: prefect-agent
      version: 2022.09.26
      sourceRef:
        name: prefecthq
        kind: HelmRepository
        namespace: kube-system
  values:
    # See https://github.com/PrefectHQ/prefect-helm/blob/2022.09.26/charts/prefect-agent/values.yaml for details
    agent:
      image:
        prefectTag: 2.4.5-python3.9
2.
apiVersion: helm.toolkit.fluxcd.io/v2beta1
kind: HelmRelease
metadata:
  name: agent
  namespace: prefect
spec:
  values:
    # See https://github.com/PrefectHQ/prefect-helm/blob/2022.09.26/charts/prefect-agent/values.yaml for details
    agent:
      cloudApiConfig:
        accountId: <REDACTED ID>
        workspaceId: <REDACTED ID>
      containerSecurityContext:
        readOnlyRootFilesystem: false
      image:
        debug: true
Some background on the flow itself: the flow uses the SequentialTaskRunner (current config is
@flow(task_runner=SequentialTaskRunner())
). There are three tasks within the flow: 1) pull data from Snowflake to local, 2) shell_run_command to run the model (
poetry run python src/driver/__main__.py
), and 3) upload the output data back to Snowflake. Here is the flow:
from prefect import flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner
from prefect_shell import shell_run_command

@flow(task_runner=SequentialTaskRunner())
def flow():
    logger = get_run_logger()

######
# Retrieve the input data needed for the transformations from Snowflake:
######
    logger.info(":large_yellow_square: INFO Downloading Snowflake Data")

    snowflake_to_local_task.submit()

    logger.info(":white_check_mark: INFO Finished Snowflake Downloads")

######
# Run transformation script
######
    logger.info(":large_yellow_square: INFO Running Transformations")

    commands = ["cd models/model_folder/ && poetry install && poetry run python src/driver/__main__.py"]
    for cmd in commands:
        logger.info(f"INFO Running the command {cmd}:")
        shell_run_command(command=cmd, return_all=True)

    logger.info(":white_check_mark: INFO Finished Running Transformations")


######
# Upload the results from the model to Snowflake:
######
    logger.info(":large_yellow_square: INFO Uploading Data to Snowflake")

    local_to_snowflake_task.submit()

    logger.info(":white_check_mark: INFO Finished Uploading Data to Snowflake")
    logger.info(":white_check_mark: :white_check_mark: :white_check_mark: INFO RUN COMPLETE")
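(The snowflake_to_local_task and local_to_snowflake_task referenced above aren't shown; purely as a rough, hypothetical sketch, the download task could look something like this with plain snowflake-connector-python - the connection details, table name, and file path are all placeholders:)
import csv

import snowflake.connector
from prefect import task

@task
def snowflake_to_local_task():
    # Hypothetical sketch: pull rows from Snowflake and write them to a local CSV.
    conn = snowflake.connector.connect(
        account="my_account",        # placeholder credentials
        user="my_user",
        password="my_password",
        warehouse="my_warehouse",
        database="my_database",
    )
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM input_table")  # placeholder query
            with open("input_data.csv", "w", newline="") as f:
                writer = csv.writer(f)
                writer.writerow([col[0] for col in cur.description])  # header row
                writer.writerows(cur.fetchall())                      # data rows
    finally:
        conn.close()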

Nate

11/03/2022, 8:09 PM
hmm - why did you choose to use 2 different values.yaml?

Amir

11/03/2022, 8:26 PM
We use Kustomize to set up a base manifest, and an overlay that allows specific overrides for each cluster that uses it

Nate

11/03/2022, 8:32 PM
ah cool - okay thanks for the context. when you run flows on this cluster, are you seeing pods being created for each flow run? (assuming you're using the KubernetesJob infra block on these deployments)

Amir

11/03/2022, 8:37 PM
Correct, the deployment uses a k8s job block and not a k8s cluster config block. The pod is created and shows up in the Cloud UI. Some context: I am able to run other flows successfully in k8s - this issue is specific to a monolithic Python transformation model spanning ~10 hours of compute time. When I run this flow and scale down some of the parameters within that model (smaller date range for its calculations), it does run successfully. The model also runs fine with the exact same specifications that cause this issue when it's run directly in a VM rather than kicked off by Prefect.
Quick note: I specified resource values within the base job manifest of the k8s block
"resources": {
  "requests": {
    "cpu": "4000m",
    "memory": "32Gi"
  },
  "limits": {
    "cpu": "8000m",
    "memory": "62Gi"
  }
}
which is now reflected in the pod. I'm unsure if setting limits will help prevent the issue, but just noting it here (also a best-practice habit I should get into). The flow is about 100 minutes into its run, so the issue may appear again shortly.
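(For reference, a minimal sketch of one way to set those requests/limits on the KubernetesJob block from Python, using the block's customizations JSON patch rather than editing the base job manifest directly - the image and block name here are placeholders:)
from prefect.infrastructure import KubernetesJob

# Hypothetical sketch: patch resources into the flow-run pod via the block's
# `customizations` field (a JSON 6902 patch applied to the base job manifest).
k8s_job = KubernetesJob(
    image="my-registry/transform-model:latest",  # placeholder image
    namespace="prefect",
    customizations=[
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {
                "requests": {"cpu": "4000m", "memory": "32Gi"},
                "limits": {"cpu": "8000m", "memory": "62Gi"},
            },
        }
    ],
)

k8s_job.save("transform-model-k8s", overwrite=True)  # placeholder block name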

Nate

11/03/2022, 8:49 PM
ahh I see. Yes, it does sound like upping the resource requests makes sense, and on third thought I guess I actually would be curious what the pod logs look like for a large date range / run that causes problems. Something that might make sense is to chunk / distribute the work across worker flows that would run on their own pods with smaller memory requests, with something (roughly) like
from typing import List

from prefect import flow, unmapped
from prefect.deployments import run_deployment

@flow
def parent(date_range: List, chunk_size: int = 5):

    chunked_dates = [
        date_range[i : i + chunk_size]
        for i in range(0, len(date_range), chunk_size)
    ]

    run_deployment.map(name=unmapped("processNDatesFlow"), params=chunked_dates)

Amir

11/03/2022, 9:28 PM
So the CPU dropped again. Debug mode shows nothing. Pod still exists and is running, node and pod are showing ~0 CPU (a drop from 1). CPU usage, when using
top -c 1
from within the terminal, also shows 0. Such an odd problem. It's almost as if it just gives up.
Sorry, are you curious about the log results when using the date range I'm using in the flow you sent?

Nate

11/03/2022, 9:33 PM
I actually just realized that run_deployment isn't a task, so you can't use map - but you can use a for loop, or wrap `run_deployment` in a task and map that. I just thought it might be useful to see logs from the pod you mentioned showing weird behaviour above
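(A rough sketch of that wrap-and-map approach; the deployment name "processNDatesFlow" comes from the earlier example, and the child flow's `dates` parameter is an assumption:)
from typing import List

from prefect import flow, task, unmapped
from prefect.deployments import run_deployment

@task
def run_chunk(deployment_name: str, dates: List[str]):
    # Hypothetical wrapper task: kick off one worker flow run per date chunk.
    return run_deployment(
        name=deployment_name,
        parameters={"dates": dates},  # assumes the child flow takes a `dates` parameter
    )

@flow
def parent(date_range: List[str], chunk_size: int = 5):
    chunked_dates = [
        date_range[i : i + chunk_size]
        for i in range(0, len(date_range), chunk_size)
    ]
    # Map over the chunks; the deployment name stays constant via unmapped().
    run_chunk.map(unmapped("processNDatesFlow"), chunked_dates)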