Simone Cittadini
06/03/2020, 2:59 PMnicholas
06/03/2020, 3:32 PMSimone Cittadini
06/04/2020, 6:20 AMfrom prefect import Flow as BaseFlow
from eliot import start_action
class Flow(BaseFlow):
def run(self, ... ):
with start_action(self.name):
return super().run(...)
And in every Task run I do the same :
class SomeTask(Task):
def run(self):
with start_action(self.name):
...
I get logs like these from a direct invocation ( I mean calling flow.run() in the code itself, no agent / runner involved) :
{"action_status": "started", "event.id": "7cd02f09-...", "action_type": "flow name", "event.task_level": [1], ...}
{"log_level": "INFO", "logger": "prefect.FlowRunner", "event.id": "7cd02f09-...", "event.task_level": [2], ...}
...
{"action_status": "started", "event.id": "7cd02f09-...", "action_type": "task name", "event.task_level": [5, 1], ...}
etc...
Pushing the event ids into the calls to our APIs enables a complete tracing of order and nesting of each event, done into and triggered by each flow run.
But if I register the flow code and run it from the UI I get different logs :
{"log_level": "INFO", "logger": "prefect.CloudFlowRunner", "event.id": "0445644e-...", "event.task_level": [1], ...}
{"log_level": "INFO", "logger": "prefect.CloudFlowRunner", "event.id": "05b36d25-...", "event.task_level": [1], ...}
...
As you can see there are no "action_status" entries, ids are different for each line and the level is always 1, no context was opened during this run.
Hence I supposed that the runner calls some other method to run the flow, and there's another place in the code which I need to patch / override
( Hope I've been clear, thank you for your interest )nicholas
06/04/2020, 4:15 PMSimone Cittadini
06/09/2020, 10:20 AMclass _Environment(RemoteEnvironment): # type: ignore
def execute( # type: ignore
self,
storage, # Storage
flow_location: str,
**kwargs: Any,
) -> None:
if self.on_start:
self.on_start()
try:
from prefect.engine import (
get_default_executor_class,
get_default_flow_runner_class,
)
flow = storage.get_flow(flow_location)
with set_temporary_config({"engine.executor.default_class": self.executor}):
executor = get_default_executor_class()
executor = executor(**self.executor_kwargs)
runner_cls = get_default_flow_runner_class()
with _log.start_action(flow.name): ### HERE ! ####
runner_cls(flow=flow).run(executor=executor)
except Exception as exc:
self.logger.exception("Unexpected error raised during flow run: {}".format(exc))
raise exc
finally:
if self.on_exit:
self.on_exit()