    Tom Klein
    2 months ago
    Aaand -- me again 😳 with a related but completely different issue 🤷‍♂️ Question: when using a DaskExecutor (that relies on a dask KubeCluster) - are task Results handled in the flow, or is the Result handling delegated to the dask workers?
    i'm asking because when we swap from a LocalExecutor or a LocalDaskExecutor to a DaskExecutor, suddenly our S3Results (which all our tasks are configured with) seem to fail with AccessDenied errors (for PutObject attempts). So logically it seems like they are being run from somewhere else that doesn't have the proper permissions (whereas the k8s job running the flow itself does) --- am i missing something?
    For reference, this is the error we see:
    Unexpected error: ClientError('An error occurred (AccessDenied) when calling the PutObject operation: Access Denied')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 96, in write
        raise err
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 88, in write
        self.client.upload_fileobj(
      File "/usr/local/lib/python3.8/site-packages/boto3/s3/inject.py", line 636, in upload_fileobj
        return future.result()
      File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 103, in result
        return self._coordinator.result()
      File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 266, in result
        raise self._exception
      File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 139, in __call__
        return self._execute_main(kwargs)
      File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 162, in _execute_main
        return_value = self._main(**kwargs)
      File "/usr/local/lib/python3.8/site-packages/s3transfer/upload.py", line 758, in _main
        client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 508, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 915, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    The moment i change the executor back to LocalExecutor - it all works perfectly. In both cases we're using a KubernetesRun with a service account, an IAM role with all the necessary permissions, etc., so in both cases the flow runs the same way and only the executor is different.
    Anna Geller
    2 months ago
    Could you:
    1. share your flow code
    2. explain why you are making that switch? Dask is usually helpful to parallelize Python, but you have Node applications triggered from a Kubernetes task
    But let me answer the question directly:
    • Results are handled by the Result abstractions and work the same way regardless of the executor
    • The issue you see occurs due to lacking S3 permissions, which you can solve using IAM Roles for Service Accounts
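    A quick way to check which AWS identity each execution layer actually resolves to is to log the STS caller identity from inside a task and compare a run under LocalExecutor with one under DaskExecutor. The sketch below is hypothetical (not from the thread); it assumes boto3 is installed in the image and uses the Prefect 1.x task logger.
    import boto3
    import prefect
    from prefect import task


    @task
    def whoami() -> str:
        # Logs the IAM identity visible to whichever pod/worker executes this task.
        arn = boto3.client("sts").get_caller_identity()["Arn"]
        prefect.context.get("logger").info("Task is running as %s", arn)
        return arn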
    Tom Klein
    2 months ago
    but why would the S3 permissions matter if the only thing i changed between a successful and a failed run is the executor? obviously they have to be sufficient, if using a LocalExecutor manages to push the results to S3 correctly
    anyway, i'll provide an example
    @Anna Geller
    • in order to provide you with an example, i took the original git repo you provided a few hours ago and tried to add a default Result config to its flow
    • this by itself was not sufficient to reproduce the problem, since it worked fine
    • so, i tried to make it slightly closer to my own flow and added a single ShellTask that does nothing but echo potato
    • However, when i tried to change the flow, i got:
    KeyError: 'Task slug do_some_bash-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
    • this is due to the fact that your example relies on GitHub storage
    • so, i changed it to S3 storage so i could freely change the flow code
    • However, as soon as i did that - i started getting bizarre errors, even if i reverted my change and left the flow code unchanged. The errors were, for example, like in the attached screenshot
    in order to reproduce:
    • use an image with the following Dockerfile:
    FROM prefecthq/prefect:latest-python3.8
    RUN /usr/local/bin/python -m pip install --upgrade pip
    RUN pip install prefect[github,aws,snowflake]
    RUN pip install dask-kubernetes==2021.10.0 pandas
    • use the following code:
    from dask_kubernetes import KubeCluster, make_pod_spec
    from prefect import task, Flow
    from prefect.engine.results import S3Result
    from prefect.executors import DaskExecutor
    from prefect.run_configs import KubernetesRun
    from prefect.storage import GitHub
    from prefect.tasks.shell import ShellTask
    
    FLOW_NAME = "github_kubernetes_run_ephemeral_dask"
    
    
    
    @task
    def extract() -> list:
        return [1, 2, 3, 4, 5, 6]
    
    
    @task
    def transform(number: int) -> int:
        return number * 2
    
    
    @task()
    def load(numbers: list) -> list:
        return [i for i in numbers if i]
    
    @task
    def do_some_bash():
        return 5
    
    
    with Flow(
            FLOW_NAME,
            executor=DaskExecutor(
                cluster_class=lambda: KubeCluster(make_pod_spec(image="annageller/prefect-dask-k8s:latest")),
                adapt_kwargs={"minimum": 2, "maximum": 3},
            ),
            run_config=KubernetesRun(
                image="<REFERENCE_TO_IMAGE_HERE>",
                image_pull_policy="Always"
            ),
    ) as flow:
        nrs = extract()
        transformed_numbers = transform.map(nrs)
        numbers_twice = transform.map(transformed_numbers)
        result = load(numbers=numbers_twice)
        do_some_bash(upstream_tasks=[result])
    and register it like so:
    from prefect.storage import S3
    from flows import anna
    
    
    anna.flow.storage = S3(bucket="hb-prefect", stored_as_script=True, local_script_path='flows/anna.py')
    anna.flow.register(project_name='poc', build=True, labels=["k8s-prod"])
    it certainly might be related to the other GitHub issue i already opened, though i'm not sure why the Storage, of all things, would make such a dramatic difference between "working" and "not-working" (for the exact same code, with the same image)
    and to answer your previous question labeled (2):
    • Regardless of this specific use-case (Node, etc.), we're also trying out Prefect and trying to understand whether choosing it over Airflow (for example) was a correct decision. Among other things, i'm testing various features and use-cases, even if they don't directly serve any immediate business goal.
    • I suggest we separate the two topics ("is this the right approach for the use-case?" and "is there a technical issue here or a misconfiguration?") from each other - because talking about them at the same time just makes the discussion more complex.
    • It might be true that doing things in a completely different way would "bypass" the problem entirely from the get-go, but solving the business problem at hand ISN'T the only thing i'm interested in. I'm ALSO interested in testing out the platform to see what works and what doesn't, including possibly executing shell tasks, or using custom images, on a dask cluster (for that matter). If that's officially not supported, then fine - but i didn't get the impression it isn't supported. The docs & other marketing materials made it seem like the task Executor is agnostic to what tasks it runs (and custom images were shown as a feature).
    • In other words, when i run into technical difficulties (that i'm at least in principle likely to run into again in one form or another) - it is crucial to me to understand whether it's due to a bug or a limitation of the platform, or due to my own mistakes or misuse. Working around problems isn't the high priority here.
    and a few remarks about the above code:
    • Changing make_pod_spec(image="annageller/prefect-dask-k8s:latest") to make_pod_spec(image=prefect.context.image) fixes the weird errors (assuming that there's no Result configured on the flow)
    • This indicates to me that there is a fundamental difference between running a DaskExecutor with GitHub storage vs. S3 storage --- one that isn't immediately obvious or intuitive (to me at least)
    • And, despite the above working - adding an S3 Result reproduces the AccessDenied issue that was mentioned above
    so if you wanna reproduce the S3 AccessDenied issue specifically, you can use this flow instantiation:
    import prefect  # needed for prefect.context.image below

    with Flow(
            FLOW_NAME,
            result=S3Result(bucket='hb-prefect',
                            location='runs/{date:%Y}/{date:%m}/{date:%d}/{flow_name}/{flow_run_name}/{task_name}_{task_run_id}'),
            executor=DaskExecutor(
                cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
                adapt_kwargs={"minimum": 2, "maximum": 3},
            ),
            run_config=KubernetesRun(
                image="<INSERT IMG HERE>",
                image_pull_policy="Always",
            ),
    ) as flow:
        ...  # same task graph (extract/transform/load/do_some_bash) as in the flow above
    Even using annageller/prefect-dask-k8s:latest as the KubernetesRun image still reproduces this error; only changing to LocalExecutor eliminates the S3 AccessDenied error. This example is purely Python - there's no Node in it in any shape or form (and not even Shell tasks), and the Docker image is virtually identical to the one in your repo, except for the Prefect version (latest rather than 0.15.16).
    Update: i have no idea how it relates to the storage (and why this was not needed when the storage was GitHub) - but adding:
    extra_pod_config={"serviceAccount": "<name of service account>"}
    to the KubeCluster pod spec solved the AccessDenied.
    This also shows that it does matter which Executor is being used, since the Result checkpointing is (evidently, as these experiments show) handled on the dask worker rather than on the Prefect job. i assumed that the dask workers are created with the same service account as the flow itself... in any case, is there a way to extract the name of the current service account of the pod the Prefect flow is running on? maybe something like prefect.context.kubernetes.pod.service_name (i made this up, but this is what i'd love if it was available)
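    For reference, a minimal sketch of how that workaround could be wired into the flow, assuming the classic dask-kubernetes KubeCluster/make_pod_spec API used above. The service-account name has to come from somewhere - a hard-coded value, a Prefect Secret, or (hypothetically) an environment variable injected into the flow-run pod; the DASK_SERVICE_ACCOUNT variable and the fallback value below are made up for illustration.
    import os

    from dask_kubernetes import KubeCluster, make_pod_spec
    from prefect.executors import DaskExecutor


    def make_cluster():
        # Hypothetical: read the service account name from an env var set on the
        # flow-run pod; fall back to a placeholder.
        service_account = os.environ.get("DASK_SERVICE_ACCOUNT", "<name of service account>")
        pod_spec = make_pod_spec(
            image="<REFERENCE_TO_IMAGE_HERE>",
            # attach the service account to the Dask worker pods, so that S3Result
            # writes performed on the workers get the expected IAM permissions
            extra_pod_config={"serviceAccount": service_account},
        )
        return KubeCluster(pod_spec)


    executor = DaskExecutor(cluster_class=make_cluster, adapt_kwargs={"minimum": 2, "maximum": 3})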
    Anna Geller
    2 months ago
    but why would the S3 permissions matter if the only thing i changed between a successful and a failed run - is the executor?
    because permissions must be available on the execution layer and you are changing your execution layer
    i’m testing various features and use-cases, even if they don’t directly serve any immediate business goal in a direct way
    in that case, add this to your Dockerfile; this should solve your problem if you prefer that over IAM roles:
    # COPY can't expand ~ and only sees the build context, so first copy your
    # ~/.aws directory into the build context
    COPY .aws /root/.aws
    or even:
    ENV AWS_ACCESS_KEY_ID=xxx
    ENV AWS_SECRET_ACCESS_KEY=xxx
    ENV AWS_DEFAULT_REGION=us-east-1
    not recommended for production, but just to test things out and validate if Prefect works for you, this should help you
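    Another option in the same non-production spirit (a sketch, not from the thread): rather than baking credentials into the image, they could be forwarded to the Dask worker pods through make_pod_spec's env argument, since with a DaskExecutor the S3Result writes happen on those worker pods. This assumes the AWS_* variables are already present in the flow-run pod's environment.
    import os

    from dask_kubernetes import KubeCluster, make_pod_spec
    from prefect.executors import DaskExecutor

    # Forward whatever AWS credentials the flow-run pod already has on to the
    # Dask worker pods (test/dev only; prefer IAM roles / IRSA in production).
    aws_env = {
        key: os.environ[key]
        for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION")
        if key in os.environ
    }

    executor = DaskExecutor(
        cluster_class=lambda: KubeCluster(
            make_pod_spec(image="<REFERENCE_TO_IMAGE_HERE>", env=aws_env)
        ),
        adapt_kwargs={"minimum": 2, "maximum": 3},
    )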
    Tom Klein
    2 months ago
    ok, but that was my first question 🙂 if changing the execution layer matters, then it definitely does matter which executor we use - in other words:
    Results are handled by the Result abstractions and work the same way regardless of the executor
    this is only correct in a general sense; in a practical sense it means they (the results) are handled on a completely different pod in k8s, depending on the executor
    we do prefer service roles, but our DevOps at least found no way to make the Dask workers "automatically" get the desired service account attached, without me having to explicitly provide the name of the service account in the flow itself (in the pod spec given to the dask KubeCluster). In other words:
    • Automatic service account binding for the Prefect Kubernetes Agent - works
    • Automatic service account binding for Prefect flows being run via a KubernetesRun - works
    • Automatic service account binding for dask workers of ephemerally created clusters - doesn't
    Don't ask me how our DevOps made the first two work because i have no clue 🤷‍♂️ The last case only worked when i added it to my code. Is there a way to at least derive the flow's pod's service account at runtime (so it can be given to the dask KubeCluster)? Looking at the Prefect source code, it seemed that this data is sent to the Prefect logs but not set on the prefect.context
    Part of the testing is indeed testing it the way it should work in production (including service roles); that's part of checking the fit 🙂 Again, we don't want to bypass this just to be able to make other things work - we want to make everything work and see where we run into difficulties (and where not). This isn't a POC, we're already trying to use Prefect for production use-cases.
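    One possible way to derive it at runtime (a sketch under assumptions, not an existing Prefect feature): from inside the flow-run pod, read the pod's own spec via the in-cluster Kubernetes API. This assumes the kubernetes Python client is installed in the image, that the pod name is available via the usual HOSTNAME env var, and that the flow's service account has RBAC permission to read pods in its namespace.
    import os

    from kubernetes import client, config


    def current_service_account() -> str:
        """Best-effort lookup of the service account of the pod this code runs in."""
        config.load_incluster_config()  # only works inside a k8s pod
        with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
            namespace = f.read().strip()
        pod_name = os.environ["HOSTNAME"]  # defaults to the pod name in k8s
        pod = client.CoreV1Api().read_namespaced_pod(pod_name, namespace)
        return pod.spec.service_account_name


    # ...which could then be fed into the Dask worker pod spec, e.g.
    # extra_pod_config={"serviceAccount": current_service_account()}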
    Anna Geller
    2 months ago
    any execution layer (your local machine, Kubernetes, Dask on Kubernetes, ...) must have the credentials required to use external systems, e.g. credentials to store Results in S3 or to pull a Docker image
    is there a way to at least derive the flow's pod's service account at runtime (so it can be given to the dask KubeCluster)?
    the service account name is static and doesn't change at runtime. I thought you were considering switching to Prefect 2.0 given you were interested in the streaming use case - if so, I'm not sure whether it makes sense to spend further time on this; you've invested a lot of time on this already and it seems to be a hard problem to solve with your infra and with 1.0. In 2.0 this will be easier thanks to Blocks and a cleaner separation between flow runners, task runners, storage & results, and packaging of the code
    plus, I think you may parallelize your work (in your use case) with the ConcurrentTaskRunner, which will work out of the box without dealing with Dask and Kubernetes complexity
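    For reference, a minimal Prefect 2 sketch of that suggestion, assuming a 2.x release where task runs are submitted with .submit(); the flow and task names are illustrative only.
    from prefect import flow, task
    from prefect.task_runners import ConcurrentTaskRunner


    @task
    def transform(number: int) -> int:
        return number * 2


    @flow(task_runner=ConcurrentTaskRunner())
    def etl():
        # .submit() schedules the task calls concurrently on the task runner
        futures = [transform.submit(n) for n in [1, 2, 3, 4, 5, 6]]
        return [f.result() for f in futures]


    if __name__ == "__main__":
        etl()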
    Tom Klein
    2 months ago
    @Anna Geller Re: the service account - it's true that it doesn't change at runtime, but because we are passing it as an argument to the KubeCluster (otherwise it simply doesn't have the required service account) - we do need it at runtime. And i'd rather not hardcode it or rely on an entire yaml spec for the pod, but instead derive it from the service account with which the Prefect flow is running (this is exactly the same logic that leads Prefect flows to derive their service account from the Agent that ran them --- at least based on what i saw in the Prefect source code: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/agent.py#L510). The Prefect config, for example, also doesn't change at run-time but it's still available via prefect.config i believe (or was it prefect.context.config?)
    Re: 2.0 -- that's a fair point. The main reason i'm trying to make sure this works well (including parallelization) is that we have a very big task coming up: processing hundreds of thousands of rows of data (each one of them leading to a distinct enrichment "sub-task" that includes a lot of net I/O, and processing logic). So we don't practically have the luxury of waiting for 2.0 for this (despite it sounding great and like a major leap forward) - when the time horizon for this big run is within a week or two.
    anyway, the entire flow works now (including running the NodeJS script in the dask workers successfully and with checkpointing to S3), except for a weird edge-case that i'm still investigating - the only issue i have now is some bizarre case where there seems to be a mixup in the input/output of one of multiple tasks (it happened in 1 out of 6 dask tasks, probably not related to prefect) 🙂 this is however a completely separate issue, so lmk if you want me to open a new thread for it
    Anna Geller
    2 months ago
    I'm glad you figured it out, thanks for the update
    @Tom Klein here is a user-contributed example for 2.0 https://github.com/tekumara/orion-demo/blob/main/flows/dask_kubes_flow.py
    Tom Klein
    2 months ago
    @Anna Geller thanks - the main Q i think (which i didn't raise in the GitHub issue, so as not to go off-topic there) is whether the dask-kubernetes dependency it has now works even with newer(est) versions of it - or not
    if it still only works up to 2022.5 then i think it's still a problem, because newer versions might contain security fixes, or other important updates, and if your platform directly integrates with it, i don't think it's a good idea to "go out of sync" like that... why not actually have dask-kubernetes as part of the dependencies of prefect-dask (or maybe a separate package like prefect-dask-kubernetes)? That would at least guarantee people won't have to fumble with the version jigsaw puzzle that i had to face. Or - better yet - integrate it into your tests, etc., so that you always know you are keeping up-to-date with the releases of dask-kubernetes - and know when something breaks (as it evidently has)
    Anna Geller
    2 months ago
    generally speaking, dependency management is something you would need to take care of based on your infra/Python version etc - I only wanted to share the example because we've just received it and you asked about it before
    Tom Klein
    2 months ago
    i understand, i'm just using this opportunity to comment on the subject 🙂 you could say the same thing about Dask itself - that users should manage the dependency on dask themselves (or snowflake, or mongo, or anything else) - yet prefect-dask, for example, depends on Dask's distributed package; it doesn't leave it as the user's responsibility to add distributed themselves separately. what makes dask-kubernetes different? why shouldn't Prefect also have a package that directly relies on it? we're not talking about some exotic ML package that i'm "bringing from home" - it's a fundamental component of how Prefect is designed to work (i.e. integrate with Dask clusters)
    Anna Geller
    2 months ago
    I understand the pain here, but it's more of a Python problem than a Prefect problem
    Tom Klein
    2 months ago
    why? later versions of dask-kubernetes simply don't work with Prefect (i don't know due to whose change, or what change) - how is that a Python problem?
    Anna Geller
    2 months ago
    sorry, I have to run - I've said everything I know about the subject. Perhaps you can ask on the Dask community forum, given it's an issue with dask-kubernetes? https://dask.discourse.group/