Tom Klein
07/11/2022, 12:29 AM
with a DaskExecutor (that relies on a dask KubeCluster) - are task Results handled in the flow, or is Result handling delegated to the dask workers?
I'm asking because when we swap from a LocalExecutor or a LocalDaskExecutor to a DaskExecutor - suddenly our S3Results (which all our tasks are configured with) seem to fail with AccessDenied errors (for PutObject attempts).
So logically it seems like they are being run from somewhere else that doesn't have the proper permissions (whereas the k8s job running the flow itself does) --- am i missing something?
Tom Klein
07/11/2022, 12:30 AM
Unexpected error: ClientError('An error occurred (AccessDenied) when calling the PutObject operation: Access Denied')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 96, in write
    raise err
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 88, in write
    self.client.upload_fileobj(
  File "/usr/local/lib/python3.8/site-packages/boto3/s3/inject.py", line 636, in upload_fileobj
    return future.result()
  File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 266, in result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 139, in __call__
    return self._execute_main(kwargs)
  File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 162, in _execute_main
    return_value = self._main(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/s3transfer/upload.py", line 758, in _main
    client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 508, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 915, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
Tom Klein
07/11/2022, 12:34 AM
with the LocalExecutor - it all works perfectly.
in both cases we're using a KubernetesRun with a service account, an IAM role with all the necessary permissions, etc. so in both cases the flow runs the same way, and only the executor is different
Anna Geller
Anna Geller
Tom Klein
07/11/2022, 12:52 AM
the LocalExecutor manages to push the results to S3 correctly
Tom Klein
07/11/2022, 12:53 AM
Tom Klein
07/11/2022, 2:03 AM
• i took your example and added a Result config to its flow
• this by itself was not sufficient to reproduce the problem, since it worked fine
• so, i tried to make it slightly closer to my own flow, and add a single ShellTask that does nothing but echo potato
• However, when i tried to change the flow, i got:
KeyError: 'Task slug do_some_bash-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
• this is due to the fact that your example relies on a GitHub storage
• so, i changed it to an S3 storage so i could freely change the flow code
• However, as soon as i did that - i started getting bizarre errors - even after reverting my change and leaving the flow code unchanged. the errors were, for example, like in the attached screenshot
Tom Klein
07/11/2022, 2:06 AM
• use the following Dockerfile:
FROM prefecthq/prefect:latest-python3.8
RUN /usr/local/bin/python -m pip install --upgrade pip
RUN pip install prefect[github,aws,snowflake]
RUN pip install dask-kubernetes==2021.10.0 pandas
• use the following code:
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import task, Flow
from prefect.engine.results import S3Result
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GitHub
from prefect.tasks.shell import ShellTask

FLOW_NAME = "github_kubernetes_run_ephemeral_dask"

@task
def extract() -> list:
    return [1, 2, 3, 4, 5, 6]

@task
def transform(number: int) -> int:
    return number * 2

@task()
def load(numbers: list) -> list:
    return [i for i in numbers if i]

@task
def do_some_bash():
    return 5

with Flow(
    FLOW_NAME,
    executor=DaskExecutor(
        cluster_class=lambda: KubeCluster(make_pod_spec(image="annageller/prefect-dask-k8s:latest")),
        adapt_kwargs={"minimum": 2, "maximum": 3},
    ),
    run_config=KubernetesRun(
        image="<REFERENCE_TO_IMAGE_HERE>",
        image_pull_policy="Always",
    ),
) as flow:
    nrs = extract()
    transformed_numbers = transform.map(nrs)
    numbers_twice = transform.map(transformed_numbers)
    result = load(numbers=numbers_twice)
    do_some_bash(upstream_tasks=[result])
and register it like so:
from prefect.storage import S3
from flows import anna
anna.flow.storage = S3(bucket="hb-prefect", stored_as_script=True, local_script_path='flows/anna.py')
anna.flow.register(project_name='poc', build=True, labels=["k8s-prod"])
Tom Klein
07/11/2022, 2:08 AM
Tom Klein
07/11/2022, 2:33 AM
Tom Klein
07/11/2022, 3:04 AM
• changing make_pod_spec(image="annageller/prefect-dask-k8s:latest") to make_pod_spec(image=prefect.context.image) --- fixes the weird errors (assuming that there's no Result configured on the flow)
• This indicates to me that there is a fundamental difference between running a DaskExecutor with a GitHub storage vs an S3 storage --- one that isn't immediately obvious or intuitive (to me at least)
• And, despite the above working - adding an S3 Result reproduces the AccessDenied issue that was mentioned above
so if you wanna reproduce the S3 AccessDenied issue specifically, you can use this flow instantiation:
with Flow(
    FLOW_NAME,
    result=S3Result(
        bucket='hb-prefect',
        location='runs/{date:%Y}/{date:%m}/{date:%d}/{flow_name}/{flow_run_name}/{task_name}_{task_run_id}',
    ),
    executor=DaskExecutor(
        cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
        adapt_kwargs={"minimum": 2, "maximum": 3},
    ),
    run_config=KubernetesRun(
        image="<INSERT IMG HERE>",
        image_pull_policy="Always",
    ),
) as flow:
Even using annageller/prefect-dask-k8s:latest as the KubernetesRun image still reproduces this error; only changing to a LocalExecutor eliminates the S3 AccessDenied error.
This example is purely python - there's no Node in it in any shape or form (and not even Shell tasks), and the Docker image is virtually identical to the one in your repo, except for the Prefect version (latest rather than 0.15.16)
Tom Klein
07/11/2022, 10:24 AM
(with Github storage) - but adding:
extra_pod_config={"serviceAccount": "<name of service account>"}
to the KubeCluster solved the AccessDenied errors.
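for reference, roughly what that looks like on my side now (just a sketch - the service account name is a placeholder, and this assumes the classic KubeCluster/make_pod_spec API from dask-kubernetes 2021.10.0):
import prefect
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

# give the ephemeral dask worker pods an explicit service account, since the
# S3Result writes happen on the workers and need the same IAM role as the flow pod
executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(
            image=prefect.context.image,
            extra_pod_config={"serviceAccount": "<name of service account>"},
        )
    ),
    adapt_kwargs={"minimum": 2, "maximum": 3},
)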
This also shows that it does matter which Executor is being used, since the Result checkpointing is (evidently, as these experiments show) handled on the dask worker rather than on the Prefect job.
i assumed that the dask workers are created with the same service account as the flow itself…
in any case, is there a way to extract the name of the current service account of the pod the Prefect flow is running on? maybe something like prefect.context.kubernetes.pod.service_name (i made this up, but this is what i'd love to have available)
Anna Geller
"but why would the S3 permissions matter if the only thing i changed between a successful and a failed run - is the executor?" - because permissions must be available on the execution layer, and you are changing your execution layer
"i'm testing various features and use-cases, even if they don't directly serve any immediate business goal in a direct way" - in that case, add this line to your Dockerfile, this should solve your problem if you prefer that over IAM roles:
COPY ~/.aws /root/.aws
or even:
ENV AWS_ACCESS_KEY_ID=xxx
ENV AWS_SECRET_ACCESS_KEY=xxx
ENV AWS_DEFAULT_REGION=us-east-1
not recommended for production, but just to test things out and validate if Prefect works for you, this should help you
Tom Klein
07/11/2022, 10:50 AM
"Results are handled by the Result abstractions and work the same way regardless of the executor" - this is only correct in a general sense, but in a practical sense it means they (the results) are handled on a completely different pod in k8s - depending on the executor
we do prefer service roles, but our DevOps at least found no way to make the Dask workers "automatically" get the desired service account attached without me having to explicitly provide the name of the service account in the flow itself (to the pod spec given to the dask KubeCluster).
In other words -
• Automatic service account binding for the Prefect Kubernetes Agent - ✅
• Automatic service account binding for Prefect flows being run via a KubernetesRun - ✅
• Automatic service account binding for dask workers of ephemerally created clusters - ❌
Don't ask me how our DevOps made the first two work because i have no clue 🤷‍♂️
The last case only worked when i added it to my code - is there a way to at least derive the flow's pod's service account at runtime (so it can be given to the dask KubeCluster)?
Looking at the Prefect source code, it seemed that this data is sent to the Prefect logs but not set on the prefect.context
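the only workaround i can think of is asking the Kubernetes API for the flow pod's own spec - a rough sketch (assuming the kubernetes Python client is installed in the flow image, the pod name equals HOSTNAME as it does by default, and the flow's service account is allowed to get its own pod):
import os
from kubernetes import client, config

def current_service_account() -> str:
    # the flow pod runs in-cluster, so use the mounted service account credentials
    config.load_incluster_config()
    # by default the pod name is the hostname; the namespace is mounted on disk
    pod_name = os.environ["HOSTNAME"]
    with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
        namespace = f.read().strip()
    pod = client.CoreV1Api().read_namespaced_pod(pod_name, namespace)
    return pod.spec.service_account_name
that value could then be fed into extra_pod_config, but it would be much nicer to have it on prefect.context directly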
Part of the testing is indeed testing it the way it should work in production (including service roles), that's part of checking the fit. again, we don't want to bypass this just to be able to make other things work, we want to make everything work and see where we run into difficulties (and where not)
This isn't a POC, we're already trying to use Prefect for production use-cases.
Anna Geller
"is there a way to at least derive the flow's pod's service account at runtime (so it can be given to the dask KubeCluster)?" - the service account name is static and doesn't change at runtime. I thought you were considering switching to Prefect 2.0 given you were interested in the streaming use case - if so, I'm not sure whether it makes sense to spend further time on this; you've invested a lot of time on this already and it seems to be a hard problem to solve with your infra and with 1.0. In 2.0 this will be easier thanks to Blocks and a cleaner separation between flow runners, task runners, storage & results, and packaging of the code.
Anna Geller
Tom Klein
07/11/2022, 1:05 PM
it has to be given to the KubeCluster (otherwise it simply doesn't have the required service account) - we do need it at runtime, and i'd rather not hardcode it or rely on an entire yaml spec for the pod, but instead derive it from the service account with which the prefect flow is running (this is exactly the same logic that leads prefect Flows to derive their service account from the Agent that ran them --- at least based on what i saw in the Prefect source code: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/agent.py#L510)
the Prefect config for example also doesn't change at run-time but it's still available via prefect.config i believe (or was it prefect.context.config)
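(for illustration, a trivial sketch of the kind of runtime value i mean - image is already populated on the context by the KubernetesRun, so something similar for the service account would be ideal):
import prefect
from prefect import task

@task
def log_runtime_info():
    logger = prefect.context.get("logger")
    # image is set on the context at runtime; a service-account field is not
    logger.info("running with image %s", prefect.context.get("image"))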
Tom Klein
07/11/2022, 1:05 PM
Anna Geller
Anna Geller
Tom Klein
07/12/2022, 1:19 PM
not sure whether the dask-kubernetes dependency it has now works even with newer(st) versions of it - or not
07/12/2022, 1:20 PM
having dask-kubernetes as part of the dependencies of prefect-dask (or maybe a separate package like prefect-dask-kubernetes) would at least guarantee people won't have to fumble with the version jigsaw puzzle that i had to face,
or - better yet - integrate it into your tests, etc. so that you always know you are keeping up-to-date with the releases of dask-kubernetes - and know when something breaks (as it evidently has)
Anna Geller
Tom Klein
07/12/2022, 1:38 PM
by that logic, users would also have to add dask themselves (or snowflake, or mongo, or anything else) -
yet prefect-dask for example depends on Dask's distributed package; it doesn't leave it as the user's responsibility to add distributed themselves separately.
what makes dask-kubernetes different? why shouldn't prefect also have a package that directly relies on it?
we're not talking about some exotic ML package that i'm "bringing from home" - it's a fundamental component of how Prefect is designed to work (i.e. integrate with Dask clusters)
Anna Geller
Tom Klein
07/12/2022, 1:42 PM
dask-kubernetes later versions simply don't work with Prefect (i don't know due to whose change etc., or what change) - how is that a python problem?
Anna Geller