
Matthew Webster

01/27/2022, 6:10 PM
Hi, I’m looking for some guidance on how to use the same flows across multiple projects. For context: we are doing ETL for customers and some flows are identical. We’re trying the ECS Agent with Prefect Cloud, a custom Docker image, and S3 storage. I am currently registering the flow with the CLI for multiple customers and passing in customer-specific information as Parameters when scheduling the run. This currently breaks down in a few places:
1. We’re having some problems with Storage and unpickling flows. Is there project-specific info stored there, so that when a flow is registered/pickled for one project it can’t be unpickled by another?
2. Using Secrets. We have some customer-specific API keys that need to be stored as Secrets, and these appear to only be settable globally in Prefect Cloud (or AWS SSM); there are no per-project secrets that I can pass in. One option I can think of would be to make a service that takes a Parameter and returns a secret that then gets passed to a task, but maybe there’s a better way?
I’m still pretty new to Prefect but couldn’t find answers to these. Hoping for some help from the community!

Kevin Kho

01/27/2022, 6:13 PM
Your thinking is definitely right on 2. Secrets are on the tenant level, so you need to do something like prefixing these API keys and then pulling the appropriate secret. What is your error for 1? I don’t think there is project-specific information stored there. We have some people on the set-up you described.

Matthew Webster

01/27/2022, 11:47 PM
The problem on #2 is finding the name of the secret, given it’s different per project. The best way to do this feels like creating another task, based on a Parameter, that looks up the Secret name and also gets the value. Does that make sense?

Kevin Kho

01/28/2022, 1:53 AM
It does, but I think it can be fixed by standardizing the secret names, e.g. USER1-KEY, USER2-KEY, so that you can just do param + "-KEY". There is a thread here I think will be useful, one sec:
https://prefect-community.slack.com/archives/CL09KU1K7/p1638483576263700 (this is a similar pattern and might be a good read)
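(A minimal sketch of that prefix-lookup pattern in Prefect 1.x; the task names and the "-KEY" naming convention below are illustrative, not an established API:)
from prefect import Flow, Parameter, task
from prefect.client import Secret

@task
def get_customer_key(customer: str) -> str:
    # Tenant-level Secrets follow a convention like "<CUSTOMER>-KEY", e.g. "USER1-KEY"
    return Secret(f"{customer.upper()}-KEY").get()

@task
def call_customer_api(api_key: str):
    ...  # customer-specific API calls go here

with Flow("customer-etl") as flow:
    customer = Parameter("customer")
    call_customer_api(get_customer_key(customer))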

Matthew Webster

01/28/2022, 3:06 AM
Thanks. This is helpful. I think the new task is going to work best for us in this case. I’m going to use AWS SSM secrets and have a naming convention that maps to a Parameter we can set in each project (a rough sketch of that lookup is below).
For #1, I changed the Python version and got rid of the pickle errors. The problem seems to be that my tasks are in separate files from the flow. I assumed that when pickled the tasks would be included, but it looks like that’s not the case: only the flow script seems to be there.
Since my tasks are already in my custom Docker image, which is in ECR, I’m going to try to get them from there. I know it’s a little tricky since the Prefect Docker logic seems to be designed around building images rather than using existing ones, but I’m going to try to make it work…
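(The SSM variant of that lookup task could look something like this; the "/etl/<customer>/api-key" naming convention is a hypothetical example:)
import boto3
from prefect import task

@task
def get_customer_key(customer: str) -> str:
    # SSM parameter names follow a convention such as "/etl/<customer>/api-key"
    ssm = boto3.client("ssm")
    resp = ssm.get_parameter(Name=f"/etl/{customer}/api-key", WithDecryption=True)
    return resp["Parameter"]["Value"]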

Kevin Kho

01/28/2022, 3:40 AM
You can certainly use existing ones. Let me give you a sample script: register with build=False to not build the Docker storage.
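(Roughly what that looks like in Prefect 1.x; the registry URL, image, and project names below are made up, and flow is assumed to be defined elsewhere:)
from prefect.storage import Docker

# Point Docker storage at an image that already exists in ECR
docker_storage = Docker(
    registry_url="123456789012.dkr.ecr.us-east-1.amazonaws.com",
    image_name="my-etl-image",
    image_tag="latest",
)
flow.storage = docker_storage

# build=False skips the docker build/push and just registers the flow metadata
flow.register(project_name="client_1", build=False)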

Matthew Webster

01/28/2022, 4:07 PM
Thanks, this looks directionally like what we’re after. However, the workflow we’re looking for is to add these flows to new projects without changing the code. I’ve been relying on the CLI to register flows, and that seems like it won’t be possible with this approach. Is the build=False option still not available in the CLI? Do you have other ideas for how we can add this to new projects? (BTW, we also have multiple flows in one .py file. Will also test to make sure that works.)

Kevin Kho

01/28/2022, 7:13 PM
I feel this can still be done if you store the flow as a script that is parameterized. When Prefect loads it, it will use the script inside the container and then add the parameters. Unfortunately, build=False doesn’t seem to be available on the CLI.
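(A sketch of the stored-as-script variant, with hypothetical image details and path; the flow script is assumed to already be baked into the image:)
from prefect.storage import Docker

# stored_as_script=True tells Prefect to run the .py file already inside the
# container instead of unpickling a serialized flow
flow.storage = Docker(
    registry_url="123456789012.dkr.ecr.us-east-1.amazonaws.com",
    image_name="my-etl-image",
    image_tag="latest",
    stored_as_script=True,
    path="/opt/prefect/flows/my_flow.py",  # location of the flow file in the image
)
flow.register(project_name="client_1", build=False)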

Matthew Webster

01/31/2022, 4:55 AM
Cool. Makes sense! I’m trying to get this into a CI/CD pipeline so I created a script called register.py that gets run after the image is built anytime code is changed. It does something like this:
if __name__ == "__main__":
    # flows and docker_storage are imported/defined elsewhere in the project
    flow_list = [setup_flow, semester_flow, daily_flow, hourly_flow]
    project_list = ["client_1", "client_2"]
    for flow in flow_list:
        docker_storage.add_flow(flow)
        for project_name in project_list:
            # build=False: the image already exists in ECR, so skip the build
            flow.register(project_name=project_name, build=False)
I’ll need to add a way to parameterize this so we can add more projects in the future without a rebuild. I guess the way to do that is to look at the Cloud API?

Kevin Kho

01/31/2022, 5:02 AM
I don’t have an immediate answer. I will think about this more. Ping me for sure if I don’t respond to you tomorrow.

Matthew Webster

01/31/2022, 4:01 PM
No worries! There’s really no urgency here. At some point we’re going to want our management tool to be able to create projects and add flows with schedules to them. We’re unblocked though and all that is a bridge for another day. Thanks for your help, Kevin!
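(For when that day comes, a small sketch of project creation with the Prefect 1.x Python Client; the project name, cron string, and the daily_flow import are placeholders:)
from prefect import Client
from prefect.schedules import CronSchedule

client = Client()
client.create_project(project_name="client_3")  # create the new project in Prefect Cloud

# Attach a per-client schedule before registering (daily_flow imported from the flow code)
daily_flow.schedule = CronSchedule("0 6 * * *")
daily_flow.register(project_name="client_3", build=False)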

Kevin Kho

01/31/2022, 4:02 PM
Yep will respond a bit later
I think you can use the Cloud API, but I think it’s easier to tackle at the Python registration level. Just some thoughts. If you have a flow building and registering function:
from prefect import Flow, Parameter

def build_and_register(my_param):
    with Flow("my-flow") as flow:
        x = Parameter("x", default=my_param)
    flow.register(...)

if __name__ == "__main__":
    for param in my_params:  # one entry per client/project
        build_and_register(param)
Then you can parameterize this using click to take in the values from the command line. You can also pass in some config file that gets loaded in to create these Flows. What I would personally do then is use DockerRun for dependencies but something like S3 for storage. This means registration gives me a new, distinct file for every client with the appropriate defaults. If I have to upgrade the Flow, I would just change it in one place and re-register everything for it to take effect.
There is only one edge case to this, and it’s that the executor needs to live in the Flow file because it’s not serialized with the Flow, so flow.executor = LocalDaskExecutor or DaskExecutor needs some extra steps to take effect. Does this give you a clearer picture?
You can then take in arguments (python myflow.py -arg1 param1 -arg2 param2), so you can use it with CI/CD, if that makes sense.
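(One way that command line could look using click; the --param option name is illustrative and build_and_register refers to the sketch above, not a built-in Prefect CLI. A config file read at the top of the script would work the same way:)
import click

@click.command()
@click.option("--param", "params", multiple=True, help="one value per client/project")
def main(params):
    for p in params:
        build_and_register(p)

if __name__ == "__main__":
    main()

# e.g. python myflow.py --param client_1 --param client_2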

Matthew Webster

02/05/2022, 12:09 AM
Thanks, Kevin. I’m imagining that the CI/CD process will seed defaults, but there may need to be some other systems making ongoing changes to parameters. That’s why I was interested in the Cloud API. At any rate, we’re unblocked on this for now. Thanks!

Kevin Kho

02/05/2022, 12:12 AM
Gotcha