
Raymond Yu

03/14/2022, 2:23 AM
Heya Prefectionists, I’ve built something of a dumb s3 sensor, but it appears to get stuck whenever it attempts to retry. Any suggestions or ideas as to why it could hang on retry? I’ll include a snippet of the code in the comments, all the upstream tasks have result types in PrefectResult().
Copy code
from datetime import timedelta

import prefect
from prefect import task
from prefect.engine import signals
from prefect.tasks.aws.s3 import S3List


@task(max_retries=24, retry_delay=timedelta(minutes=15))
def check_s3_file_exists_source(
    bucket_name: str, job_config: dict, s3_filter_date: str
) -> None:
    """
    A sensor that will fail if files are not found for a given run date.

    Args:
        bucket_name: which bucket to check files in
        job_config: config containing `landing_table_name` for the prefix to check with
        s3_filter_date: date to filter S3 from; files modified before this date
            are not included
    """
    logger = prefect.context.get("logger")
    logger.info(
        f"Checking for source files in '{bucket_name}/{job_config['landing_table_name']}'"
        f" for the run date {s3_filter_date}."
    )

    source_files = S3List(bucket=bucket_name).run(
        prefix=job_config["landing_table_name"], last_modified_begin=s3_filter_date
    )
    if not source_files:
        raise ValueError("No files found for the run date ", s3_filter_date)
    else:
        logger.info(
            f"Check successful. "
            f"Source files detected in '{bucket_name}/{source_files[0]}'"
        )
        raise signals.SUCCESS()
Do I just need to raise a retry signal here? Also how does everyone go about testing something like this?

Kevin Kho

03/14/2022, 2:27 AM
I think this looks pretty right to me. Are you using flow.run() or running with an agent? If you want more logging, you can put a sleep in the code and use prefect.context.get("task_run_count") to identify whether it's a retry or the first attempt.
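For reference, here is a minimal sketch of what that extra logging could look like; the task name, signature, and structure are assumed for illustration and are not taken from the thread:
Copy code
from datetime import timedelta

import prefect
from prefect import task


@task(max_retries=24, retry_delay=timedelta(minutes=15))
def s3_sensor_with_logging(bucket_name: str) -> None:
    logger = prefect.context.get("logger")
    # task_run_count is 1 on the first attempt and increments on each retry
    run_count = prefect.context.get("task_run_count", 1)
    if run_count > 1:
        logger.info(f"Retry attempt {run_count - 1} for bucket '{bucket_name}'")
    else:
        logger.info(f"First attempt for bucket '{bucket_name}'")
    # ... the actual S3 check would go here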

Raymond Yu

03/14/2022, 2:33 AM
Oh, thanks for the fast response. We're running this inside Docker with an agent; the interesting thing is that when I trigger this locally via flow.run(), it does retry correctly.

Anna Geller

03/14/2022, 9:20 AM
The signals should be a bit different: instead of ValueError, you should raise the RETRY signal, and if the file you need has arrived, there's no need to raise the SUCCESS signal explicitly - the task run will simply finish successfully and exit. We have an entire Discourse topic about this in much more detail https://discourse.prefect.io/t/is-there-an-equivalent-to-sensors-in-prefect-how-do-i-trigger-event-driven-workflows/76 but here's a snippet from there:
Copy code
import pendulum
from prefect import task
from prefect.engine.signals import RETRY
import awswrangler as wr

def check_if_file_arrived_in_s3():
    return wr.s3.does_object_exist("s3://bucket/example_file.csv")

@task
def s3_sensor(**kwargs):
    bool_s3_object_arrived = check_if_file_arrived_in_s3()
    if bool_s3_object_arrived is False:
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )
This should work the same way in a local run ☝️
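For context, a hedged sketch of wiring such a sensor into a flow so it gates a downstream task; the flow name, task names, and bucket/key are assumptions, not from the thread:
Copy code
import pendulum
from prefect import Flow, task
from prefect.engine.signals import RETRY
import awswrangler as wr


@task
def s3_sensor():
    # hypothetical bucket/key; retry every 20 seconds until the object exists
    if not wr.s3.does_object_exist("s3://bucket/example_file.csv"):
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )


@task
def process_file():
    pass


with Flow("s3-sensor-example") as flow:
    arrived = s3_sensor()
    # process_file only starts once the sensor task finishes successfully
    process_file(upstream_tasks=[arrived])


if __name__ == "__main__":
    flow.run()  # behaves the same locally as when run by an agent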

Raymond Yu

03/14/2022, 5:57 PM
Ahhh perfect! Thanks Anna 🤟
Upon further investigation, we found that this didn't quite work as intended: the RETRY signal overrode the retry parameters we had in the task decorator and caused the sensor to poll indefinitely rather than for a fixed number of retries as we wanted. Ultimately, the weird retry hang turned out to be caused by a downstream task with a .bind() to the sensor task. Which is a bit odd, but I guess we were mistakenly creating a keyed edge between the tasks when it should have been a non-keyed edge.
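To illustrate the distinction Raymond describes, here is a hedged sketch with hypothetical task names: .bind() registers the sensor's return value as an input to the downstream task (a keyed edge), while set_upstream creates a state-only, non-keyed dependency:
Copy code
from prefect import Flow, task


@task
def sensor():
    pass  # the S3 check / RETRY logic would live here


@task
def load_data(sensor_result=None):
    pass


flow = Flow("sensor-flow")

# keyed edge (roughly what caused the hang, per the thread): the sensor's
# return value (None) is bound as an input to load_data
# load_data.bind(sensor_result=sensor, flow=flow)

# non-keyed edge: load_data only waits for the sensor's state, no data flows
load_data.set_upstream(sensor, flow=flow)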