Raymond Yu
# 03/14/2022, 2:23 AM
@task(max_retries=24, retry_delay=timedelta(minutes=15))
def check_s3_file_exists_source(
    bucket_name: str, job_config: dict, s3_filter_date: str
) -> None:
    """
    A sensor that fails if no source files are found for a given run date.

    The ``@task`` decorator allows up to 24 retries, 15 minutes apart, so a
    failed check is re-attempted until files appear or retries run out.

    Args:
        bucket_name: which bucket to check files in
        job_config: config containing ``landing_table_name``, used as the S3
            key prefix to check
        s3_filter_date: date to filter S3 from; files modified before this
            date are not included

    Raises:
        ValueError: if ``S3List`` returns no files for the prefix and date
            filter.
        signals.SUCCESS: raised explicitly once files are found, to mark the
            task run successful.
    """
    logger = prefect.context.get("logger")
    # Attempt number of this task run; values above 1 indicate a retry.
    run_count = prefect.context.get("task_run_count", 1)
    prefix = job_config["landing_table_name"]

    if run_count > 1:
        logger.info(f"Retry attempt {run_count} of the source-file check.")
    logger.info(
        f"Checking for source files in '{bucket_name}/{prefix}'"
        f" for the run date {s3_filter_date}."
    )

    source_files = S3List(bucket=bucket_name).run(
        prefix=prefix, last_modified_begin=s3_filter_date
    )
    if not source_files:
        # Raising fails this attempt, which lets the retry policy above kick in.
        raise ValueError(f"No files found for the run date {s3_filter_date}")

    logger.info(
        "Check Successful. "
        f"Source files detected in '{bucket_name}/{source_files[0]}'"
    )
    raise signals.SUCCESS()
Kevin Kho
Use `prefect.context.get("task_run_count")`
to identify whether it's a retry or the first attempt.
Raymond Yu
03/14/2022, 2:33 AM
Anna Geller
import pendulum
from prefect.engine.signals import RETRY
import awswrangler as wr
def check_if_file_arrived_in_s3(path: str = "s3://bucket/example_file.csv") -> bool:
    """
    Report whether an object exists at the given S3 path.

    Args:
        path: full ``s3://bucket/key`` URI of the object to look for.
            Defaults to the example path used in this snippet.

    Returns:
        The result of ``awswrangler.s3.does_object_exist(path)``.
    """
    return wr.s3.does_object_exist(path)
@task
def s3_sensor(**kwargs):
    """
    Poll S3 until the expected file exists.

    If the file is not there yet, raise Prefect's ``RETRY`` signal with a
    ``start_time`` 20 seconds in the future, so the task is rescheduled then.

    Args:
        **kwargs: accepted but not used by the body.

    Raises:
        RETRY: when ``check_if_file_arrived_in_s3()`` reports the file is missing.
    """
    # NOTE(review): RETRY is raised on every miss; confirm whether Prefect caps
    # manually signalled retries; if not, this polls indefinitely.
    if not check_if_file_arrived_in_s3():
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )
Raymond Yu
03/14/2022, 5:57 PM