
Marwan Sarieddine

08/21/2020, 8:29 PM
Hi folks, I am trying the S3 flow storage option - using a Kubernetes agent and a DaskKubernetesEnvironment execution environment. The issue I am running into is that the Prefect agent is creating a prefect-job pod that is throwing an S3 authentication error. I am using a k8s secret and I already have the proper AWS credentials set as environment variables on the agent and worker k8s specs, but for some reason the prefect-job is not being delegated these environment variables... What is the Prefect way to run an S3-stored flow with this kind of setup?

josh

08/21/2020, 8:34 PM
Hi @Marwan Sarieddine, I believe the K8s agent only propagates environment variables to the first job it creates. Are you providing both a scheduler and a worker spec to the environment? The scheduler pod needs to be able to pull the flow from S3, so it would also need the credentials.

Marwan Sarieddine

08/21/2020, 8:35 PM
Hi @josh - thanks for the prompt response.
I believe the K8s agent only propagates environment variables to the first job it creates
That is correct - I see the first job pod in a Running state, then the second job pod in an Error state. I guess as a follow-up - why are there two job pods that get created?
Are you providing both a scheduler and worker spec to the environment?
No I am not - I thought with a DaskKubernetesEnvironment I only needed to specify the dask-worker pod spec

josh

08/21/2020, 8:36 PM
In order to actually pull the flow from s3 storage the scheduler job (which is where the flow runner is initialized) needs to be authenticated. This is the default job spec that is used for the scheduler which you can modify to fit your needs https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/execution/dask/job.yaml

Marwan Sarieddine

08/21/2020, 8:37 PM
I see
so the second job pod is for the scheduler - ok makes sense now
thank you - will try it out

josh

08/21/2020, 8:37 PM
There are two jobs created because the first one acts as an init-type process where the flow information is pulled and kept separate from the API layer (that data isn’t sent to the backend). This is hopefully going to change in the near future to allow the same secure separation but remove the need for the initial job, as we have a new design for this coming down the pipe! 🙂

Marwan Sarieddine

08/21/2020, 8:38 PM
I see
@josh - I added the env variable to the job.yaml file and assigned it to scheduler_spec_file, but I am still facing the same issue for some reason. Below is the job.yaml spec:
apiVersion: batch/v1
kind: Job
metadata:
  name: prefect-dask-job
  labels:
    app: prefect-dask-job
spec:
  template:
    metadata:
      labels:
        app: prefect-dask-job
    spec:
      containers:
        - name: flow
          image: xxxxx
          imagePullPolicy: IfNotPresent
          command: ["/bin/sh", "-c"]
          args:
            [
              'python -c "import prefect; prefect.environments.execution.load_and_run_flow()"',
            ]
          env:
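            # NOTE: the PREFECT__* entries below appear to use their own names
            # as placeholder values, which DaskKubernetesEnvironment replaces
            # with real values at runtime (per the default job.yaml linked
            # above); only the AWS secretKeyRef entries added here need to be
            # maintained by hand.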
            - name: PREFECT__CLOUD__GRAPHQL
              value: PREFECT__CLOUD__GRAPHQL
            - name: PREFECT__CLOUD__AUTH_TOKEN
              value: PREFECT__CLOUD__AUTH_TOKEN
            - name: PREFECT__CONTEXT__FLOW_RUN_ID
              value: PREFECT__CONTEXT__FLOW_RUN_ID
            - name: PREFECT__CONTEXT__NAMESPACE
              value: PREFECT__CONTEXT__NAMESPACE
            - name: PREFECT__CONTEXT__IMAGE
              value: PREFECT__CONTEXT__IMAGE
            - name: PREFECT__CLOUD__USE_LOCAL_SECRETS
              value: "false"
            - name: PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS
              value: "prefect.engine.cloud.CloudFlowRunner"
            - name: PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS
              value: "prefect.engine.cloud.CloudTaskRunner"
            - name: PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS
              value: "prefect.engine.executors.DaskExecutor"
            - name: PREFECT__LOGGING__LOG_TO_CLOUD
              value: "true"
            - name: PREFECT__LOGGING__LEVEL
              value: "INFO"
            - name: PREFECT__DEBUG
              value: "true"
            - name: DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING
              value: "True"
            - name: PREFECT__LOGGING__EXTRA_LOGGERS
              value: PREFECT__LOGGING__EXTRA_LOGGERS
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  key: AWS_ACCESS_KEY_ID
                  name: aws-s3-secret
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  key: AWS_SECRET_ACCESS_KEY
                  name: aws-s3-secret
          resources:
            requests:
              cpu: "100m"
            limits:
              cpu: "100m"
      restartPolicy: Never
the worker spec
kind: Pod
metadata:
  labels:
    app: prefect-dask-worker
spec:
  containers:
  - args:
    - dask-worker
    - --no-dashboard
    - --death-timeout
    - '60'
    - --nthreads
    - '1'
    - --nprocs
    - '1'
    - --memory-limit
    - 4GB
    env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          key: AWS_ACCESS_KEY_ID
          name: aws-s3-secret
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          key: AWS_SECRET_ACCESS_KEY
          name: aws-s3-secret
    image: xxxxx
    imagePullPolicy: IfNotPresent
    name: dask-worker
    resources:
      limits:
        cpu: 1000m
        memory: 4G
      requests:
        cpu: 1000m
        memory: 4G
  restartPolicy: Never
here is the call to DaskKubernetesEnvironment:
DaskKubernetesEnvironment(
    min_workers=1,
    max_workers=2,
    scheduler_spec_file="specs/job.yaml",
    worker_spec_file="specs/worker_spec.yaml",
)

josh

08/21/2020, 9:39 PM
@Marwan Sarieddine are you using Prefect Cloud to orchestrate these runs?

Marwan Sarieddine

08/21/2020, 9:40 PM
yep
here is the error I am getting
kubectl logs pod/prefect-job-ad7ab790-795nm  
[2020-08-21 21:39:29] INFO - prefect.S3 | Downloading simple-flow-4/2020-08-21t21-37-27-970539-00-00 from infima-etl-flows
[2020-08-21 21:39:31] ERROR - prefect.S3 | Error downloading Flow from S3: An error occurred (403) when calling the HeadObject operation: Forbidden
An error occurred (403) when calling the HeadObject operation: Forbidden
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 80, in cloud_flow
    raise exc
  File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 69, in cloud_flow
    flow = storage.get_flow(storage.flows[flow_data.name])
  File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 92, in get_flow
    raise err
  File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 87, in get_flow
    self._boto3_client.download_fileobj(
  File "/usr/local/lib/python3.8/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
    return future.result()
  File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 106, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 265, in result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 255, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/s3transfer/download.py", line 340, in _submit
    response = client.head_object(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 316, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 635, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden
inspecting pod/prefect-job-ad7ab790-795nm - it doesn't have the AWS env variables ...

josh

08/21/2020, 9:42 PM
Have you tried using Prefect Secrets for this? There is a default AWS_CREDENTIALS secret that is set up to solve issues like this: https://docs.prefect.io/core/concepts/secrets.html#default-secrets
Not sure why it wouldn’t have the env vars 🤔
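For reference, the default AWS_CREDENTIALS secret described in that doc is a JSON document with ACCESS_KEY and SECRET_ACCESS_KEY keys. A minimal sketch of supplying it for a local test run - the key values here are placeholders, and on Prefect Cloud the same JSON would instead be stored as a Secret named AWS_CREDENTIALS in the UI:
import json

import prefect
from prefect.client import Secret

# Placeholder values - the secret value is a JSON document with these two keys.
aws_credentials = json.dumps(
    {
        "ACCESS_KEY": "<aws-access-key-id>",
        "SECRET_ACCESS_KEY": "<aws-secret-access-key>",
    }
)

# Local test: provide the secret via context secrets (Cloud runs would read
# the Secret from the backend instead).
with prefect.context(secrets={"AWS_CREDENTIALS": aws_credentials}):
    creds = Secret("AWS_CREDENTIALS").get()  # parsed back into a dict locally
    print(sorted(creds))  # ['ACCESS_KEY', 'SECRET_ACCESS_KEY']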

Marwan Sarieddine

08/21/2020, 9:43 PM
I haven't - will try the prefect secrets approach

josh

08/21/2020, 9:44 PM
Awesome! You can set those values in the secret and then in your storage do something like:
flow.storage = S3(..., secrets=["AWS_CREDENTIALS"])
and those credentials will automatically be used when pulling the flow
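Putting that together with the environment call from earlier in the thread, a minimal sketch of the flow-side setup might look like this - the flow body, bucket name, and project name are placeholders, and the spec paths are the ones shown above:
from prefect import Flow, task
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import S3


@task
def say_hello():
    print("hello from the dask worker")


with Flow("simple-flow") as flow:
    say_hello()

# secrets=["AWS_CREDENTIALS"] makes the S3 storage authenticate with the
# AWS_CREDENTIALS Prefect Secret when the prefect-job pod downloads the
# pickled flow, instead of relying on AWS env vars in the job spec.
flow.storage = S3(bucket="my-flow-bucket", secrets=["AWS_CREDENTIALS"])

# Same environment call as earlier in the thread.
flow.environment = DaskKubernetesEnvironment(
    min_workers=1,
    max_workers=2,
    scheduler_spec_file="specs/job.yaml",
    worker_spec_file="specs/worker_spec.yaml",
)

flow.register(project_name="my-project")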

Marwan Sarieddine

08/21/2020, 10:02 PM
ok it seems I am able to authenticate to S3 now (I am using the Prefect Secrets approach) - thanks, but now I am facing another issue ...
$ kubectl logs pod/prefect-job-e74114b4-26n6v
[2020-08-21 21:55:08] INFO - prefect.S3 | Downloading simple-flow-8/2020-08-21t21-54-33-484226-00-00 from infima-etl-flows

Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 80, in cloud_flow
    raise exc
  File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 69, in cloud_flow
    flow = storage.get_flow(storage.flows[flow_data.name])
  File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 101, in get_flow
    return cloudpickle.loads(output)
TypeError: an integer is required (got type bytes)
EDIT - sorry, I suppose I should have posted this issue thread on the prefect-community channel ... just noticed I am on the wrong channel
(just reposted this last issue on the prefect-community channel)