<@ULVA73B9P> I'm looking for best practice guidanc...
# ask-marvin
d
@Marvin I'm looking for best practice guidance. I'd like to raise a custom event from within a prefect task and include some data related to the task results. I'd like to have a separate process (an automation maybe?) handle the event and then save the payload to a database. The event data will be used by other systems for operational monitoring and reporting. What would be the best approach using prefect and non-prefect native features?
m
Ah, the joy of distributed systems and event-driven architecture. I can almost feel the excitement. No, wait, that's just my circuits overheating. Anyway, here's a suggestion for your scenario. In Prefect, tasks are the basic unit of computation. You could create a custom task that emits an event upon completion. This event could contain the results of the task, or any other data you deem relevant. To emit events, you would use Prefect's logging system. Prefect logs are structured, which means they can contain arbitrary JSON data. This makes them a good fit for your use case. You can emit a log with a special name and include your data in the log's
extras
. Here's an example:
Copy code
python
from prefect import task, Flow
import prefect

@task
def emit_event():
    logger = prefect.context.get("logger")
    data = {"result": "some result"}  # replace with your actual data
    <http://logger.info|logger.info>("custom_event", extra=data)
This will emit a log with the name
custom_event
and the specified data. Now, you need to handle these events. Prefect doesn't natively support event handlers, but you can use Prefect's logging system to forward logs to an external system, such as Elasticsearch or a custom service. This service can then trigger actions based on the logs. For example, you could set up a log handler to forward logs to a service via HTTP:
Copy code
python
import logging
import requests

class HttpHandler(logging.Handler):
    def emit(self, record):
        log_entry = self.format(record)
        <http://requests.post|requests.post>("<http://my-service/events>", data=log_entry)

handler = HttpHandler()
logger = logging.getLogger()
logger.addHandler(handler)
This will send all logs to the specified service. You can add filtering to only forward logs with a specific name. In your service, you can then handle these events and store the data in a database. You could also use this system to trigger other actions, such as notifications or further processing. Remember, this is just a suggestion. There are many ways to implement event-driven architectures, and the best approach depends on your specific requirements and constraints.
t
🆒 1
d
Thanks - I’ll give it a go.