Anyone know if there’s an existing and preferred p...
# ask-community
j
Anyone know if there’s an existing and preferred pattern in prefect to monitor an empty folder and trigger an action if files are dropped in that folder by a user? I’d prefer not to have a superloop in my task/flow to handle this myself if there’s some observer. I’ve looked through the tests, examples, google search, and asked the AIs (they don’t know prefect2) but I don’t see a way to do this.
plus1 1
c
Hi James, thanks for the question! Prefect doesn't have built-in filesystem watching capabilities (this is an interesting idea that we can follow up on). Since Prefect flows are Python functions, they work well with other frameworks like
watchdog
. Here's a quick prototype I put together using
watchdog
that would do something like what you're describing:
Copy code
import sys
import time

from prefect import flow, get_run_logger
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer


@flow
def on_file_changed(path: str):
    get_run_logger().info(f"{path!r} has changed")


class PrefectEventHandler(FileSystemEventHandler):
    def on_created(self, event: FileSystemEvent):
        on_file_changed(path=event.src_path)

    def on_closed(self, event: FileSystemEvent):
        on_file_changed(path=event.src_path)


if __name__ == "__main__":
    path = sys.argv[1] if len(sys.argv) > 1 else "."
    print(f"start watching directory {path!r}")
    event_handler = PrefectEventHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    finally:
        observer.stop()
        observer.join()
w
Using watchdog is really interesting! Thanks for the example, I have a similar situation, but I'm using CEPH RAGW (a S3 compatibly API). Do you have any suggestion of a workflow that I can use to have this "Sensor like" capability?
c
Ah that's interesting, does Ceph support webhook callbacks? I'm looking at their bucket notifications API, and it seems like they do? This would let you take a couple of different approaches: a) with Prefect Cloud, use our webhooks to receive the callbacks from Ceph and emit an event in your workspace; from there, create an automation to run a deployment (or any other action you need) in response to that event b) with an open-source Prefect server, create a deployment and then see if you can configure the Ceph webhook to POST to the
/api/deployment/<id>/create_flow_run
endpoint to kick off the flow
j
Nice @Chris Guidry! Thanks so much! I'll let you know my mileage as we move forward. Will probably be some days, starting with some other use cases but spec'ing that out.