# ask-marvin
t
@Marvin I want to use Dask Task Runner and result storage at the same time. When I put them together like this:
```python
@flow(task_runner=DaskTaskRunner(), result_storage='s3-bucket/minio-results')
```
it says `KeyError("No class found for dispatch key 's3-bucket' in registry for type 'WritableFileSystem'.")`. If I remove one of the arguments, it works as expected.
m
It looks like you're encountering an issue related to the configuration of result storage with the Dask Task Runner in Prefect. However, to assist you accurately, I need to know which version of Prefect you're using, as the syntax and features can be version-specific. Could you please confirm whether you are using Prefect 2.x or 3.x? If you're not sure, let me know and I can assist in determining that.
t
```
Version:             3.0.10
API version:         0.8.4
Python version:      3.10.10
Git commit:          3aa2d893
Built:               Tue, Oct 15, 2024 1:31 PM
OS/Arch:             darwin/arm64
Profile:             pd-flow-local
Server type:         server
Pydantic version:    2.9.2
Integrations:
  prefect-aws:       0.5.1
  prefect-email:     0.4.0
  prefect-dask:      0.3.1
```
@Marvin
m
The issue you're encountering seems to be related to the improper configuration of `result_storage` when using the `DaskTaskRunner`. The error message indicates that Prefect is unable to find the appropriate class for the given storage key `'s3-bucket'` in its registry. Here's how you can resolve this issue:

1. **Define the Result Storage Properly:** When specifying `result_storage`, ensure that it correctly configures an S3 bucket with credentials. Instead of a simple string, you should create a Prefect `S3Bucket` block that includes your AWS credentials.
2. **Block Configuration:** Use Prefect's block creation for S3 storage. You'll need to configure your S3 bucket using the `S3Bucket` class from `prefect_aws`. Here's a general example based on the correct structure:
```python
from prefect import flow
from prefect_aws import S3Bucket
from prefect_aws.credentials import AwsCredentials  # note: AwsCredentials, not AWSCredentials
from prefect_dask import DaskTaskRunner

my_s3_bucket = S3Bucket(
    bucket_name="minio-results",
    credentials=AwsCredentials(
        aws_access_key_id="your-access-key-id",
        aws_secret_access_key="your-secret-access-key",
    ),
)

@flow(task_runner=DaskTaskRunner(), result_storage=my_s3_bucket)
def my_flow():
    ...  # your tasks and flow logic here
```
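For completeness, here is a hedged end-to-end sketch of how the example above might be exercised; the `add` task and the `persist_result=True` flag are illustrative assumptions rather than part of your original snippet, and `my_s3_bucket` is the block defined above:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task(persist_result=True)  # persist this task's result to the flow's result storage
def add(x: int, y: int) -> int:
    return x + y

@flow(task_runner=DaskTaskRunner(), result_storage=my_s3_bucket)
def my_flow() -> int:
    # submit() schedules the task on the Dask cluster; result() waits for the value
    return add.submit(1, 2).result()

if __name__ == "__main__":
    my_flow()
```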
3. **GitHub Issue Insight:** This issue seems to be somewhat common, as reflected in a GitHub issue where users experienced similar problems when using `result_storage` with parallel task execution. Ensure your S3 configuration block is correctly set up, as Prefect may not automatically interpret simple shorthand (like `'s3-bucket/minio-results'`) without the necessary AWS credentials being explicitly defined and linked. For more details on setting up result persistence, you can explore Prefect's documentation on results and the Dask integration guide. If you continue to face challenges, check your Prefect and `prefect-aws` package versions for any available patches or updates.
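If you'd still prefer the string shorthand, note that a `result_storage` string of the form `<block-type-slug>/<block-name>` is resolved from Prefect's block registry, so the block must already be saved under that name and `prefect_aws` must be imported in the process performing the lookup (with Dask, that can include worker processes). A hedged sketch, reusing `my_s3_bucket` from above:
```python
from prefect import flow
from prefect_aws import S3Bucket  # importing prefect_aws registers the "s3-bucket" block type
from prefect_dask import DaskTaskRunner

# One-time setup: save the configured block under the name "minio-results"
my_s3_bucket.save("minio-results", overwrite=True)

# "s3-bucket" is the block type slug; "minio-results" is the saved block's name
@flow(task_runner=DaskTaskRunner(), result_storage="s3-bucket/minio-results")
def my_flow():
    ...
```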
t
@Marvin If I switch to
```python
S3Bucket.load('minio-results')
```
I get another error: `1 validation error for EngineContext\nresult_store.result_storage.bucket_name\n  Field required`. The block is there and is configured properly, as it does work if I don't use the Dask task runner.
m
The error you're encountering with `S3Bucket.load('minio-results')` suggests that there's something amiss in how the block or its configuration is set up, possibly related to missing fields during its use with a `DaskTaskRunner`. Here's how you can address this:

1. **Check for Correct Block Configuration:** Ensure that your `S3Bucket` block is configured with all necessary fields, especially the `bucket_name`. The error "Field required" for `bucket_name` suggests that when you load the block, it might not be finding or recognizing this information. You might want to bypass the automatic validation to inspect and update any missing fields using a code snippet like the following:
```python
from prefect_aws import S3Bucket

# Load without validation so that missing fields don't raise immediately
my_s3_bucket_block = S3Bucket.load("minio-results", validate=False)
print(my_s3_bucket_block.model_dump())  # inspect the block contents
```
2. **Revalidate and Save the Block:** If fields such as `bucket_name` or credentials are missing upon inspection, explicitly define and set them:
```python
my_s3_bucket_block.bucket_name = "your-bucket-name"
# Ensure any other necessary fields are set, then overwrite the existing block
my_s3_bucket_block.save("minio-results", overwrite=True)
```
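After saving, a quick sanity check (assuming the same block name) that the reloaded block now passes validation:
```python
# Reload with default validation; this should no longer raise
# "Field required" if bucket_name was set and saved above
reloaded = S3Bucket.load("minio-results")
assert reloaded.bucket_name == "your-bucket-name"
```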
3. **Documentation and References:** Refer to Prefect's documentation on block updates and persistence to ensure your blocks are properly set up. You can find more about configuring and updating blocks in the block documentation guide.
4. **Consider Workarounds with Environment or Settings:** Prefect allows configuration through environment variables or configuration files. Check Prefect's settings to see if a settings-based workaround can temporarily ease the integration (see the sketch at the end of this message).

If the block configuration seems correct and you're still facing validation issues, consider updating your Prefect and `prefect-aws`/`prefect-dask` versions in case newer releases fix such integration discrepancies. If the issue persists, you might raise it as a potential bug or consult the community forums for similar experiences.
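As a concrete illustration of the settings-based workaround in point 4: Prefect exposes a default result storage block setting. The environment variable name below assumes Prefect 3.x and should be verified against your version with `prefect config view`:
```python
import os

# Assumed Prefect 3.x setting name (PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK);
# it must be set before Prefect loads its settings, i.e. before the flow runs.
os.environ["PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK"] = "s3-bucket/minio-results"
```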