Marwan Sarieddine
08/21/2020, 10:38 PM
$ kubectl logs pod/prefect-job-e74114b4-26n6v
[2020-08-21 21:55:08] INFO - prefect.S3 | Downloading simple-flow-8/2020-08-21t21-54-33-484226-00-00 from infima-etl-flows
Traceback (most recent call last):
File "/usr/local/bin/prefect", line 8, in <module>
sys.exit(cli())
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 80, in cloud_flow
raise exc
File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 69, in cloud_flow
flow = storage.get_flow(storage.flows[flow_data.name])
File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 101, in get_flow
return cloudpickle.loads(output)
TypeError: an integer is required (got type bytes)
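This TypeError is the usual symptom of unpickling a cloudpickle payload under a different Python minor version than the one that created it (the code-object constructor changed in Python 3.8). A stdlib-only sketch of a sanity check, to run both where the flow is registered and inside the job image, and compare the output:

```python
import sys

def interpreter_tag() -> str:
    # cloudpickle serializes code objects by value, and their binary layout
    # differs between minor versions, so the interpreter that registers the
    # flow and the one inside the job image should report the same tag.
    return f"{sys.version_info.major}.{sys.version_info.minor}"

print(interpreter_tag())
```

If the two tags differ (e.g. 3.7 locally vs 3.8 in the image), cloudpickle.loads can fail exactly as in the traceback above.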
I am wondering if anyone else faced something similar.

Alex Papanicolaou
08/22/2020, 1:07 AM
The job was running with the image prefecthq/prefect:all_extras-0.13.3. We tried to figure out why this image was being used and we found that it comes from DaskKubernetesEnvironment.execute. Here is what happens:
1. Inside .execute, Prefect needs to build a spec for the job and it uses the scheduler spec as a base. The function prefect.utilities.storage.get_flow_image pulls an image name from flow.environment.metadata, with the intention of overriding the image in the scheduler spec if an image is detected in the metadata. This is where prefecthq/prefect:all_extras-0.13.3 comes from, but we had no idea how it was set since we don’t do anything of the sort.
2. We detected that flow.environment.metadata was updated to the Prefect all_extras image after calling flow.register and traced it to here: https://github.com/PrefectHQ/prefect/blob/389eabd68238edda57c23f58c5907bc0edd6c443/src/prefect/client/client.py#L736-746
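The override described in step 1 can be sketched like this (a simplified stand-in for illustration, not Prefect’s actual source):

```python
def pick_job_image(environment_metadata: dict, scheduler_spec_image: str) -> str:
    # Simplified stand-in for prefect.utilities.storage.get_flow_image:
    # an image found in the environment metadata wins over the spec's image.
    return environment_metadata.get("image") or scheduler_spec_image

# With S3 storage and no explicit metadata, registration fills in the
# default all_extras image, which then silently overrides the spec's image:
assert pick_job_image(
    {"image": "prefecthq/prefect:all_extras-0.13.3"},
    "my-registry/my-flow:py37",
) == "prefecthq/prefect:all_extras-0.13.3"
```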
The upshot is this: because we are using S3 storage instead of Docker storage, Prefect assumes there isn’t a worthy image in the scheduler spec and that it must be overridden. And since we register the flow under Python 3.7 while the all_extras image runs Python 3.8 (note the python3.8 paths in the traceback above), the version mismatch causes that cloudpickle error.
To solve this, we have to explicitly pass {"image": image_name} to the metadata argument in DaskKubernetesEnvironment:
https://github.com/PrefectHQ/prefect/blob/0.13.3/src/prefect/environments/execution/dask/k8s.py#L72
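A sketch of that workaround (the image name is a placeholder, and the DaskKubernetesEnvironment call assumes Prefect 0.13.x, so it is commented out to keep the snippet self-contained):

```python
# Placeholder: the image that actually contains your flow's dependencies
# and matches the Python version you register with.
image_name = "my-registry/my-flow:py37"
metadata = {"image": image_name}

# With prefect 0.13.x installed:
# from prefect.environments import DaskKubernetesEnvironment
# environment = DaskKubernetesEnvironment(metadata=metadata)
print(metadata)
```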
One question is: should the default behavior change? I think I get why the scheduler’s image is overridden: there’s very little guarantee that the scheduler spec’s image would even have the dependencies for Prefect or the flow code. Maybe it would be better to fall back on the worker’s image? Or perhaps a more explicit argument could be added to DaskKubernetesEnvironment about what image the K8s job should use?
As it currently is written, I don’t know how one could add documentation to such a niche, somewhat convoluted thing.

Separately, in DaskKubernetesEnvironment._populate_scheduler_spec_yaml, the environment variables in the scheduler spec are completely wiped out here: https://github.com/PrefectHQ/prefect/blob/389eabd68238edda57c23f58c5907bc0edd6c443/src/prefect/environments/execution/dask/k8s.py#L501-L503
The "if not env:" check there looks wrong. We’ll keep exploring this to figure this out.
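The replace-versus-merge distinction at issue can be illustrated with a small sketch (hypothetical data shapes, not Prefect’s code): replacing the spec’s env list wholesale loses user-supplied variables, whereas a merge preserves them.

```python
def merge_env(user_env: list, prefect_env: list) -> list:
    # Merge instead of replace: Prefect's variables win on name clashes,
    # but user-supplied variables from the scheduler spec survive.
    merged = {e["name"]: e["value"] for e in user_env}
    merged.update({e["name"]: e["value"] for e in prefect_env})
    return [{"name": k, "value": v} for k, v in merged.items()]

user_env = [{"name": "MY_SECRET_REF", "value": "keep-me"}]
prefect_env = [{"name": "PREFECT__CLOUD__API", "value": "https://api.prefect.io"}]

result = merge_env(user_env, prefect_env)
assert {"name": "MY_SECRET_REF", "value": "keep-me"} in result
```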