Sowmiya Anand 07/24/2023, 6:27 PM
Marvin 07/24/2023, 6:27 PM
1. Create a Prefect Task that uses boto3 to list all objects in the S3 bucket:
```python
from prefect import Task

class ListS3Objects(Task):
    def run(self, bucket: str):
        import boto3
        s3 = boto3.client('s3')
        # list_objects returns a dict; the objects themselves live under 'Contents'
        return s3.list_objects(Bucket=bucket).get('Contents', [])
```
2. Filter the returned list for files whose names contain the specific key value and timestamp. This can be done within the same task, or you can create a separate task for it:
```python
class FilterS3Objects(Task):
    def run(self, s3_objects: list, key_value: str, timestamp: str):
        # Keep objects whose key contains both the key value and the timestamp
        return [
            obj for obj in s3_objects
            if key_value in obj['Key'] and timestamp in obj['Key']
        ]
```
3. Chain these tasks in a Prefect Flow:
```python
from prefect import Flow

with Flow("S3 Files") as flow:
    s3_objects = ListS3Objects()(bucket="my-bucket")
    filtered_objects = FilterS3Objects()(s3_objects, key_value="my-key", timestamp="2023-07-24")

flow.run()
```
This will give you a list of S3 files which have the specified key value in their name and match the given timestamp. Remember, this is just a basic example. You might need to handle pagination if your S3 bucket has more than 1000 objects, and you might want to fetch the objects' metadata to get the timestamp rather than parsing it from the key. Please adjust the logic to your specific needs. Note: you need the necessary permissions to list and access the objects in the S3 bucket, so make sure your AWS credentials are configured correctly.
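For the pagination point above: `list_objects` returns at most 1000 keys per call, so larger buckets need a paginator. A minimal sketch using boto3's `list_objects_v2` paginator (the bucket name, key value, and cutoff are placeholders), which also filters on each object's `LastModified` metadata instead of parsing a timestamp out of the key:
```python
import boto3
from datetime import datetime, timezone

def list_all_objects(bucket: str):
    """Yield every object in the bucket, following pagination past 1000 keys."""
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        yield from page.get('Contents', [])

# Keep objects whose key contains the value and whose LastModified metadata
# falls on or after a cutoff (LastModified is a timezone-aware datetime).
cutoff = datetime(2023, 7, 24, tzinfo=timezone.utc)
matches = [
    obj for obj in list_all_objects("my-bucket")
    if "my-key" in obj['Key'] and obj['LastModified'] >= cutoff
]
```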
Nate 07/24/2023, 6:33 PM
```python
import boto3
from operator import itemgetter

s3 = boto3.client('s3')

def get_objects_sorted_by_last_modified(bucket):
    response = s3.list_objects_v2(Bucket=bucket)
    # Check if the bucket is not empty
    if response['KeyCount'] > 0:
        files = response['Contents']
        sorted_files = sorted(files, key=itemgetter('LastModified'))
        return sorted_files
    else:
        return []

bucket = 'my_bucket'
sorted_files = get_objects_sorted_by_last_modified(bucket)
for file in sorted_files:
    print(file['Key'], file['LastModified'])
```
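If only the most recent upload matters, the sorted list gives it directly; a short follow-on to the snippet above, under the same assumption that the bucket is non-empty:
```python
# sorted_files is ascending by LastModified, so the newest object comes last
newest = sorted_files[-1]
print("Most recent object:", newest['Key'], newest['LastModified'])
```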
Sowmiya Anand 07/24/2023, 6:47 PM
Marvin 07/24/2023, 6:47 PM
To read the JSON files from S3, you can create a Prefect Task that uses the boto3 client to get the object and parse the JSON content. You can use Prefect's secret management for your AWS credentials. An example of a Prefect Task to read the S3 files might look like this:
```python
from prefect import Task
import boto3
import json

class ReadS3JSON(Task):
    def run(self, bucket: str, key: str):
        s3 = boto3.client('s3')
        file_object = s3.get_object(Bucket=bucket, Key=key)
        content = file_object["Body"].read().decode('utf-8')
        json_content = json.loads(content)
        return json_content
```
You can then use this task in your Prefect Flow to read the S3 JSON files:
```python
from prefect import Flow

with Flow("Read S3 JSON") as flow:
    json_content = ReadS3JSON()(bucket="my-bucket", key="my-file-key")
```
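For the secret-management piece, one option in Prefect 1.x is `PrefectSecret`; a minimal sketch assuming a secret named `AWS_CREDENTIALS` that holds `ACCESS_KEY` and `SECRET_ACCESS_KEY` (the `ReadS3JSONWithCreds` task is a hypothetical variant of the task above):
```python
from prefect import Task, Flow
from prefect.tasks.secrets import PrefectSecret
import boto3
import json

class ReadS3JSONWithCreds(Task):
    def run(self, bucket: str, key: str, credentials: dict):
        # Build the client from the secret value rather than ambient credentials
        s3 = boto3.client(
            's3',
            aws_access_key_id=credentials['ACCESS_KEY'],
            aws_secret_access_key=credentials['SECRET_ACCESS_KEY'],
        )
        file_object = s3.get_object(Bucket=bucket, Key=key)
        return json.loads(file_object["Body"].read().decode('utf-8'))

with Flow("Read S3 JSON with secret") as flow:
    credentials = PrefectSecret("AWS_CREDENTIALS")
    json_content = ReadS3JSONWithCreds()(
        bucket="my-bucket", key="my-file-key", credentials=credentials
    )
```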
You will now need to compare the JSON content with the data in your Redshift tables. You can do this by creating a Prefect Task that connects to your Redshift database, executes your comparison queries, and returns the results. You can use SQLAlchemy to interact with your Redshift database:
```python
from prefect import Task
from sqlalchemy import create_engine, text

class RedshiftQuery(Task):
    def run(self, query: str, params: dict = None):
        engine = create_engine('postgresql://user:password@hostname:port/database')
        with engine.connect() as connection:
            # Bind parameters safely instead of interpolating values into the SQL
            result = connection.execute(text(query), params or {})
            return result.fetchall()
```
You can then use this task in your Prefect Flow to execute your comparison queries:
```python
with Flow("Compare S3 JSON with Redshift") as flow:
    json_content = ReadS3JSON()(bucket="my-bucket", key="my-file-key")
    comparison_results = RedshiftQuery()(
        query="SELECT * FROM my_table WHERE my_column = :value",
        params={"value": json_content["my_key"]},
    )
```
This will compare the value of the `my_key` field in the JSON with the values in the `my_column` column of the `my_table` table in your Redshift database, and return the results. Note: you need the necessary permissions to access the objects in the S3 bucket and the tables in the Redshift database, so make sure your AWS and PostgreSQL credentials are configured correctly.
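To act on the result, you could append one more task that inspects the returned rows; a minimal sketch (the `CheckMatch` task and its pass/fail rule are hypothetical, adjust to your own validation logic):
```python
from prefect import Task

class CheckMatch(Task):
    def run(self, rows: list, expected_value):
        # Hypothetical rule: a match means Redshift returned at least one row
        # whose first column equals the expected JSON value.
        return any(row[0] == expected_value for row in rows)

# Inside the same flow:
#   is_match = CheckMatch()(comparison_results, json_content["my_key"])
```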