# prefect-community
s
Hi! I'm looking at the code and can't find my way around. Could you kindly point me to the place where Flows are run on the server? I use Eliot for logs, and I'd like flow runs to live in an action context. That's easily done locally by overriding the run method, but I can't find my way around the server code. (Coming from Airflow; love your work, it's a lifesaver!)
n
Hi @Simone Cittadini! Flows in Prefect Server and Prefect Cloud are run by an Agent. I'm not familiar with the specifics of Eliot, but it's possible a custom logger might fit your use case as well 🙂
s
Hi! Thanks for the reply, and sorry I was short on context :) Eliot logging has this concept of an action: if you open a context manager with start_action(action_type="some_label"), every log inside the context gets two tags injected: a unique "event.id" and an "event.task_level" (a list of integers) that keeps track of the order and nesting of actions. Now, if I use an extended Flow like this:
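from prefect import Flow as BaseFlow
from eliot import start_action


class Flow(BaseFlow):
    def run(self, *args, **kwargs):
        # Open an Eliot action so every log emitted during the run shares its context.
        with start_action(action_type=self.name):
            return super().run(*args, **kwargs)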
And in every Task run I do the same:
from prefect import Task

class SomeTask(Task):
    def run(self):
        # Same pattern: each task run opens its own nested action.
        with start_action(action_type=self.name):
            ...
I get logs like these from a direct invocation (I mean calling flow.run() in the code itself, with no agent or runner involved):
{"action_status": "started", "event.id": "7cd02f09-...", "action_type": "flow name", "event.task_level": [1], ...}
{"log_level": "INFO", "logger": "prefect.FlowRunner", "event.id": "7cd02f09-...", "event.task_level": [2], ...}
...
{"action_status": "started", "event.id": "7cd02f09-...", "action_type": "task name", "event.task_level": [5, 1], ...}
 etc...
Pushing the event ids into the calls to our APIs enables complete tracing of the order and nesting of every event that happens inside, or is triggered by, each flow run. But if I register the flow and run it from the UI, I get different logs:
{"log_level": "INFO", "logger": "prefect.CloudFlowRunner", "event.id": "0445644e-...", "event.task_level": [1], ...}
{"log_level": "INFO", "logger": "prefect.CloudFlowRunner", "event.id": "05b36d25-...", "event.task_level": [1], ...}
...
As you can see, there are no "action_status" entries, the ids are different for each line, and the level is always 1: no context was opened during this run. Hence I suppose the runner calls some other method to run the flow, and there's another place in the code that I need to patch or override. (I hope I've been clear; thank you for your interest.)
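To make the difference between the two log dumps concrete, here is a minimal toy model of an action context. It is a sketch only and not Eliot's implementation: the `_Action` class, the `log` helper, and the `sink` list are hypothetical names invented for illustration, and the field names simply mirror the ones in the logs above. It shows why messages logged inside an open action share one id and get increasing, nested levels, while messages logged with no action open each get a fresh id and level [1].

```python
import contextvars
import itertools
import uuid
from contextlib import contextmanager

# Toy stand-in for an action context; this is NOT how Eliot is implemented.
_current = contextvars.ContextVar("current_action", default=None)


class _Action:
    def __init__(self, task_id, level):
        self.task_id = task_id            # shared by everything inside the action
        self.level = level                # position of this action, e.g. [3]
        self._counter = itertools.count(1)

    def next_level(self):
        # Each message or child action takes the next slot under this action.
        return self.level + [next(self._counter)]


@contextmanager
def start_action(action_type, sink):
    parent = _current.get()
    if parent is None:
        action = _Action(str(uuid.uuid4()), [])   # top-level action: new id
    else:
        action = _Action(parent.task_id, parent.next_level())
    token = _current.set(action)
    sink.append({"action_status": "started", "action_type": action_type,
                 "event.id": action.task_id,
                 "event.task_level": action.next_level()})
    try:
        yield action
    finally:
        _current.reset(token)


def log(message, sink):
    action = _current.get()
    if action is None:
        # No open action: every message becomes its own one-entry trace.
        sink.append({"message": message, "event.id": str(uuid.uuid4()),
                     "event.task_level": [1]})
    else:
        sink.append({"message": message, "event.id": action.task_id,
                     "event.task_level": action.next_level()})


records = []
with start_action("flow name", records):
    log("runner message", records)
    with start_action("task name", records):
        log("inside task", records)
log("no context 1", records)   # mimics the run started from the UI
log("no context 2", records)

for record in records:
    print(record["event.id"][:8], record["event.task_level"])
```

The first four records share one id with levels that grow and nest; the last two behave like the UI-triggered run, which is consistent with the flow's `run()` override never being entered there.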
n
Thanks for the added context @Simone Cittadini! Let me dig into this a little, will report back
s
Hi @nicholas, FYI I think I've nailed it (tested against prefect server start on my machine): the class I was searching for was prefect.engine.flow_runner.FlowRunner; overriding its run() in the same way I did with Flow did the trick. Anyway, it's not even necessary, as I've found I can make my own RemoteEnvironment, where I open the logging action inside execute:
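from typing import Any

from prefect.environments import RemoteEnvironment
from prefect.utilities.configuration import set_temporary_config

# `_log` is assumed to be a local Eliot logging helper that is not shown here.
class _Environment(RemoteEnvironment):  # type: ignore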
    def execute(  # type: ignore
        self,
        storage,  # Storage
        flow_location: str,
        **kwargs: Any,
    ) -> None:
        if self.on_start:
            self.on_start()

        try:
            from prefect.engine import (
                get_default_executor_class,
                get_default_flow_runner_class,
            )

            flow = storage.get_flow(flow_location)
            with set_temporary_config({"engine.executor.default_class": self.executor}):
                executor = get_default_executor_class()

            executor = executor(**self.executor_kwargs)
            runner_cls = get_default_flow_runner_class()
            with _log.start_action(flow.name):   ### HERE ! ####
                runner_cls(flow=flow).run(executor=executor)
        except Exception as exc:
            self.logger.exception("Unexpected error raised during flow run: {}".format(exc))
            raise
        finally:
            if self.on_exit:
                self.on_exit()
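
For comparison, the FlowRunner override mentioned above (the alternative to the custom environment) could look roughly like the sketch below. To keep it runnable without Prefect or Eliot installed, `FlowRunner` and `start_action` here are stand-ins for `prefect.engine.flow_runner.FlowRunner` and `eliot.start_action`; `ActionFlowRunner` and the `calls` list are hypothetical names used only for illustration.

```python
from contextlib import contextmanager
from types import SimpleNamespace

calls = []  # records what happened, in order, for demonstration only


@contextmanager
def start_action(action_type):
    # Stand-in for eliot.start_action so the sketch runs on its own.
    calls.append(("started", action_type))
    try:
        yield
    finally:
        calls.append(("finished", action_type))


class FlowRunner:
    # Stand-in for prefect.engine.flow_runner.FlowRunner.
    def __init__(self, flow):
        self.flow = flow

    def run(self, *args, **kwargs):
        calls.append(("run", self.flow.name))
        return "state"


class ActionFlowRunner(FlowRunner):
    def run(self, *args, **kwargs):
        # Same pattern as the Flow override: wrap the whole run in one action.
        with start_action(action_type=self.flow.name):
            return super().run(*args, **kwargs)


result = ActionFlowRunner(SimpleNamespace(name="demo flow")).run()
print(result, calls)
```

Since the environment code in this thread obtains the runner through get_default_flow_runner_class(), a subclass like this would presumably have to be configured as the default flow runner class to be picked up on the server side; the exact setting is not shown in the thread.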