# ask-community
t
Second question is whether or not there is a way to provide state (and define state within the Flow) other than using the Parameter tasks. I know the `context` exists, but I am looking for something that can read a file and set the state. I don't think the context object is set up for that. It seems to be more of a runtime thing to me.
k
For smaller pieces of state, we have the KV Store. This is not limited to runtime and can be used as long as you are authenticated. What do you think of this?
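For reference, a minimal sketch of the KV Store in Prefect 1.x (this requires Cloud authentication; the key name is just an example):

```python
# Minimal sketch of the KV Store (Prefect 1.x); the key name is illustrative.
from prefect.backend import set_key_value, get_key_value

set_key_value(key="pipeline_config", value={"env": "dev"})
config = get_key_value(key="pipeline_config")
```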
t
Looking for something local to the agent rather than out on the cloud.
that way my config/state can be defined in version control
k
Other than parameters, you can query for the labels based on the Flow, and then do an action based on it. The agent itself doesn’t really hold state. What kind of state are you looking to set? Is it like environments? Is it data your flow depends on?
t
Right, I am not asking for the agent to hold state, just that I can read/load state from something that is local to the agent (the piece out on our environments with Prefect Cloud). Now that I am thinking this through I would like to be able to read a config/state, and then set Parameter values, or use a provided parameter file.
Maybe a better way to put it would be how can I read a local JSON in as Parameters cleanly.
k
Are you using Docker in your flows?
t
no
k
Ok so yeah, I think this can be done with a Local Agent. Probably not with Docker, because paths then become relative to the container and the file needs to be in the container. The Local Agent should be able to read a file from the local filesystem and perform actions based on that. Something like S3 seems more reliable, because otherwise you absolutely need the Flow to be picked up by a specific agent. I suppose it's fine though as long as the Labels are set up correctly.
t
So how would I load the file with the agent? We are currently just running a generic agent start command
k
Thinking out loud though, you can have that file in the Docker container that you create and reference it that way since it seems to be a dependency. I should also add that I think JSON Parameters should work.
So the agent is a process on a machine and you would need that file on the same machine and your Flow would read it in, but yeah this becomes runtime already as well.
t
None of that is a problem with our setup. I just don't know how to load that file when a Flow is called. And I have been trying to approach it from the Flow-side which is probably the wrong side.
k
You can read it within a task (`pd.read_json` or however).
What would your vision look like without Prefect? Would it be a JSON loaded at the top as a global variable and then referenced throughout the Flow?
t
Is this where I should switch from the Functional API to the Imperative API, so I can run logic in my flow definition instead of only being able to call tasks and functions?
Yeah, I would set state when the flow is called, accepting any state provided in the call (with some merging in of the k:v pairs) and then run the flow.
k
You can have logic within the Flow definition. There is the `case` task for if-else, for example.
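For example, a minimal sketch of the `case` task in Prefect 1.x (the task and flow names here are just illustrative):

```python
# Minimal sketch of conditional logic with `case` (Prefect 1.x); names are illustrative.
from prefect import Flow, Parameter, case, task

@task
def is_prod(env):
    return env == "prod"

@task
def run_prod_branch():
    print("prod branch")

@task
def run_dev_branch():
    print("dev branch")

with Flow("case-example") as flow:
    env = Parameter("env", default="dev")
    cond = is_prod(env)
    with case(cond, True):
        run_prod_branch()
    with case(cond, False):
        run_dev_branch()
```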
t
Right now I set the same parameters in most of my Flows, which means I define them in most of my flows. Which I just don't like when they are just static values I need for processing (even if the value is based on the Flow I want)
To put it another way, I want to programmatically generate the parameter values and have them available to me in the flow. Would I want a Task that loads up the `context`?
k
You can load the context, but you can't modify it during runtime. By the time the Flow is loaded, the context is already initialized, so I don't think that's the route to go.
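For reference, a minimal sketch of reading (not modifying) the context inside a task in Prefect 1.x:

```python
# Minimal sketch: context can be read inside a task at runtime, but not modified.
import prefect
from prefect import task

@task
def log_run_info():
    logger = prefect.context.get("logger")
    logger.info("flow=%s date=%s", prefect.context.get("flow_name"), prefect.context.get("today"))
```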
t
Right
k
Do your Flows share common tasks and dependencies such that they could belong in the same package?
t
yeah, we are already importing the tasks that are common, which at this point is most of them, from another python file. I think that most of these Flows can fold into a single Flow, or a Flow that is subclassed by the outliers
And we have now merged my two questions into a single thread....
k
I know exactly what you’re saying now. It’s like if I had a JSON list of a hundred connections and all those change between environments from dev to prod. Let me think…lol 😅
t
Ah. Here is what I am looking for. A way to do whatever Prefect Cloud does when I pass a JSON of parameter values. But I want to also not have to specifically define each parameter as a Task, I just want to call the JSON values
s/JSON/Dict/
k
Ok so let’s separate this into two ways of getting this JSON in: the runtime way and the buildtime way. For the buildtime way, it would just be loading the JSON file as a dictionary and using that around the Flow. It would then be your registration process that carries the duplicate effort, as you would need to register that flow 3x for 3 different configs. Some users do this with the CI/CD pipeline, where they register with a registration script and pass in the config there. You can do this by wrapping the Flow definition in a function, and you can return the Flow once it’s created for registration. Now for the runtime…
t
I have a question about passing data in the register command
k
Go for it
t
How would I do that? I don't see anything on the `--help` page letting me pass in a configuration
k
```python
import json
from prefect import Flow, Parameter
from prefect.tasks.shell import ShellTask

shell_task = ShellTask()

def read_file(file_path):
    with open(file_path) as f:  # minimal stand-in that loads the config JSON
        return json.load(f)

def create_flow(flow_name, file_path):
    config = read_file(file_path)
    with Flow(flow_name) as flow:
        env = Parameter('env', default=config['env'])
        shell_task(command="dbt run", env={'ENV': config['env']})
    return flow

flow = create_flow("my-flow", "config.json")  # example arguments
flow.register()
```
t
ok, we can put a pin in that for now, I need to figure out what I can change/should change in our setup to register from Python
Runtime stuff... hit me.
k
Now for the runtime, you can’t have a global dictionary type of setup because those are evaluated during buildtime. Tasks have deferred execution, which is why Parameters are Tasks, so a value gets injected during runtime. This means we either need to inject a file path, or the config itself. But the problem is that we’ll need to pass this whole config to all tasks that need it. If that is okay, then we can pass the JSON as a parameter, that way you only need to register your script once. Or we could pass in a file path, and then load that from the Local FileSystem through a task, which then returns the config. The config would then be passed to all succeeding tasks.
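A minimal sketch of the second (file path) option in Prefect 1.x; the task, flow, and parameter names here are just illustrative:

```python
# Minimal sketch: pass a file path as a Parameter, load the config in a task at runtime,
# and hand the resulting dict to downstream tasks. Names are illustrative.
import json
from prefect import Flow, Parameter, task

@task
def load_config(config_path):
    with open(config_path) as f:
        return json.load(f)

@task
def use_config(config):
    print(config["env"])  # downstream tasks receive the whole config dict

with Flow("runtime-config") as flow:
    config_path = Parameter("config_path", default="config.json")
    config = load_config(config_path)
    use_config(config)
```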
t
if I had a task that read the config (`config = read_config()`) I could then pass `config['key']` to another task?
or would I have to pass just the whole config?
k
No, you would have to pass `config` and then inside the task use `config['key']`
t
ok, that isn't horrible (just doesn't drop into the current task signatures.. 😁 )
This was a great conversation, thank you. I have a lot to take back and hack on
k
Just one last thing. For your use case of subclassing the flow, you can do:
```python
# same imports, read_file helper, and shell_task as in the earlier sketch
def create_flow(flow_name, file_path):
    config = read_file(file_path)
    with Flow(flow_name) as flow:
        env = Parameter('env', default=config['env'])
        shell_task(command="dbt run", env={'ENV': config['env']})
    return flow

flow = create_flow("my-flow", "config.json")  # example arguments

with flow:
    # more edits here
    pass

flow.register()
```
Anytime!
t
👍
k
And I just spoke to the team, and making this experience more elegant is something we have on our minds
t
oh wow, thank you