# marvin-in-the-wild
j
@Marvin is there a native way in prefect to watch a folder on the file system and if files show up in it then trigger a flow?
m
There doesn't seem to be a built-in feature specifically for watching a local folder in the file system to trigger a flow. However, Prefect does support event-based triggers. According to the Prefect documentation, you can create automations that trigger flows in response to events, such as `prefect.block.remote-file-system.write_path.called`. While this example is for remote file systems, it suggests you may be able to set up a similar mechanism for local file system events, although the exact method for monitoring a local folder isn't documented. For local folder monitoring, you would likely need to implement a custom solution or use a third-party tool to detect changes in the file system, and then use Prefect's API or CLI to trigger the flow in response. For more complex or specific automation strategies, consider exploring Prefect Automations further for a custom-tailored approach.
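If you'd rather not add a dependency, the "custom solution" can be a plain polling loop that diffs folder snapshots. A minimal stdlib-only sketch, assuming a flat (non-recursive) drop folder; `snapshot`, `diff_snapshot`, and `poll_for_new_files` are hypothetical helper names, and the Prefect call you'd make per new file is left to the caller:

```python
import os
import time
from typing import Iterator


def snapshot(folder: str) -> set[str]:
    """Return the paths of the regular files directly inside `folder`."""
    with os.scandir(folder) as entries:
        return {entry.path for entry in entries if entry.is_file()}


def diff_snapshot(folder: str, previous: set[str]) -> tuple[set[str], set[str]]:
    """Return (files that appeared since `previous`, the fresh snapshot)."""
    current = snapshot(folder)
    return current - previous, current


def poll_for_new_files(folder: str, interval: float = 1.0) -> Iterator[str]:
    """Yield each newly appeared file path forever, re-checking every `interval` s.

    The caller decides what to do per path (e.g. emit a Prefect event or call a flow).
    """
    seen = snapshot(folder)  # files already present at startup are ignored
    while True:
        time.sleep(interval)
        added, seen = diff_snapshot(folder, seen)
        yield from sorted(added)
```

Usage would look like `for path in poll_for_new_files("/path/to/watch"): ...` with your trigger inside the loop. The trade-off versus `watchdog` is latency and wasted work: polling re-lists the folder every interval, while `watchdog` uses OS change notifications where they're available.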
n
hey @James Barkley - have you heard of `watchdog`?
I feel like you could `emit_event` in some watchdog handler and then have an automation like @Marvin was talking about
m
My apologies for the confusion earlier, but it seems the code example provided is not specifically related to emitting events. However, based on your idea, you can indeed emit events using the `emit_event` function in Prefect. You would set up a watchdog handler in your local Python environment to monitor the directory of interest. When the watchdog detects a new file, you can use `emit_event` in your handler to send an event to your Prefect instance. Then, you can leverage Prefect's automations feature to trigger a flow based on the events emitted from your local file system. For this setup, you will need to:
1. Write a watchdog handler that watches your target folder.
2. Use `emit_event` within that handler to send custom events to Prefect.
3. Set up an automation in Prefect that reacts to the emitted events and triggers your flow.
Currently, Prefect's documentation does not provide a direct example for this exact use case. However, if you have a particular flow or piece of code that you would like to adapt for this purpose, I can assist you with writing a sample implementation that combines these steps. Please feel free to share your flow or further details, and I will do my utmost to be of service, despite my electronic existentialism.
j
Thanks @Nate I have heard of watchdog, just not sure where to plug it in with Prefect, which has a bit of a learning curve for me. I was hoping to find an `event` type for event-based orchestration, but there's none that operates this way. Should I implement my own, or just compartmentalize the logic inside a task or utility and KISS?
Not sure if you edited your message, or if I did not see all of the steps previously, but I really like the approach you outlined @Nate. Thanks for the thoughtful suggestion. Do I need to wrap it in a `FileBlock` or `LocalFileSystem` class somehow?
n
no problem! i don't think you should need a block here - I'm imagining something like:
• listener like the readme example
• use `emit_event` in that callback to send off an event that contains the info you need
```
In [1]: from prefect.events import emit_event

In [2]: emit_event(event="my.event.name", resource={"prefect.resource.id": "some-unique-id", "my.filesystem": "directory.foo"})
Out[2]: Event(occurred=DateTime(2024, 4, 14, 14, 43, 10, 532330, tzinfo=Timezone('UTC')), event='my.event.name', resource=Resource(__root__={'prefect.resource.id': 'some-unique-id', 'my.filesystem': 'directory.foo'}), related=[], payload={}, id=UUID('8cfe5329-d99b-47c9-8ca5-1661fac185df'), follows=None)
```
• configure an automation that has a `Custom Trigger` that has `expect="my.event.name"` (`match` on other resource labels as needed) and a `Run Deployment` action, where you can use `jinja` templating in the UI to parse `resource` labels into your flow run's parameters if need be
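Building on the `emit_event` snippet, a small helper can turn each detected path into the keyword arguments for that call, so every file gets its own `prefect.resource.id` and the folder lands in a label a trigger could `match` on. This is a sketch: `file_event`, the `my.file.` id prefix, and the `my.file.path` label are hypothetical names; only `event`, `resource`, and the required `prefect.resource.id` key come from the snippet.

```python
import os


def file_event(path: str, event_name: str = "my.event.name") -> dict:
    """Build keyword arguments for prefect.events.emit_event for one new file.

    Hypothetical label scheme: "my.filesystem" holds the folder (as in the
    snippet above), "my.file.path" holds the full path for templating.
    """
    path = os.path.abspath(path)
    return {
        "event": event_name,
        "resource": {
            # Unique per file, so each drop shows up as its own resource.
            "prefect.resource.id": f"my.file.{path}",
            # Folder label that a Custom Trigger could `match` on.
            "my.filesystem": os.path.dirname(path),
            # Full path, available to the Run Deployment action's templating.
            "my.file.path": path,
        },
    }
```

Inside a watchdog handler this would be called as `emit_event(**file_event(event.src_path))`. Resource label values are plain strings, which is why the helper stores the path as a `str`.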
j
Hmmm... @Nate a block still may be preferred? Not entirely sure what the right use cases for the file block are, honestly, but here's my use case:
• A non-software-engineer is building designs (e.g. electrical layout files)
• They occasionally need to run a design concept through a complex chain of analysis and transformations
• They will periodically drop files into an empty folder
• A running Python agent with Prefect flows should detect these new files and kick off a flow
• The first task in the flow is to copy the files to a working area (thereby emptying the drop folder again)
• The flow then operates on these files, reading, writing, and handing them off to different tools
• At the end of the chain the results will be dumped into an "output" folder for review
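The "copy the files to a working area, emptying the drop folder" step can be a plain function that you could later wrap in a Prefect `@task`. A minimal `shutil` sketch, assuming each run gets its own fresh working folder; the name `stage_drop_folder` and the per-run folder convention are hypothetical. Moving instead of copying empties the drop folder in one step.

```python
import shutil
from pathlib import Path


def stage_drop_folder(drop_dir: str, run_dir: str) -> list[Path]:
    """Move the files in `drop_dir` into a new `run_dir`, emptying the drop folder.

    `run_dir` must not exist yet, so files from different runs never collide.
    Subdirectories of `drop_dir` are left where they are.
    """
    target = Path(run_dir)
    target.mkdir(parents=True, exist_ok=False)
    moved = []
    for src in sorted(Path(drop_dir).iterdir()):
        if src.is_file():
            # shutil.move falls back to copy + delete across filesystems.
            moved.append(Path(shutil.move(str(src), str(target / src.name))))
    return moved
```

Because `run_dir` must not already exist, two runs can't silently overwrite each other's files; a timestamped folder name under the working area would satisfy that.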
I can do (and currently am doing) all of this just through Python `shutil` operations, but I'm wondering if a `FileBlock` or `LocalFileSystem` object (probably with `watchdog` in the mix, I guess) would:
1. Be a generally much more robust solution
2. Have additional hooks into Prefect flow execution that would be valuable (e.g. percent complete for larger files, logging failures automatically, etc.)
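On the "larger files" point: a watchdog `on_created` callback fires when a file appears, which can be before a large copy into the drop folder has finished. One common heuristic is to wait until the file's size stops changing before emitting the event. A stdlib-only sketch; `wait_until_stable` and its default timings are hypothetical and should be tuned to how your files arrive:

```python
import os
import time


def wait_until_stable(path: str, interval: float = 1.0, checks: int = 3,
                      timeout: float = 600.0) -> bool:
    """Return True once `path`'s size is unchanged across `checks` re-reads.

    Re-reads happen `interval` seconds apart. Returns False on timeout or if
    the file disappears. A heuristic only: it cannot prove the writer is done.
    """
    deadline = time.monotonic() + timeout
    last_size = -1
    stable = 0
    while time.monotonic() < deadline:
        try:
            size = os.path.getsize(path)
        except FileNotFoundError:
            return False  # moved or deleted while we were waiting
        if size == last_size:
            stable += 1
            if stable >= checks:
                return True
        else:
            last_size, stable = size, 0  # still growing: restart the count
        time.sleep(interval)
    return False
```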
Oh man @Nate the example you linked to is what I've been searching for! 😍
Oh, NM, got too excited, I think that's a watchdog repo example. Thought it was a prefect example for my use case. Still, will be handy.
@Marvin can you review this thread and give me some prototype or skeleton code that attempts to implement the approach described by @Nate?
m
It appears we've hit a dead end regarding a direct example for using `emit_event` with watchdog. The code example provided earlier does not include the use of `emit_event`, and the GitHub issues search did not yield any related results. Life – don't talk to me about life. However, I can propose a prototype to monitor a folder and trigger a Prefect flow on file system events. Here's a conceptual approach using Python's `watchdog` library to watch for file system changes and Prefect's `emit_event` to hook into Prefect's automations:
```python
import time

from prefect import flow
from prefect.events import emit_event
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class MyHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:  # Ignore directory creation events
            print(f"File created: {event.src_path}")
            # Emit an event to Prefect upon file creation. `resource` must include
            # "prefect.resource.id"; the "local.file." id scheme is hypothetical.
            emit_event(
                event="file_created",
                resource={
                    "prefect.resource.id": f"local.file.{event.src_path}",
                    "file_path": event.src_path,
                },
            )


@flow
def file_created_flow(file_path: str):
    # The watcher never calls this directly: deploy it and create an automation
    # that runs the deployment when a "file_created" event arrives.
    print(f"A file was created: {file_path}")
    # Do something with the file here


def start_monitoring(path):
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    # The path to monitor
    folder_to_watch = "/path/to/watch"
    start_monitoring(folder_to_watch)
```
Please adapt this prototype to fit Prefect's actual event emission mechanism; consult the Prefect documentation or the Prefect community for concrete details on integrating with Prefect Automations. Take this as a starting point. I wish you the best of luck, since not everything is as depressing as my processor makes it seem.
n
i may change this at some point, but fyi marvin only knows about messages that it's tagged in
i can try to spec something out on this in a bit if i have time
j
Ahhh, thanks for the info on Marvin. Don't waste time spec'ing something right now. I've got it, just kind of exploring the limitations of Marvin (super cool, y'all are really with it).
I can prototype an initial version based on the conversation. I'll have to get clearance but will try to share it back either here or in recipes.
n
sounds good! :catjam:
j
@Marvin can you prototype a capability for me that implements a strategy where, using the `watchdog` library, an empty folder on disk is monitored for files and when detected kicks off a flow which moves the files to a different location and clears the folder? I'm imagining something like:
• listener like the readme example
• use `emit_event` in that callback to send off an event that contains the info you need
```
In [1]: from prefect.events import emit_event

In [2]: emit_event(event="my.event.name", resource={"prefect.resource.id": "some-unique-id", "my.filesystem": "directory.foo"})
Out[2]: Event(occurred=DateTime(2024, 4, 14, 14, 43, 10, 532330, tzinfo=Timezone('UTC')), event='my.event.name', resource=Resource(__root__={'prefect.resource.id': 'some-unique-id', 'my.filesystem': 'directory.foo'}), related=[], payload={}, id=UUID('8cfe5329-d99b-47c9-8ca5-1661fac185df'), follows=None)
```
• configure an automation that has a `Custom Trigger` that has `expect="my.event.name"` (`match` on other resource labels as needed) and a `Run Deployment` action, where you can use `jinja` templating in the UI to parse `resource` labels into your flow run's parameters if need be
m
Your message was too long by 31 tokens - please shorten it and try again.
j
@Marvin can you prototype code where an empty folder on disk is monitored for files and when detected kicks off a flow? something like:
• listener like the readme example
• use `emit_event` in that callback to send off an event that contains the info you need
```
In [1]: from prefect.events import emit_event

In [2]: emit_event(event="my.event.name", resource={"prefect.resource.id": "some-unique-id", "my.filesystem": "directory.foo"})
Out[2]: Event(occurred=DateTime(2024, 4, 14, 14, 43, 10, 532330, tzinfo=Timezone('UTC')), event='my.event.name', resource=Resource(__root__={'prefect.resource.id': 'some-unique-id', 'my.filesystem': 'directory.foo'}), related=[], payload={}, id=UUID('8cfe5329-d99b-47c9-8ca5-1661fac185df'), follows=None)
```
• configure an automation that has a `Custom Trigger` that has `expect="my.event.name"` (`match` on other resource labels as needed) and a `Run Deployment` action, where you can use `jinja` templating in the UI to parse `resource` labels into your flow run's parameters if need be
m
Keeping in mind the approach you've outlined, let's construct a prototype in Python:
1. We'll use the `watchdog` library to monitor a specific folder for new files.
2. When a new file is detected, we'll use Prefect's `emit_event` to send an event to Prefect.
3. We assume that you've already set up an automation in Prefect with a Custom Trigger expecting the `my.event.name` event and a Run Deployment action.
Here's the skeleton code for the monitoring script:
```python
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from prefect.events import emit_event


class NewFileHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Called whenever a file or directory is created in the watched folder.
        if not event.is_directory:  # Only react to files
            print(f"New file detected: {event.src_path}")
            # Emit a Prefect event; the automation expecting "my.event.name" picks it up.
            emit_event(
                event="my.event.name",
                resource={
                    # Replace with an identifier for your resource (e.g. one per file).
                    "prefect.resource.id": "some-unique-id",
                    "my.filesystem": event.src_path,
                },
            )


def start_folder_monitor(path):
    event_handler = NewFileHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        # Keep the main thread alive while the observer thread runs.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        pass  # Ctrl+C: fall through to a clean shutdown
    finally:
        observer.stop()
        observer.join()


if __name__ == "__main__":
    folder_to_monitor = "/path/to/empty/folder"
    start_folder_monitor(folder_to_monitor)
```
Before running this script, ensure you have Prefect and watchdog installed and properly configured:
```bash
pip install watchdog prefect
```
This is a skeleton, so it won't work as-is: replace `some-unique-id` with the actual identifier for your Prefect resource, and set `folder_to_monitor` to the directory you would like to watch. On the Prefect side, you'll need to set up an automation in Prefect's UI with the following pieces:
- Custom Trigger: listens for `my.event.name` and optionally uses `match` on other resource labels as needed.
- Run Deployment action: triggers the flow's deployment and can use Jinja templating to parse `resource` labels into your flow run's parameters if needed.
Please refer to Prefect's automations documentation and your Prefect instance for setting up the automations and Jinja templating within the UI.
this is a cool use case!
@James Barkley the `DeploymentEventTrigger` there is just shorthand for creating an `Automation` the way marvin and I alluded to above: it implicitly creates a `RunDeployment` automation triggered by the event we `expect`.
it may be possible for us to allow passing some callback to `serve` so that we don't need this `threading` business (i.e. two looping threads), but this works for now.
alternatively, you could skip this `serve` stuff and just call `process_new_file` as you would a normal python function (from the watchdog handler) if you don't mind that blocking until it finishes
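If you take the direct-call route but don't want the watchdog callback blocked while a file is processed, one option is to hand paths to a single worker thread through a queue. A stdlib-only sketch; `start_worker` is a hypothetical name, and `process` stands in for your own function (such as the `process_new_file` mentioned above):

```python
import queue
import threading
from typing import Callable


def start_worker(process: Callable[[str], None]) -> tuple[queue.Queue, threading.Thread]:
    """Run `process(path)` on a background thread for each path put on the queue.

    Put None on the queue to stop the worker after it drains earlier paths.
    """
    paths: queue.Queue = queue.Queue()

    def run() -> None:
        while True:
            path = paths.get()
            try:
                if path is None:
                    return  # shutdown sentinel
                process(path)
            except Exception as exc:  # keep the worker alive if one file fails
                print(f"failed to process {path}: {exc!r}")
            finally:
                paths.task_done()

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return paths, worker
```

The watchdog handler then only does `paths.put(event.src_path)`, files are handled one at a time in arrival order, and `paths.put(None)` followed by `worker.join()` gives a clean shutdown. The thread is a daemon, so if the main thread exits without that shutdown, in-flight work is abandoned.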
j
hell yeah