https://prefect.io logo
Title
s

Satyam Tandon

09/07/2021, 7:24 PM
Hope this is the right place to ask this question. This might not be conventional and without going in too much detail about why but I basically created a special instance of the flow like this which the
_run
creates a new key by default inside the context object so it can be available to every task that is added to the flow
class MyFlow(prefect.Flow):
    def _run(
        self,
        parameters: Dict[str, Any],
        runner_cls: type,
        run_on_schedule: bool,
        **kwargs: Any
    ) -> "prefect.engine.state.State":
        context = dict(my_variable="my_variable")
        return super()._run(
            parameters,
            runner_cls,
            run_on_schedule=run_on_schedule,
            context=context,
            **kwargs
        )
This works as intended when I call
.run
and the task is able to access that key but it doesn’t work if I register the flow with a local ui and agent and try running it from the UI. Why is that and how would I make it work if I need to register a flow and run it from the UI?
@task(name="log-my-variable")
def log_run_id():
    prefect.context.get("logger").info(f"{prefect.context.my_variable}")


with MyFlow("experimental-flow") as flow:
    result = log_run_id()


if __name__ == "__main__":
    # flow.run()
    flow.register(project_name="test-flows")
k

Kevin Kho

09/07/2021, 7:43 PM
Hey @Satyam Tandon, so
flow.run
is not used in the triggered runs by the UI. I think the agent uses a CloudFlowRunner so even if you have this class, the agent is still deploying the flow with the CloudFlowRunner so you might need to edit the agent here but it’s not a trivial process. I think our mechanism to deal with this is the KV Store on Prefect Cloud, but I have seen server users set up a Redis cache to pull this variable into their Flow.
s

Satyam Tandon

09/07/2021, 7:47 PM
Ah I see. Thanks, Kevin.
Hey Kevin, quick follow up. Even the flows run from a local agent (not ecs agent as linked in the doc above you sent) also use the CloudFlowRunner when they are being triggered from a UI running on local host or some other host (not prefect cloud though)
k

Kevin Kho

09/09/2021, 9:31 PM
Yes all agents use the CloudFlowRunner
s

Satyam Tandon

09/09/2021, 9:35 PM
Gotcha. I have a few ideas I am going to try out but might have to bother you again. Thank your for your help and patience.
Okay follow up, if I wanted to replace
"PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner"
with some other special class of FlowRunner, where would that class need to live for the agent to be able to use it?
k

Kevin Kho

09/10/2021, 12:36 AM
That
prefect.engine.cloud.CloudFlowRunner
is an import I believe so you would need to make it importable by the agent but not 100% sure
s

Satyam Tandon

09/10/2021, 12:40 AM
Hmm, I’m not sure what I need to do to make sure it’s importable by the agent? Currently the special class of FlowRunner I have is just in the working directory of the agent.
k

Kevin Kho

09/10/2021, 12:44 AM
Like install it as a Python package maybe, but ideally not by editing Prefect cuz you kinda get locked in a version when you edit the Prefect source. I would package it as a Python package (with a setup.py) and pip install it.
Are you opposed to having that variable defined at the top of your Flow script btw? If you store your flow as a script, it will run it and you’ll have that variable during runtime of tasks
s

Satyam Tandon

09/10/2021, 12:57 AM
I’m not sure where I would pip install it so it’s accessible to the agent? Right now that class is sitting where the flow code is in the working directory and I managed to change the environment variable
PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS
on the local agent to the location of the python package that contains that special flow runner class
working_directory.<module_name>.<package_name>.<class_name>
but I don’t think it’s able to access it when the flow is registered and said agent is run pointed at localhost
Haha it’s a tad complicated. It exists as variables in a lot of flow packages and I’m essentially trying to get to the point where the user doesn’t have to write that boilerplate at the top or anywhere in the flow script everytime at all and simply creating a Flow obj would give them access to those variables / functionality
k

Kevin Kho

09/10/2021, 1:11 AM
You must have installed prefect on the agent with
pip install prefect
or something so it would be like that. You package the flow runner you made as a library and then do,
pip install the_new_library
.
It would be installed even before the agent is spun up.
Ok this is quite tricky though and it might be hard to execute. Of course, as an engineer, I admire the enthusiasm to do it this way. 😆 Just wanna leave an alternative that you can literally create a file with just the dictionary. If the file is
config.py
and the dict is
my_config
, you can
from config import myconfig
and you have that dictionary during runtime because of the import. It can then be used inside tasks.
😀 1
s

Satyam Tandon

09/10/2021, 3:12 AM
Haha I thought of that but that wouldn’t work because the variable is actually effected by a user input (a prefect param) and I actually use the variable to overwrite the flow run name. So right now, that param has to be added for every flow we make and I’m trying to get away from that by doing everything in prefect context where I get the default value of that variable from just creating a flow object (so it would exist in prefect context to begin with and very importantly be available to every task created in the flow) and instead of having to add that param to every flow script, the user if they choose to not use the default simply specify the flow run name from the UI at run which you can without having to make any changes to any code as it that functionality you just get from the UI. That’s more context here. I don’t even know if this entire approach makes sense and if it will hit more roadblocks later but that’s what I’ve got so far thinking about this particular use case of mine.
Oh and I guess the variable in my example was a little misleading. It’s not exactly just a variable. It’s dynamically generated for every run and right now again , that code has to imported and added to every flow. The flow runner I created overrides the run method to just dynamically generate that variable and then passes it into the context before calling the run method of the
CloudFlowRunner
and pass that as the context so again that would just be available to any flow being executed on the agent that uses my FlowRunner to execute flows without having to import that bit of code and having to add that to every flow we write and that’s why config or some other static env variable won’t work for this.
👍 1