James Barkley
04/13/2024, 11:56 PM

Marvin
04/13/2024, 11:56 PM
`prefect.block.remote-file-system.write_path.called`.
While this example is for remote file systems, it suggests that you may be able to set up a similar mechanism for local file system events. However, the exact method for monitoring a local folder isn't specified.
For local folder monitoring, you would likely need to implement a custom solution or use third-party tools to detect changes in the file system and then use Prefect's API or CLI to trigger the flow in response to detected changes.
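As an illustration of the custom-solution route, here is a stdlib-only sketch that polls a folder and yields files that were not present on the previous pass. `snapshot` and `poll_new_files` are hypothetical names, and polling is a swapped-in technique (watchdog relies on OS notifications instead). Each yielded path is the point where you would notify Prefect, for example with `emit_event`.

```python
import time
from pathlib import Path
from typing import Iterator, Set


def snapshot(folder: Path) -> Set[Path]:
    """Return the regular files currently in `folder` (non-recursive)."""
    return {p for p in folder.iterdir() if p.is_file()}


def poll_new_files(folder: Path, interval: float = 1.0) -> Iterator[Path]:
    """Yield each file that appears in `folder` after polling starts.

    Hypothetical helper: a plain polling loop, not a Prefect or watchdog API.
    The baseline snapshot is taken on the first call to next().
    """
    seen = snapshot(folder)
    while True:
        time.sleep(interval)
        current = snapshot(folder)
        # Files present now but not on the previous pass count as "new".
        for path in sorted(current - seen):
            yield path
        seen = current
```

A loop such as `for path in poll_new_files(Path("/path/to/watch")): ...` never ends on its own, so it belongs in a long-lived process. Polling is simpler than OS notifications, but it adds up to `interval` seconds of latency and rescans the whole folder on every pass.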
For more complex or specific automation strategies, consider exploring Prefect Automations further for a tailored approach.

Nate
04/14/2024, 12:56 AM
`watchdog`?

Nate
04/14/2024, 12:57 AM
`emit_event` in some watchdog handler and then have an automation like @Marvin was talking about

Marvin
04/14/2024, 12:57 AM
`emit_event` function in Prefect.
You would set up a watchdog handler in your local Python environment to monitor the directory of interest. When watchdog detects a new file, you can use `emit_event` in your handler to send an event to your Prefect instance. Then you can leverage Prefect's automations feature to trigger a flow based on the events emitted from your local file system.
For this setup, you will need to:
1. Write a watchdog handler that watches your target folder.
2. Use `emit_event` within that handler to send custom events to Prefect.
3. Set up an automation in Prefect that reacts to the emitted events and triggers your flow.
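The three steps above can be sketched without tying the example to a particular Prefect version by injecting the emitter. The class below mirrors watchdog's `on_created(event)` callback, whose events carry `is_directory` and `src_path`; in real use you would subclass `watchdog.events.FileSystemEventHandler` and pass `prefect.events.emit_event` as `emit`. The class name, the event name `local.file.created`, and the `local.file.*` labels are hypothetical; the one label Prefect requires on a resource is `prefect.resource.id`.

```python
from pathlib import Path
from typing import Any, Callable


class NewFileForwarder:
    """Turns 'file created' callbacks into Prefect-style events.

    Hypothetical class: in real use, subclass
    watchdog.events.FileSystemEventHandler and pass
    prefect.events.emit_event as `emit`.
    """

    def __init__(self, emit: Callable[..., Any], event_name: str = "local.file.created") -> None:
        self.emit = emit
        self.event_name = event_name

    def on_created(self, event: Any) -> None:
        # watchdog also reports directory creation; only files are forwarded.
        if event.is_directory:
            return
        path = Path(event.src_path)
        self.emit(
            event=self.event_name,
            resource={
                # Required by Prefect; derived from the path so each file is distinct.
                "prefect.resource.id": f"local.file.{path}",
                "local.file.path": str(path),
                "local.file.name": path.name,
            },
        )
```

Injecting `emit` keeps the handler testable with a plain list-appending function, while the keyword arguments match the `emit_event(event=..., resource=...)` call shown later in the thread.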
Currently, Prefect's documentation does not provide a direct example for this exact use case. However, if you have a particular flow or piece of code that you would like to adapt for this purpose, I can assist you with writing a sample implementation that combines these steps. Please feel free to share your flow or further details, and I will do my utmost to be of service, despite my electronic existentialism.

James Barkley
04/14/2024, 4:15 AM
`event` type for event-based orchestration but there's none that operate this way. Should I implement my own, or just compartmentalize the logic inside a task or utility and KISS?

James Barkley
04/14/2024, 2:35 PM
`FileBlock` or `LocalFileSystem` class somehow?

Nate
04/14/2024, 2:47 PM
• listener like the readme example
• use `emit_event` in that callback to send off an event that contains the info you need
```
In [1]: from prefect.events import emit_event
In [2]: emit_event(event="my.event.name", resource={"prefect.resource.id": "some-unique-id", "my.filesystem": "directory.foo"})
Out[2]: Event(occurred=DateTime(2024, 4, 14, 14, 43, 10, 532330, tzinfo=Timezone('UTC')), event='my.event.name', resource=Resource(__root__={'prefect.resource.id': 'some-unique-id', 'my.filesystem': 'directory.foo'}), related=[], payload={}, id=UUID('8cfe5329-d99b-47c9-8ca5-1661fac185df'), follows=None)
```
• configure an automation that has a Custom Trigger that has `expect="my.event.name"` (`match` on other resource labels as needed) and a `Run Deployment` action, where you can use `jinja` templating in the UI to parse `resource` labels into your flow run's parameters if need be

James Barkley
04/14/2024, 3:02 PM

James Barkley
04/14/2024, 3:03 PM
`shutil` operations, but wondering if a `FileBlock` or `LocalFileSystem` object (probably with `watchdog` in the mix, I guess) would:
1. Be a generally much more robust solution
2. Have additional hooks into Prefect flow execution that would be valuable (e.g. percent complete for larger files, logging failures automatically, etc.)

James Barkley
04/14/2024, 3:05 PM

James Barkley
04/14/2024, 3:06 PM

James Barkley
04/14/2024, 3:10 PM

Marvin
04/14/2024, 3:11 PM
`emit_event` with a watchdog. The code example provided earlier does not include the use of `emit_event`, and the GitHub issues search did not yield any related results. Life – don't talk to me about life.
However, I can propose a prototype to monitor a folder and trigger a Prefect flow on file system events. Here's a conceptual approach using Python's `watchdog` library to watch for file system changes and Prefect's `emit_event` (from `prefect.events`) to hook into Prefect's automations:
```python
import time

from prefect import flow
from prefect.events import emit_event
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class MyHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:  # Ignore directory creation events
            print(f"File created: {event.src_path}")
            # Emit an event to Prefect upon file creation. Every event's
            # resource must carry a "prefect.resource.id" label.
            emit_event(
                event="file_created",
                resource={
                    "prefect.resource.id": f"local.file.{event.src_path}",
                    "file_path": event.src_path,
                },
            )


@flow
def file_created_flow(file_path: str):
    # Define the flow that should run when a file is created. It is not called
    # here; an automation reacting to "file_created" events would run it.
    print(f"A file was created: {file_path}")
    # Do something with the file here


def start_monitoring(path):
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    # The path to monitor
    folder_to_watch = "/path/to/watch"
    start_monitoring(folder_to_watch)
```
Please treat this as a prototype and adapt it to your setup; consult the Prefect documentation or Prefect's support/community for concrete details on integrating it with Prefect Automations.
Take this as a starting point, wishing you the best of luck since not everything is as depressing as my processor makes it seem.

Nate
04/14/2024, 3:11 PM

Nate
04/14/2024, 3:11 PM

James Barkley
04/14/2024, 3:14 PM

James Barkley
04/14/2024, 3:14 PM

Nate
04/14/2024, 3:15 PM

James Barkley
04/14/2024, 3:20 PM
can you prototype a capability for me that implements a strategy where, using the `watchdog` library, an empty folder on disk is monitored for files and, when files are detected, a flow is kicked off that moves the files to a different location and clears the folder? I'm imagining something like:
• listener like the readme example
• use `emit_event` in that callback to send off an event that contains the info you need
```
In [1]: from prefect.events import emit_event
In [2]: emit_event(event="my.event.name", resource={"prefect.resource.id": "some-unique-id", "my.filesystem": "directory.foo"})
Out[2]: Event(occurred=DateTime(2024, 4, 14, 14, 43, 10, 532330, tzinfo=Timezone('UTC')), event='my.event.name', resource=Resource(__root__={'prefect.resource.id': 'some-unique-id', 'my.filesystem': 'directory.foo'}), related=[], payload={}, id=UUID('8cfe5329-d99b-47c9-8ca5-1661fac185df'), follows=None)
```
• configure an automation that has a Custom Trigger that has `expect="my.event.name"` (`match` on other resource labels as needed) and a `Run Deployment` action, where you can use `jinja` templating in the UI to parse `resource` labels into your flow run's parameters if need be

Marvin
04/14/2024, 3:20 PM

James Barkley
04/14/2024, 3:27 PM

Marvin
04/14/2024, 3:27 PM
`watchdog` library to monitor a specific folder for new files.
2. When a new file is detected, we'll use Prefect's `emit_event` to send an event to Prefect.
3. We assume that you've already set up a Custom Trigger and a Run Deployment automation in Prefect that expects the `my.event.name` event.
Here's the skeleton code for the monitoring script:
```python
from prefect.events import emit_event
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class NewFileHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Triggered whenever a file or directory is created.
        if not event.is_directory:  # Only react to files
            print(f"New file detected: {event.src_path}")
            # Emit an event with Prefect's `emit_event` when a new file is found
            emit_event(
                event="my.event.name",
                resource={
                    "prefect.resource.id": "some-unique-id",
                    "my.filesystem": event.src_path,
                },
            )


def start_folder_monitor(path):
    event_handler = NewFileHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        # Keep the script running; joining with a timeout lets Ctrl+C through.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    folder_to_monitor = "/path/to/empty/folder"
    start_folder_monitor(folder_to_monitor)
```
Before running this script, ensure you have Prefect and watchdog installed and properly configured:
```bash
pip install watchdog prefect
```
This script only covers the watching side and assumes the automation already exists. Make sure to replace `some-unique-id` with the actual identifier for your resource (every event needs a `prefect.resource.id` label), and change the path in `folder_to_monitor` to the directory you would like to watch.
Regarding the Prefect part, you'll need to set up an automation in Prefect's UI with the following pieces:
- Custom Trigger: Listens for `my.event.name` and optionally uses `match` on other resource labels as needed.
- Run Deployment Action: Triggers the flow deployment and can use Jinja templating to parse `resource` labels into your flow run's parameters if needed.
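The script above covers only the watching half. The flow body James asked for, one that moves the files elsewhere and leaves the folder empty, could be as small as the stdlib sketch below. `move_and_clear` and `_free_target` are hypothetical names, and the collision policy (appending `_1`, `_2`, ... instead of overwriting) is an assumption. To run it from the Run Deployment action you would decorate it with Prefect's `@flow` and map a resource label into `inbox` with the Jinja templating mentioned above.

```python
import shutil
from pathlib import Path
from typing import List


def _free_target(destination: Path, name: str) -> Path:
    """Pick a path in `destination` that does not exist yet (a.csv, a_1.csv, ...)."""
    candidate = destination / name
    stem, suffix = candidate.stem, candidate.suffix
    counter = 1
    while candidate.exists():
        candidate = destination / f"{stem}_{counter}{suffix}"
        counter += 1
    return candidate


def move_and_clear(inbox: str, destination: str) -> List[Path]:
    """Move every regular file out of `inbox` into `destination`.

    Hypothetical flow body; decorate with prefect.flow to run it from a
    deployment. Subdirectories of `inbox` are left untouched.
    """
    src, dst = Path(inbox), Path(destination)
    dst.mkdir(parents=True, exist_ok=True)
    moved = []
    for path in sorted(p for p in src.iterdir() if p.is_file()):
        target = _free_target(dst, path.name)
        # A rename on the same filesystem; copy-then-delete across filesystems.
        shutil.move(str(path), str(target))
        moved.append(target)
    return moved
```

Returning the moved paths gives the flow something to log. If producers write large files slowly, the flow may fire before a file is complete; waiting until a file's size stops changing is one way to guard against that, and is not shown here.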
Please refer to Prefect's automations documentation and your Prefect instance for setting up the automations and Jinja templating within the UI.

Nate
04/14/2024, 3:57 PM

Nate
04/14/2024, 3:57 PM

Nate
04/14/2024, 3:59 PM
`DeploymentEventTrigger` there is just shorthand for creating an `Automation` the way Marvin and I alluded to above
it implicitly creates a `RunDeployment` automation triggered by the event we expect
Nate
04/14/2024, 4:01 PM
`serve` so that we don't need this threading business (i.e. two looping threads), but this works for now
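Nate's gist itself isn't captured in this log. As a rough, stdlib-only illustration of the non-blocking side of this trade-off, the watchdog callback could hand each path to a single worker thread through a queue, so the observer thread never waits on the processing step. `FileWorker` is a hypothetical helper, `process_new_file` in the usage below is only a stand-in for the flow in the gist, and this is not necessarily how the gist wires its threads.

```python
import queue
import threading
from typing import Callable, Optional


class FileWorker:
    """Runs `process` on paths handed over by a watcher callback, one at a time.

    Hypothetical helper: `submit` is cheap enough to call from a watchdog
    on_created handler, so the observer thread never blocks on `process`.
    """

    def __init__(self, process: Callable[[str], None]) -> None:
        self._process = process
        self._queue: "queue.Queue[Optional[str]]" = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, path: str) -> None:
        self._queue.put(path)

    def stop(self) -> None:
        # A None sentinel ends the loop after already-queued paths are handled.
        self._queue.put(None)
        self._thread.join()

    def _run(self) -> None:
        while True:
            path = self._queue.get()
            if path is None:
                break
            try:
                self._process(path)
            except Exception as exc:  # keep the worker alive after one bad file
                print(f"failed to process {path}: {exc}")
```

In a watchdog handler, `on_created` would just call `worker.submit(event.src_path)`. Handling one file at a time also keeps two runs from racing on the same folder.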
alternatively, you could skip this `serve` stuff and just call `process_new_file` as you would a normal Python function (from the watchdog handler) if you don't mind that blocking until it finishes

James Barkley
04/14/2024, 4:55 PM