
Tom Klein

07/11/2022, 12:29 AM
Aaand -- me again 😳 with a related but completely different issue 🤷‍♂️
Question: when using a DaskExecutor (that relies on a dask KubeCluster) - are task Results handled in the flow, or is the Result handling delegated to the dask workers? I'm asking because when we swap from a LocalExecutor or a LocalDaskExecutor to a DaskExecutor - suddenly our S3Results (which all our tasks are configured with) seem to fail on AccessDenied errors (for PutObject attempts).
So logically it seems like they are being run from somewhere else that doesn't have the proper permissions (whereas the k8s job running the flow itself does) --- am i missing something?
For reference, this is the error we see:
Unexpected error: ClientError('An error occurred (AccessDenied) when calling the PutObject operation: Access Denied')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 96, in write
    raise err
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 88, in write
    self.client.upload_fileobj(
  File "/usr/local/lib/python3.8/site-packages/boto3/s3/inject.py", line 636, in upload_fileobj
    return future.result()
  File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 266, in result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 139, in __call__
    return self._execute_main(kwargs)
  File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 162, in _execute_main
    return_value = self._main(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/s3transfer/upload.py", line 758, in _main
    client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 508, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 915, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
The moment i change the executor back to LocalExecutor - it all works perfectly. In both cases we're using a KubernetesRun with a service account, an IAM role with all the necessary permissions, etc., so in both cases the flow runs the same way, and only the executor is different

Anna Geller

07/11/2022, 12:46 AM
Could you:
1. share your flow code
2. explain why you are making that switch? Dask is usually helpful to parallelize Python, but you have Node applications triggered from a Kubernetes task
But let me answer the question directly:
• Results are handled by the Result abstractions and work the same way regardless of the executor
• The issue you see occurs due to lacking S3 permissions, which you can solve using IAM Roles for Service Accounts

Tom Klein

07/11/2022, 12:52 AM
but why would the S3 permissions matter if the only thing i changed between a successful and a failed run - is the executor? obviously they have to be sufficient, if using a LocalExecutor manages to push the results to S3 correctly
anyway, i'll provide an example
@Anna Geller
• in order to provide you with an example, i took the original git repo you provided a few hours ago - and tried to add a default Result config to its flow
• this by itself was not sufficient to reproduce the problem, since it worked fine
• so, i tried to make it slightly closer to my own flow, and add a single ShellTask that does nothing but echo potato
• However, when i tried to change the flow, i got:
KeyError: 'Task slug do_some_bash-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
• this is due to the fact that your example relies on a GitHub storage
• so, i changed it to an S3 storage so i could freely change the flow code
• However, as soon as i did that - i started getting bizarre errors - even if I reverted my change and left the flow code unchanged. the errors were for example like in the attached screenshot
in order to reproduce: • use an image with the following dockerfile:
FROM prefecthq/prefect:latest-python3.8
RUN /usr/local/bin/python -m pip install --upgrade pip
RUN pip install prefect[github,aws,snowflake]
RUN pip install dask-kubernetes==2021.10.0 pandas
• use the following code:
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import task, Flow
from prefect.engine.results import S3Result
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GitHub
from prefect.tasks.shell import ShellTask

FLOW_NAME = "github_kubernetes_run_ephemeral_dask"



@task
def extract() -> list:
    return [1, 2, 3, 4, 5, 6]


@task
def transform(number: int) -> int:
    return number * 2


@task()
def load(numbers: list) -> list:
    return [i for i in numbers if i]

@task
def do_some_bash():
    return 5


with Flow(
        FLOW_NAME,
        executor=DaskExecutor(
            cluster_class=lambda: KubeCluster(make_pod_spec(image="annageller/prefect-dask-k8s:latest")),
            adapt_kwargs={"minimum": 2, "maximum": 3},
        ),
        run_config=KubernetesRun(
            image="<REFERENCE_TO_IMAGE_HERE>",
            image_pull_policy="Always"
        ),
) as flow:
    nrs = extract()
    transformed_numbers = transform.map(nrs)
    numbers_twice = transform.map(transformed_numbers)
    result = load(numbers=numbers_twice)
    do_some_bash(upstream_tasks=[result])
and register it like so:
from prefect.storage import S3
from flows import anna


anna.flow.storage = S3(bucket="hb-prefect", stored_as_script=True, local_script_path='flows/anna.py')
anna.flow.register(project_name='poc', build=True, labels=["k8s-prod"])
it certainly might be related to the other git issue i already opened, though i’m not sure why the Storage of all things would make such a dramatic difference then - between “working” and “not-working” (for the same exact code, with the same image)
and to answer your previous question labeled (2):
• Regardless of this specific use-case (Node, etc.), we’re also trying out prefect and trying to understand if choosing it over Airflow (for example) was a correct decision. Among other things, i’m testing various features and use-cases, even if they don’t directly serve any immediate business goal in a direct way.
• I suggest we separate the two topics (“is this the right approach for the use-case?” and “is there a technical issue here or a misconfiguration?”) from each other - because talking about them at the same time just makes the discussion more complex.
• it might be true that doing things in a completely different way would “bypass” the problem entirely from the get-go, but solving the business problem at hand ISN’T the only thing i’m interested in. I’m ALSO interested in testing out the platform to see what works and what doesn’t, including possibly executing shell tasks, or using custom images - on a dask cluster (for that matter). If that’s officially not supported, then fine - but i didn’t get the impression it isn’t supported. The docs & other marketing materials made it seem like the task Executor is agnostic to what tasks it runs (and certain custom images were shown as a feature)
• In other words, when i run into technical difficulties (that are at least in principle likely for me to run into again in one form or another on other opportunities) - it is crucial to me to understand if it’s due to a bug or a limitation of the platform, or due to my own mistakes or mis-use. Working around problems isn’t the high priority here.
and a few remarks about the above code:
• Changing make_pod_spec(image="annageller/prefect-dask-k8s:latest") to make_pod_spec(image=prefect.context.image) --- fixes the weird errors (assuming that there’s no Result configured on the flow)
• This indicates to me that there is a fundamental difference between running a DaskExecutor with a Github storage vs an S3 storage --- one that isn’t immediately obvious or intuitive (to me at least)
• And, despite the above working - adding an S3 Result reproduces the AccessDenied issue that was mentioned above
so if you wanna reproduce the S3 AccessDenied issue specifically, you can use this flow instantiation:
# note: prefect.context.image needs `import prefect` in addition to the imports shown earlier
with Flow(
        FLOW_NAME,
        result=S3Result(bucket='hb-prefect',
                        location='runs/{date:%Y}/{date:%m}/{date:%d}/{flow_name}/{flow_run_name}/{task_name}_{task_run_id}'),
        executor=DaskExecutor(
            cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
            adapt_kwargs={"minimum": 2, "maximum": 3},
        ),
        run_config=KubernetesRun(
            image="<INSERT IMG HERE>",
            image_pull_policy="Always",
        ),
) as flow:
    ...  # task wiring unchanged from the flow above
Even using annageller/prefect-dask-k8s:latest as the KubernetesRun image still reproduces this error. Only changing to LocalExecutor eliminates the S3 AccessDenied error.
This example is purely python - there’s no Node in it in any shape or form (and not even Shell tasks), and the Docker image is virtually identical to the one in your repo, except for the Prefect version (latest rather than 0.15.16)
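A minimal sketch of how the location template in the flow instantiation above expands. It assumes (from a reading of Prefect 1.x, not confirmed in this thread) that the template is filled with str.format from the run context at the moment the task's value is written, which is why the write happens wherever the task runs. The run name and task run id below are made-up values.

```python
from datetime import datetime

# Template copied from the S3Result in the flow above.
LOCATION = "runs/{date:%Y}/{date:%m}/{date:%d}/{flow_name}/{flow_run_name}/{task_name}_{task_run_id}"


def render_location(template: str, **context) -> str:
    """Fill a Result location template the way str.format would."""
    return template.format(**context)


# Hypothetical context values, for illustration only.
example = render_location(
    LOCATION,
    date=datetime(2022, 7, 11),
    flow_name="github_kubernetes_run_ephemeral_dask",
    flow_run_name="demo-run",
    task_name="load",
    task_run_id="abc123",
)
print(example)
```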
Update: i have no idea how it relates to the storage (and why this was not needed when the storage was Github) - but adding:
extra_pod_config={"serviceAccount": "<name of service account>"}
to the KubeCluster solved the AccessDenied error.
This also shows that it does matter which Executor is being used, since the Result checkpointing is (evidently, as these experiments show) handled at the dask worker rather than on the Prefect job. i assumed that the dask workers are created with the same service account as the flow itself… in any case, is there a way to extract the name of the current service account of the pod the Prefect flow is running on? maybe something like prefect.context.kubernetes.pod.service_name (i made this up but this is what i’d love if it was available)
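One possible way to get the service account name without any Prefect support, sketched with the standard library only: a pod's mounted service account token is a JWT whose "sub" claim has the form system:serviceaccount:<namespace>:<name>. This assumes the token is mounted at the default path (i.e. automounting has not been disabled); the helper names are made up for illustration and are not part of Prefect or dask-kubernetes.

```python
import base64
import json
from pathlib import Path

# Default mount point for a pod's service account credentials.
TOKEN_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount/token")


def service_account_from_token(token: str) -> str:
    """Return the service account name encoded in a Kubernetes JWT."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url without padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    # "sub" looks like "system:serviceaccount:<namespace>:<name>".
    prefix, kind, _namespace, name = claims["sub"].split(":", 3)
    if (prefix, kind) != ("system", "serviceaccount"):
        raise ValueError(f"not a service account token: {claims['sub']!r}")
    return name


def current_service_account(path: Path = TOKEN_PATH) -> str:
    """Read the token mounted into the current pod and extract the name."""
    return service_account_from_token(path.read_text().strip())
```

Since the cluster_class lambda runs inside the flow-run pod, the result of current_service_account() could be passed there as extra_pod_config={"serviceAccount": ...}. The token is only decoded, not verified, which is fine for reading a name but not for any trust decision.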

Anna Geller

07/11/2022, 10:41 AM
but why would the S3 permissions matter if the only thing i changed between a successful and a failed run - is the executor?
because permissions must be available on the execution layer and you are changing your execution layer
i’m testing various features and use-cases, even if they don’t directly serve any immediate business goal in a direct way
in that case, add this line to your Dockerfile, this should solve your problem if you prefer that over IAM roles:
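# note: COPY does not expand "~" and only reads from the build context, so copy ~/.aws into the context first
COPY .aws /root/.aws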
or even:
ENV AWS_ACCESS_KEY_ID=xxx
ENV AWS_SECRET_ACCESS_KEY=xxx
ENV AWS_DEFAULT_REGION=us-east-1
not recommended for production, but just to test things out and validate if Prefect works for you, this should help you
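To check which credentials each execution layer actually sees, a small diagnostic like the following could be run both in the flow-run pod and inside a task on a Dask worker. It is a rough sketch that only inspects environment variables: static keys as in the ENV lines above, or the AWS_ROLE_ARN / AWS_WEB_IDENTITY_TOKEN_FILE pair that EKS injects for pods using IAM Roles for Service Accounts. The function name is made up, and it does not cover credentials coming from files or the node's instance role.

```python
import os
from typing import Mapping


def aws_credential_source(env: Mapping[str, str] = os.environ) -> str:
    """Guess which credential mechanism the AWS SDK would find in env vars."""
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "static keys from environment variables"
    if env.get("AWS_ROLE_ARN") and env.get("AWS_WEB_IDENTITY_TOKEN_FILE"):
        # Injected for pods whose service account is annotated with an IAM role.
        return "web identity (IAM role for service account)"
    return "none visible in environment (falls back to files or node role)"


if __name__ == "__main__":
    print(aws_credential_source())
```

If the flow-run pod reports a web identity and the worker reports none, the worker pod is most likely running under a different service account.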

Tom Klein

07/11/2022, 10:50 AM
ok but that was my first question 🙂 if changing the execution layer matters then it definitely does matter which executor we use - in other words:
Results are handled by the Result abstractions and work the same way regardless of the executor
this is only correct in a general sense, but in a practical sense it means they (the results) are handled on a completely different pod in k8s - depending on the executor.
we do prefer service roles, but our DevOps at least found no way to make the Dask workers “automatically” get the desired service account attached without me having to explicitly provide the name of the service account in the flow itself (to the pod spec given to the dask KubeCluster). In other words:
• Automatic service account binding for the Prefect Kubernetes Agent - works
• Automatic service account binding for Prefect flows being run via a KubernetesRun - works
• Automatic service account binding for dask workers of ephemerally created clusters - does not work
Don’t ask me how our DevOps made the first two work because i have no clue 🤷‍♂️ The last case only worked when i added it to my code - is there a way to at least derive the flow’s pod’s service account at runtime (so it can be given to the dask KubeCluster)? Looking at the Prefect source code, it seemed that this data is sent to the Prefect logs but not being set on the prefect.context
Part of the testing is indeed testing it the way it should work in production (including service roles), that’s part of checking the fit 🙂 again, we don’t want to bypass this just to be able to make other things work, we want to make everything work and see where we run into difficulties (and where not). This isn’t a POC, we’re already trying to use Prefect for production use-cases.
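Another option that avoids hardcoding the name, sketched under assumptions: Kubernetes can expose spec.serviceAccountName to a container as an environment variable through the downward API (a fieldRef in the pod spec). If the flow-run job template were extended that way, the flow could read the variable and forward it to the workers. The variable name FLOW_SERVICE_ACCOUNT and the helper below are hypothetical; the "serviceAccount" key mirrors what was used earlier in this thread (serviceAccountName is the non-deprecated field name in the pod spec).

```python
import os
from typing import Dict, Mapping

# Hypothetical variable, assumed to be populated in the flow-run pod from
# the downward API field "spec.serviceAccountName".
SERVICE_ACCOUNT_ENV = "FLOW_SERVICE_ACCOUNT"


def worker_pod_config(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Build extra pod config so workers reuse the flow pod's service account."""
    name = env.get(SERVICE_ACCOUNT_ENV, "").strip()
    # With no name available, return nothing and let the cluster default apply.
    return {"serviceAccount": name} if name else {}


# Intended use inside the executor definition (not executed here):
#   cluster_class=lambda: KubeCluster(
#       make_pod_spec(image=prefect.context.image,
#                     extra_pod_config=worker_pod_config())
#   )
```

Calling the helper inside the lambda matters: the lambda is evaluated in the flow-run pod, so the variable is read there rather than at registration time.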

Anna Geller

07/11/2022, 12:55 PM
any execution layer (your local machine, Kubernetes, Dask on Kubernetes, ...) must have the credentials required to use external systems e.g. credentials to store Results in S3 or pull Docker image
is there a way to at least derive the flow’s pod’s service account at runtime (so it can be given to the dask KubeCluster)?
the service account name is static and doesn't change at runtime.
I thought you were considering switching to Prefect 2.0 given you were interested in the streaming use case - if so, I'm not sure whether it makes sense to further spend time on this, you invested a lot of time on this already and it seems to be a hard problem to solve with your infra and with 1.0. In 2.0 this will be easier thanks to Blocks and cleaner separation between flow runners, task runners, storage & results and packaging of the code
plus, I think you may parallelize your work (in your use case) with ConcurrentTaskRunner which will work out of the box without dealing with Dask and Kubernetes complexity

Tom Klein

07/11/2022, 1:05 PM
@Anna Geller Re: the service account - it’s true that it doesn’t change at runtime, but because we are passing it as an argument for the KubeCluster (otherwise it simply doesn’t have the required service account) - we do need it at runtime, and i’d rather not hardcode it or rely on an entire yaml spec for the pod, but instead derive it from the service account with which the prefect flow is running (this is exactly the same logic that leads prefect Flows to derive their service account from the Agent that ran them --- at least based on what i saw in the Prefect source code: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/agent.py#L510)
the Prefect config for example also doesn’t change at run-time but it’s still available via prefect.config i believe (or was it prefect.context.config)
Re: 2.0 -- that’s a fair point. the main reason i’m trying to make sure this works well (including parallelization) is that we have a very big task coming up of processing hundreds of thousands of rows of data (each one of them leading to a distinct enrichment “sub-task” that includes a lot of net I/O and processing logic), so we don’t practically have the luxury of waiting for 2.0 for this (despite it sounding great and like a major leap forward) - the time horizon for this big run is within a week or two.
anyway, the entire flow works now (including running the NodeJS script in the dask workers successfully and with checkpointing to S3) - except for a weird edge-case that i’m still investigating, where there seems to be a mixup in the input/output of one of multiple tasks (happened in 1 out of 6 dask tasks, probably not related to prefect) 🙂 this is however a completely separate issue, so lmk if you want me to open a new thread for it

Anna Geller

07/11/2022, 7:47 PM
I'm glad you figured it out, thanks for the update
💯 1
@Tom Klein here is a user-contributed example for 2.0 https://github.com/tekumara/orion-demo/blob/main/flows/dask_kubes_flow.py

Tom Klein

07/12/2022, 1:19 PM
@Anna Geller thanks - the main Q i think (which i didn’t raise in the github issue to not go off-topic there) is whether this dask-kubernetes dependency it has now works even with the newer (or newest) versions of it - or not
if it still only works up to 2022.5 then i think it’s still a problem, because newer versions might contain security fixes or other important updates, and if your platform directly integrates with it, i don’t think it’s a good idea to “go out of sync” like that… why actually not have dask-kubernetes as part of the dependencies of prefect-dask (or maybe a separate package like prefect-dask-kubernetes)? that would at least guarantee people won’t have to fumble with this version jigsaw puzzle that i had to face. or - better yet - integrate it into your tests, etc., so that you always know you are keeping up-to-date with the releases of dask-kubernetes - and know when something breaks (as it evidently has)
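Until such a dependency pin exists, a flow image could guard against the version puzzle itself. The sketch below compares the installed dask-kubernetes release against an upper bound and is only an illustration: the 2022.5 bound is taken from the message above and has not been verified, and the helper names are made up.

```python
import re
from importlib import metadata
from typing import Tuple

# Assumption: last release line reported in this thread as working.
MAX_SUPPORTED = (2022, 5)


def calver(version: str) -> Tuple[int, ...]:
    """Extract (year, month) from a calendar version such as '2022.5.2'."""
    return tuple(int(part) for part in re.findall(r"\d+", version)[:2])


def within_tested_range(version: str, max_supported: Tuple[int, int] = MAX_SUPPORTED) -> bool:
    """True if the version is not newer than the last known-good release line."""
    return calver(version) <= max_supported


def check_installed(package: str = "dask-kubernetes") -> bool:
    """Look up the installed version; raises PackageNotFoundError if absent."""
    return within_tested_range(metadata.version(package))
```

Calling check_installed() at image build time or at flow start would turn a confusing runtime failure into an explicit version error.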

Anna Geller

07/12/2022, 1:33 PM
generally speaking, dependency management is something you would need to take care of based on your infra/Python version etc - I only wanted to share the example because we've just received it and you asked about it before

Tom Klein

07/12/2022, 1:38 PM
i understand, im just using this opportunity to comment on the subject 🙂 you could say the same thing about dask itself, that users should manage the dependency on dask themselves (or snowflake, or mongo, or anything else), yet prefect-dask for example depends on Dask’s distributed package; it doesn’t leave it as the user’s responsibility to add distributed themselves separately.
what makes dask-kubernetes different? why shouldn’t prefect also have a package that directly relies on it? we’re not talking about some exotic ML package that i’m “bringing from home” - it’s a fundamental component of how Prefect is designed to work (i.e. integrate with Dask clusters)

Anna Geller

07/12/2022, 1:41 PM
I understand the pain here but it's more of a Python problem than a Prefect problem

Tom Klein

07/12/2022, 1:42 PM
why? later versions of dask-kubernetes simply don’t work with Prefect (i don’t know due to whose change etc. or what change), how is that a python problem?

Anna Geller

07/12/2022, 1:43 PM
sorry, I have to run, I said everything I know about the subject. Perhaps you can ask the Dask community, given it's an issue with dask-kubernetes? https://dask.discourse.group/
👍 1