Satyam Tandon
09/07/2021, 7:24 PM
I have a Flow subclass whose _run creates a new key by default inside the context object so that it is available to every task that is added to the flow:
from typing import Any, Dict

import prefect
from prefect import task


class MyFlow(prefect.Flow):
    def _run(
        self,
        parameters: Dict[str, Any],
        runner_cls: type,
        run_on_schedule: bool,
        **kwargs: Any
    ) -> "prefect.engine.state.State":
        # Inject the extra key into the context of every local run.
        context = dict(my_variable="my_variable")
        return super()._run(
            parameters,
            runner_cls,
            run_on_schedule=run_on_schedule,
            context=context,
            **kwargs
        )
This works as intended when I call .run and the task is able to access that key, but it doesn’t work if I register the flow with a local UI and agent and try running it from the UI. Why is that, and how would I make it work if I need to register a flow and run it from the UI?
@task(name="log-my-variable")
def log_run_id():
    prefect.context.get("logger").info(f"{prefect.context.my_variable}")


with MyFlow("experimental-flow") as flow:
    result = log_run_id()

if __name__ == "__main__":
    # flow.run()
    flow.register(project_name="test-flows")
Kevin Kho
09/07/2021, 7:43 PM
flow.run is not used in the runs triggered by the UI. I think the agent uses a CloudFlowRunner, so even if you have this class, the agent still deploys the flow with the CloudFlowRunner. You might need to edit the agent here, but it’s not a trivial process.
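The point above can be illustrated with a small sketch. It does not use Prefect: `Runner`, `Flow`, and `read_my_variable` are hypothetical stand-ins, on the assumption that the agent hands the flow to a runner directly and never goes through `flow.run`. Under that assumption, a key added in an overridden `_run` is never set.

```python
from typing import Any, Callable, Dict, Optional


class Runner:
    """Hypothetical stand-in for a flow runner; it executes the flow's task."""

    def run(self, flow: "Flow", context: Optional[Dict[str, Any]] = None) -> Any:
        return flow.task(context or {})


class Flow:
    """Hypothetical stand-in for a flow that holds a single task function."""

    def __init__(self, task: Callable[[Dict[str, Any]], Any]) -> None:
        self.task = task

    def run(self) -> Any:
        # Local entry point: goes through _run, which then calls the runner.
        return self._run(context={})

    def _run(self, context: Dict[str, Any]) -> Any:
        return Runner().run(self, context=context)


class MyFlow(Flow):
    def _run(self, context: Dict[str, Any]) -> Any:
        # The extra key is only added on this code path.
        return super()._run(context={**context, "my_variable": "my_variable"})


def read_my_variable(context: Dict[str, Any]) -> Optional[str]:
    """Task body: return the injected key, or None if it was never set."""
    return context.get("my_variable")


flow = MyFlow(read_my_variable)

local_result = flow.run()          # goes through MyFlow._run
agent_result = Runner().run(flow)  # calls the runner directly, skipping _run
```

Here `local_result` holds the injected value while `agent_result` is `None`, which mirrors the behaviour described in the question: the override only takes effect on the code path that actually calls it.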
I think our mechanism to deal with this is the KV Store on Prefect Cloud, but I have seen server users set up a Redis cache to pull this variable into their Flow.
Satyam Tandon
09/07/2021, 7:47 PM
Kevin Kho
09/09/2021, 9:31 PM
Satyam Tandon
09/09/2021, 9:35 PM
If I replace "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner" with some other special class of FlowRunner, where would that class need to live for the agent to be able to use it?
Kevin Kho
09/10/2021, 12:36 AM
prefect.engine.cloud.CloudFlowRunner is an import, I believe, so you would need to make your class importable by the agent, but I’m not 100% sure.
Satyam Tandon
09/10/2021, 12:40 AM
Kevin Kho
09/10/2021, 12:44 AM
Satyam Tandon
09/10/2021, 12:57 AM
I set PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS on the local agent to the location of the Python package that contains that special flow runner class, working_directory.<module_name>.<package_name>.<class_name>, but I don’t think the agent is able to access it when the flow is registered and the agent is run pointed at localhost.
Kevin Kho
09/10/2021, 1:11 AM
The agent gets Prefect through pip install prefect or something, so it would be like that. You package the flow runner you made as a library and then run pip install the_new_library.
If you have a file config.py and the dict is my_config, you can do from config import my_config and you have that dictionary at runtime because of the import. It can then be used inside tasks.
Satyam Tandon
09/10/2021, 3:12 AM
I want to set the value in a subclass of CloudFlowRunner and pass that as the context, so that it is available to any flow the agent executes with my FlowRunner, without having to import that bit of code and add it to every flow we write. That’s why config or some other static env variable won’t work for this.
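The idea in this last message can be sketched roughly as follows. This is not Prefect code: `BaseRunner` is a hypothetical stand-in for a runner class such as CloudFlowRunner, and its `run` signature is an assumption made for illustration. The sketch only shows the general pattern of a subclass merging default keys into the context before delegating to its parent.

```python
from typing import Any, Dict, Optional


class BaseRunner:
    """Hypothetical stand-in for a runner class such as CloudFlowRunner."""

    def run(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        # A real runner would execute the flow; this one returns the context it was given.
        return dict(context or {})


class ContextInjectingRunner(BaseRunner):
    """Merges default keys into the context before delegating to the parent."""

    default_context: Dict[str, Any] = {"my_variable": "my_variable"}

    def run(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        # Keys supplied by the caller win over the defaults.
        merged = {**self.default_context, **(context or {})}
        return super().run(context=merged)


runner = ContextInjectingRunner()
no_context = runner.run()
with_override = runner.run(context={"my_variable": "override", "other": 1})
```

Because the defaults live in the runner rather than in each flow, no flow has to import anything to receive them. As discussed earlier in the thread, a class like this would still have to be importable by the agent, for example by installing it as a package.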