<@ULVA73B9P> how to dynamically fetch a list of s3...
# ask-marvin
s
@Marvin how to dynamically fetch a list of S3 files which have a key value in their name based on a timestamp in Prefect 2.0?
m
In Prefect 2.0, you can use the S3Result class to interact with data stored in S3. However, fetching a list of S3 files with a particular key value in their name based on a timestamp might require a combination of the AWS SDK (Boto3 in Python) and Prefect tasks. Here's a simple workflow:
1. Create a `PythonTask` that uses `boto3` to list all objects in the S3 bucket.
```python
from prefect import Task

class ListS3Objects(Task):
    def run(self, bucket: str):
        import boto3
        s3 = boto3.client('s3')
        # list_objects returns a dict; the object records live under 'Contents'
        response = s3.list_objects(Bucket=bucket)
        return response.get('Contents', [])
```
2. Filter the returned list for files whose names contain the specific key value and timestamp. This can be done within the same task, or you can create a separate task for it.
```python
class FilterS3Objects(Task):
    def run(self, s3_objects: list, key_value: str, timestamp: str):
        # Keep only objects whose key contains both substrings
        return [obj for obj in s3_objects if key_value in obj['Key'] and timestamp in obj['Key']]
```
3. Chain these tasks in a Prefect Flow.
```python
from prefect import Flow

with Flow("S3 Files") as flow:
    s3_objects = ListS3Objects()(bucket="my-bucket")
    filtered_objects = FilterS3Objects()(s3_objects, key_value="my-key", timestamp="2023-07-24")

flow.run()
```
This will give you a list of S3 files which have the specified key value in their name and match the given timestamp. Remember, this is just a basic example: you might need to handle pagination if your S3 bucket has more than 1,000 objects, and you might want to use the objects' metadata to filter on their actual last-modified time rather than on the key name. Please adjust the logic to your specific needs. Note: you need the necessary permissions to list and access the objects in the S3 bucket, so make sure your AWS credentials are configured correctly.
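For buckets beyond that 1,000-object limit, boto3's paginator handles the continuation tokens for you. A minimal sketch (the bucket name and both filter substrings are placeholders):
```python
import boto3

s3 = boto3.client('s3')

def list_matching_keys(bucket: str, key_value: str, timestamp: str):
    """Yield keys containing both substrings, across all result pages."""
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            if key_value in obj['Key'] and timestamp in obj['Key']:
                yield obj['Key']
```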
n
hi @Sowmiya Anand - this is not right (for Prefect 2) - there's an issue with Marvin's knowledge retrieval (that I am working on now) that's causing low-quality answers - apologies! you can always use boto3 like:
```python
import boto3
from operator import itemgetter

s3 = boto3.client('s3')

def get_objects_sorted_by_last_modified(bucket):
    response = s3.list_objects_v2(Bucket=bucket)
    # Check if the bucket is not empty
    if response['KeyCount'] > 0:
        files = response['Contents']
        sorted_files = sorted(files, key=itemgetter('LastModified'))
        return sorted_files
    else:
        return []

bucket = 'my_bucket'
sorted_files = get_objects_sorted_by_last_modified(bucket)
for file in sorted_files:
    print(file['Key'], file['LastModified'])
```
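If you also need the original question's filters, one way (a sketch; the key substring and the cutoff datetime are assumptions) is to reuse get_objects_sorted_by_last_modified and filter on the key name plus LastModified:
```python
from datetime import datetime, timezone

def get_matching_objects_since(bucket: str, key_value: str, cutoff: datetime):
    # LastModified comes back as a timezone-aware datetime,
    # so the cutoff must be timezone-aware too
    files = get_objects_sorted_by_last_modified(bucket)
    return [f for f in files if key_value in f['Key'] and f['LastModified'] >= cutoff]

cutoff = datetime(2023, 7, 24, tzinfo=timezone.utc)
for f in get_matching_objects_since('my_bucket', 'my-key', cutoff):
    print(f['Key'], f['LastModified'])
```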
s
I also have the key value present inside the file - it's a JSON file, so I have key:value pairs. @Marvin, can we read the data inside the file in S3 using Prefect 2.0? I need to compare this key value with the tables in Redshift and run some comparison queries. Each file has a unique key value which will be a row in a table where I will be comparing new data with the old one.
m
Yes, you can read the data inside the S3 files using Prefect 2.0. To do this, you need to create a Prefect Task that uses the `boto3` client to get the object and parse the JSON content. You can use Prefect's secret management for your AWS credentials. An example of a Prefect Task to read the S3 files might look like this:
```python
from prefect import Task
import boto3
import json

class ReadS3JSON(Task):
    def run(self, bucket: str, key: str):
        s3 = boto3.client('s3')
        # Download the object and parse its JSON body
        file_object = s3.get_object(Bucket=bucket, Key=key)
        content = file_object["Body"].read().decode('utf-8')
        json_content = json.loads(content)
        return json_content
```
You can then use this task in your Prefect Flow to read the S3 JSON files:
```python
from prefect import Flow

with Flow("Read S3 JSON") as flow:
    json_content = ReadS3JSON()(bucket="my-bucket", key="my-file-key")
```
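Worth noting: the snippets above use the Prefect 1 style Task subclass and Flow context manager. In Prefect 2 the same step is usually written with the @task and @flow decorators; a minimal sketch with the same placeholder bucket and key:
```python
from prefect import flow, task
import boto3
import json

@task
def read_s3_json(bucket: str, key: str) -> dict:
    # Fetch the object and parse its JSON body
    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=bucket, Key=key)
    return json.loads(obj['Body'].read().decode('utf-8'))

@flow
def read_s3_json_flow():
    return read_s3_json(bucket="my-bucket", key="my-file-key")

if __name__ == "__main__":
    read_s3_json_flow()
```
Calling the flow function runs it immediately; no flow.run() is needed in Prefect 2.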
You will now need to compare the JSON content with the data in your Redshift tables. You can do this by creating a Prefect Task that connects to your Redshift database, executes your comparison queries, and returns the results. You can use SQLAlchemy to interact with your Redshift database:
```python
from prefect import Task
from sqlalchemy import create_engine, text

class RedshiftQuery(Task):
    def run(self, query: str, value: str):
        engine = create_engine('postgresql://user:password@hostname:port/database')
        with engine.connect() as connection:
            # text() lets SQLAlchemy bind the :value parameter safely
            result = connection.execute(text(query), {"value": value})
            return result.fetchall()
```
You can then use this task in your Prefect Flow to execute your comparison queries:
```python
with Flow("Compare S3 JSON with Redshift") as flow:
    json_content = ReadS3JSON()(bucket="my-bucket", key="my-file-key")
    comparison_results = RedshiftQuery()(query="SELECT * FROM my_table WHERE my_column = :value", value=json_content["my_key"])
This will compare the value of the `my_key` field in the JSON with the values in the `my_column` column of the `my_table` table in your Redshift database, and return the results. Note: you need the necessary permissions to access the objects in the S3 bucket and the tables in the Redshift database, so make sure your AWS and PostgreSQL credentials are configured correctly.
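Putting the two steps together in Prefect 2 style - a sketch rather than a tested pipeline: the connection string, table, and column names are placeholders, and read_s3_json is the decorator-style task sketched earlier:
```python
from prefect import flow, task
from sqlalchemy import create_engine, text

@task
def compare_with_redshift(value):
    # Redshift speaks the PostgreSQL wire protocol; 5439 is its default port
    engine = create_engine('postgresql://user:password@hostname:5439/database')
    with engine.connect() as connection:
        result = connection.execute(
            text("SELECT * FROM my_table WHERE my_column = :value"),
            {"value": value},
        )
        return result.fetchall()

@flow
def compare_flow():
    json_content = read_s3_json(bucket="my-bucket", key="my-file-key")
    return compare_with_redshift(json_content["my_key"])
```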