Thread
#prefect-contributors

    Marwan Sarieddine

    2 years ago
    Hi folks, I am trying the S3 flow storage option - using a Kubernetes agent and a DaskKubernetesEnvironment execution environment. The issue I am running into is that the Prefect agent is creating a prefect-job pod that throws an S3 authentication error. I am using a k8s secret, and I already have the proper AWS credentials set as environment variables on the agent and worker k8s specs, but for some reason the prefect-job is not being delegated these environment variables... What is the Prefect way to run an S3-stored flow with a setup like this?
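    For context, a rough sketch of how I am registering the flow (the flow, bucket, and project names here are placeholders, not my real values):
    from prefect import Flow, task
    from prefect.environments import DaskKubernetesEnvironment
    from prefect.environments.storage import S3

    @task
    def say_hello():
        print("hello")

    with Flow("simple-flow") as flow:
        say_hello()

    # the flow is pickled and uploaded to S3 at registration time
    flow.storage = S3(bucket="my-flow-bucket")
    # execution happens on an ephemeral Dask cluster inside Kubernetes
    flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=2)

    flow.register(project_name="my-project")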

    josh

    2 years ago
    Hi @Marwan Sarieddine - I believe the K8s agent only propagates environment variables to the first job it creates. Are you providing both a scheduler and worker spec to the environment? The scheduler pod needs to be able to pull the flow from S3, so it would also need the credentials.

    Marwan Sarieddine

    2 years ago
    Hi @josh - thanks for the prompt response.
    I believe the K8s agent only propagates environment variables to the first job it creates
    That is correct - I see the first job pod in a Running state, then the second job pod in an Error state. As a followup - why are there two job pods created?
    Are you providing both a scheduler and worker spec to the environment?
    No I am not - I thought with a DaskKubernetesEnvironment I only needed to specify the dask-worker pod spec

    josh

    2 years ago
    In order to actually pull the flow from S3 storage, the scheduler job (which is where the flow runner is initialized) needs to be authenticated. This is the default job spec used for the scheduler, which you can modify to fit your needs: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/execution/dask/job.yaml

    Marwan Sarieddine

    2 years ago
    I see
    so the second job pod is for the scheduler - ok makes sense now
    thank you - will try it out

    josh

    2 years ago
    There are two jobs created because the first one acts as an init-type process where the flow information is pulled and kept separate from the API layer (that data isn't sent to the backend). This will hopefully change in the near future - we have a new design coming down the pipe that keeps the same secure separation but removes the need for the initial job! 🙂

    Marwan Sarieddine

    2 years ago
    I see
    @josh - I added the AWS env variables to the job.yaml file and assigned it to scheduler_spec_file, but I am still facing the same issue for some reason. Below is the job.yaml spec:
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: prefect-dask-job
      labels:
        app: prefect-dask-job
    spec:
      template:
        metadata:
          labels:
            app: prefect-dask-job
        spec:
          containers:
            - name: flow
              image: xxxxx
              imagePullPolicy: IfNotPresent
              command: ["/bin/sh", "-c"]
              args:
                [
                  'python -c "import prefect; prefect.environments.execution.load_and_run_flow()"',
                ]
              env:
                - name: PREFECT__CLOUD__GRAPHQL
                  value: PREFECT__CLOUD__GRAPHQL
                - name: PREFECT__CLOUD__AUTH_TOKEN
                  value: PREFECT__CLOUD__AUTH_TOKEN
                - name: PREFECT__CONTEXT__FLOW_RUN_ID
                  value: PREFECT__CONTEXT__FLOW_RUN_ID
                - name: PREFECT__CONTEXT__NAMESPACE
                  value: PREFECT__CONTEXT__NAMESPACE
                - name: PREFECT__CONTEXT__IMAGE
                  value: PREFECT__CONTEXT__IMAGE
                - name: PREFECT__CLOUD__USE_LOCAL_SECRETS
                  value: "false"
                - name: PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS
                  value: "prefect.engine.cloud.CloudFlowRunner"
                - name: PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS
                  value: "prefect.engine.cloud.CloudTaskRunner"
                - name: PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS
                  value: "prefect.engine.executors.DaskExecutor"
                - name: PREFECT__LOGGING__LOG_TO_CLOUD
                  value: "true"
                - name: PREFECT__LOGGING__LEVEL
                  value: "INFO"
                - name: PREFECT__DEBUG
                  value: "true"
                - name: DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING
                  value: "True"
                - name: PREFECT__LOGGING__EXTRA_LOGGERS
                  value: PREFECT__LOGGING__EXTRA_LOGGERS
                - name: AWS_ACCESS_KEY_ID
                  valueFrom:
                    secretKeyRef:
                      key: AWS_ACCESS_KEY_ID
                      name: aws-s3-secret
                - name: AWS_SECRET_ACCESS_KEY
                  valueFrom:
                    secretKeyRef:
                      key: AWS_SECRET_ACCESS_KEY
                      name: aws-s3-secret
              resources:
                requests:
                  cpu: "100m"
                limits:
                  cpu: "100m"
          restartPolicy: Never
    and here is the worker spec:
    kind: Pod
    metadata:
      labels:
        app: prefect-dask-worker
    spec:
      containers:
      - args:
        - dask-worker
        - --no-dashboard
        - --death-timeout
        - '60'
        - --nthreads
        - '1'
        - --nprocs
        - '1'
        - --memory-limit
        - 4GB
        env:
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              key: AWS_ACCESS_KEY_ID
              name: aws-s3-secret
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              key: AWS_SECRET_ACCESS_KEY
              name: aws-s3-secret
        image: xxxxx
        imagePullPolicy: IfNotPresent
        name: dask-worker
        resources:
          limits:
            cpu: 1000m
            memory: 4G
          requests:
            cpu: 1000m
            memory: 4G
      restartPolicy: Never
    here is the call to DaskKubernetesEnvironment:
    DaskKubernetesEnvironment(
        min_workers=1,
        max_workers=2,
        scheduler_spec_file="specs/job.yaml",
        worker_spec_file="specs/worker_spec.yaml",
    )

    josh

    2 years ago
    @Marwan Sarieddine are you using Prefect Cloud to orchestrate these runs?

    Marwan Sarieddine

    2 years ago
    yep
    here is the error I am getting:
    kubectl logs pod/prefect-job-ad7ab790-795nm  
    [2020-08-21 21:39:29] INFO - prefect.S3 | Downloading simple-flow-4/2020-08-21t21-37-27-970539-00-00 from infima-etl-flows
    [2020-08-21 21:39:31] ERROR - prefect.S3 | Error downloading Flow from S3: An error occurred (403) when calling the HeadObject operation: Forbidden
    An error occurred (403) when calling the HeadObject operation: Forbidden
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 80, in cloud_flow
        raise exc
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 69, in cloud_flow
        flow = storage.get_flow(storage.flows[flow_data.name])
      File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 92, in get_flow
        raise err
      File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 87, in get_flow
        self._boto3_client.download_fileobj(
      File "/usr/local/lib/python3.8/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
        return future.result()
      File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 106, in result
        return self._coordinator.result()
      File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 265, in result
        raise self._exception
      File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 255, in _main
        self._submit(transfer_future=transfer_future, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/s3transfer/download.py", line 340, in _submit
        response = client.head_object(
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 316, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 635, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden
    inspecting pod/prefect-job-ad7ab790-795nm, it doesn't have the AWS env variables ...

    josh

    2 years ago
    Have you tried using Prefect Secrets for this? There is a default AWS_CREDENTIALS secret that is set up to solve issues like this: https://docs.prefect.io/core/concepts/secrets.html#default-secrets
    Not sure why it wouldn't have the env vars 🤔

    Marwan Sarieddine

    2 years ago
    I haven't - will try the prefect secrets approach

    josh

    2 years ago
    Awesome! You can set those values in the secret and then in your storage do something like:
    flow.storage = S3(..., secrets=["AWS_CREDENTIALS"])
    and those credentials will automatically be used when pulling the flow
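    Roughly, end to end (a sketch - the bucket name is a placeholder, and the secret value format is the one described in the default-secrets docs above):
    from prefect import Client
    from prefect.environments.storage import S3

    # store the keys as the default AWS_CREDENTIALS secret in Prefect Cloud
    Client().set_secret(
        name="AWS_CREDENTIALS",
        value={"ACCESS_KEY": "...", "SECRET_ACCESS_KEY": "..."},
    )

    # tell the storage object to pull that secret when downloading the flow
    flow.storage = S3(bucket="my-flow-bucket", secrets=["AWS_CREDENTIALS"])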

    Marwan Sarieddine

    2 years ago
    ok, it seems I am able to authenticate to S3 now (using the Prefect Secrets approach) - thanks! But now I am facing another issue ...
    $ kubectl logs pod/prefect-job-e74114b4-26n6v
    [2020-08-21 21:55:08] INFO - prefect.S3 | Downloading simple-flow-8/2020-08-21t21-54-33-484226-00-00 from infima-etl-flows
    
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 80, in cloud_flow
        raise exc
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 69, in cloud_flow
        flow = storage.get_flow(storage.flows[flow_data.name])
      File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 101, in get_flow
        return cloudpickle.loads(output)
    TypeError: an integer is required (got type bytes)
    EDIT - sorry, I suppose I should have posted this issue thread in the prefect-community channel .. just noticed I am in the wrong channel
    (just reposted this last issue in the prefect-community channel)
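    A note for anyone who lands on this thread with the same error: a cloudpickle TypeError like "an integer is required (got type bytes)" usually points to a Python version mismatch between the environment that registered the flow and the image that runs it (e.g. registered with Python 3.8 but executed on an older version). A quick way to check is to compare versions in both environments:
    import sys
    import cloudpickle

    # run this both where the flow is registered and inside the execution
    # image - the Python (and ideally cloudpickle) versions should match
    print(sys.version)
    print(cloudpickle.__version__)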