https://prefect.io logo
Title
m

Marwan Sarieddine

08/21/2020, 10:38 PM
Hi folks, moving my issue post to the prefect-community channel. I am trying the S3 flow storage option - using a kubernetes agent and a DaskKubernetesEnvironment execution environment. I was facing aws s3 authentication issues but thanks to Josh - I got them resolved. I am now facing a new issue which is being raised by the prefect-job pod before the flow ever gets to a running state ... Here is the error I am getting:
[5:57 PM] $ kubectl logs pod/prefect-job-e74114b4-26n6v 
[2020-08-21 21:55:08] INFO - prefect.S3 | Downloading simple-flow-8/2020-08-21t21-54-33-484226-00-00 from infima-etl-flows
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 80, in cloud_flow
    raise exc
  File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 69, in cloud_flow
    flow = storage.get_flow(storage.flows[flow_data.name])
  File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 101, in get_flow
    return cloudpickle.loads(output)
TypeError: an integer is required (got type bytes)
I am wondering if anyone else faced something similar
a

Alex Papanicolaou

08/22/2020, 1:07 AM
@Marwan Sarieddine and I figured out what the issue is with this. Our flow was developed with python 3.7 and as you can see in the traceback, prefect is is running with python3.8. We detected this as a result of the K8s job using the image
prefecthq/prefect:all_extras-0.13.3
. We tried to figure out why this image was being used and we found that it comes from
DaskKubernetesEnvironment.execute
. Here is what happens: 1. Inside
.execute
, Prefect needs to build a spec for the job and it uses the scheduler spec as a base. The function
prefect.utilities.storage.get_flow_image
pulls an image name from
flow.environment.metadata
with the intention of overriding the image in the scheduler spec if an image in the metadata is detected. This is where
prefecthq/prefect:all_extras-0.13.3
comes from but we had no idea how it was set since we don’t do anything of the sort. 2. We detected that
flow.environment.metadata
was updated to the prefect all_extras image after calling
flow.register
and traced it to here: https://github.com/PrefectHQ/prefect/blob/389eabd68238edda57c23f58c5907bc0edd6c443/src/prefect/client/client.py#L736-746 The upshot is this: because we are using S3 storage instead of Docker storage, Prefect assumes there’s isn’t a worthy image in the scheduler spec and it must be overridden. And since we are using 3.7, the conflict with the all_extras image causes that cloudpickle error. To solve this, we have to explicitly pass
{"image": image_name}"
to the
metadata
argument in
DaskKubernetesEnvironment
https://github.com/PrefectHQ/prefect/blob/0.13.3/src/prefect/environments/execution/dask/k8s.py#L72 One question is, should the default behavior change? I think I get why the scheduler’s image is overridden: there’s very little guarantee on the image for the scheduler that it would even have the dependencies for Prefect or the flow code. Maybe it would be better to fall back on the worker’s image? Or perhaps a more explicit argument could be added to
DaskKubernetesEnvironment
about what image the K8s job should use? As it currently is written, I don’t know how one could add documentation to such a niche thing that is a bit convoluted.
Hey @josh, I think the issue with the environment variables is related to this issue with the K8s job image. https://prefect-community.slack.com/archives/C0106HZ1CMS/p1598046158030600?thread_ts=1598041768.027200&amp;cid=C0106HZ1CMS In
DaskKubernetesEnvironment._populate_scheduler_spec_yaml
, the environment variables in the scheduler spec are completely wiped out here: https://github.com/PrefectHQ/prefect/blob/389eabd68238edda57c23f58c5907bc0edd6c443/src/prefect/environments/execution/dask/k8s.py#L501-L503
Oof. I read
if not env:
wrong. We’ll keep exploring this to figure this out.