
Marc Lipoff

09/08/2022, 6:30 PM
I'm looking to implement a real-time (ish) pipeline in v1. An external system drops S3 files every minute or so. I'd like to poll S3 on a fixed schedule to see which files have been modified since the last poll and, if there are new files, grab them and put them in the database. I used this as inspiration. What I did was first create "chunks" (or windows) to look at: a list of n windows, each a tuple of the start/end time of the window. Then, mapping over each window, run a task that: (a) if the end of the window hasn't passed yet, throws a RETRY signal with start_time=window_end_time, or (b) finds the S3 files that were modified within the window. This seems to work pretty well, EXCEPT... when the flow runs, the tasks in a pending state (from (a) above) occupy the available slots (the flow is set up with num_threads=8). So essentially, my pipeline does not progress to pushing data to the db until the last window has passed (not the intention).
Excerpts of the code: tasks in the flow...
    # build the list of polling windows (start, end, dt partition) to map over
    last_modified_begin, last_modified_end, partition = make_chunks(
        target_start_time,
        to_timedelta(poll_freq),
        to_timedelta(total_duration_to_run),
    )

    # lists the files in each chunk (based on modified timestamp)
    files = S3ListGracefulAndWait(bucket=s3_bucket_name.run()).map(
        prefix=unmapped("fargate"),
        partition=partition,
        last_modified_begin=last_modified_begin,
        last_modified_end=last_modified_end,
    )

    df = read_files.map(files)
    ### and then take the dataframe and push to the database, ...
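(make_chunks and to_timedelta aren't shown above; purely as an assumption about that missing helper, a minimal sketch of the windowing it would need to produce for the .map() call:)

import datetime


def make_chunks(start, freq, total):
    # Hypothetical sketch: split [start, start + total) into fixed-size windows and
    # return parallel lists of window starts, window ends, and dt partitions --
    # the shape the mapped S3 listing task above expects.
    begins, ends, partitions = [], [], []
    t = start
    while t < start + total:
        begins.append(t)
        ends.append(t + freq)
        partitions.append(t.strftime("%Y-%m-%d"))
        t += freq
    return begins, ends, partitions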
and the s3 task class
import datetime

from prefect.engine import signals
from prefect.tasks.aws.s3 import S3List

# log, now_ and datetime_to_string are small helpers defined elsewhere


class S3ListGracefulAndWait(S3List):
    def run(
        self,
        prefix: str,
        partition: str,
        last_modified_begin: datetime.datetime,
        last_modified_end: datetime.datetime,
    ) -> list[str]:

        # using the partitions is important because of the number of files; without
        # at least pre-filtering on dt, the S3 listing takes way too long
        prefix += "/dt=" + partition

        if last_modified_end < now_():  # window has passed
            try:
                log().info("Starting to list s3 files...")
                res = super().run(
                    prefix=prefix,
                    last_modified_begin=datetime_to_string(last_modified_begin),
                    last_modified_end=datetime_to_string(last_modified_end),
                )
                log().info(
                    f"S3ListGracefulAndWait run prefix={prefix} last_modified_begin={last_modified_begin} last_modified_end={last_modified_end}. Result={res}"
                )
            except Exception as e:
                log().error(e, exc_info=True)
                raise signals.SKIP(
                    f"Failed to get s3 files for dt={partition} {last_modified_begin} to {last_modified_end}. {e}"
                )
            # raise the "no files" SKIP outside the try block so it isn't swallowed
            # by the except Exception above (Prefect signals are Exception subclasses)
            if len(res) == 0:
                raise signals.SKIP(
                    f"No files available for dt={partition} {last_modified_begin} to {last_modified_end}"
                )
            return [f"s3://{self.bucket}/{x}" for x in res]
        else:
            raise signals.RETRY(
                message=f"Going to retry at {last_modified_end}",
                start_time=last_modified_end,
            )
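(The flow/executor setup isn't shown; assuming the num_threads=8 mentioned above refers to a threaded local Dask executor, it would be something like this, and each mapped window task that raises RETRY while waiting for its window to close holds one of these slots:)

from prefect.executors import LocalDaskExecutor

# Assumption: "num_threads=8" presumably corresponds to a threaded LocalDaskExecutor
# with 8 workers, which is where the pending/retrying mapped tasks occupy slots.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)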
While this is the way I thought was best to do it, I'm open to other ideas. I want the flow to be able to poll regularly for new files, and if there is a new file, proceed asynchronously with processing it (i.e., don't wait until the previous file is processed before doing the next poll). One alternative is sketched below.
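(Not from the thread, just one hedged alternative: instead of long-lived mapped tasks waiting for their window, let the Prefect 1 scheduler do the polling by running a small flow on an interval, so each run handles exactly one window and nothing sits in a RETRY state holding an executor slot. A rough sketch, deriving the window from the run's scheduled start time:)

import datetime
from datetime import timedelta

import prefect
from prefect import Flow, task
from prefect.schedules import IntervalSchedule

POLL_INTERVAL = timedelta(minutes=1)


@task
def current_window():
    # Use this run's scheduled start time as the window end, and one interval
    # earlier as the window start (hypothetical convention).
    end = prefect.context.get("scheduled_start_time", datetime.datetime.utcnow())
    return end - POLL_INTERVAL, end


with Flow("s3-poll", schedule=IntervalSchedule(interval=POLL_INTERVAL)) as flow:
    window = current_window()
    # list the files modified inside `window`, read them, and load to the database,
    # reusing the S3 list / read_files / db tasks from the excerpts above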

Anna Geller

09/09/2022, 1:43 AM
Hi Marc, we have a couple of blog posts on that for Prefect 2; check them out if you're interested: https://annageller.medium.com/serverless-real-time-data-pipelines-on-aws-with-prefect-ecs-and-github-actions-1737c80da3f5