# prefect-community
Ethan Veres
Hi all! My company is just starting to use Prefect as our orchestration tool for our data pipeline for end customers, and I have a few questions on how best to set it all up. Is there a way to reuse a single flow for every company using our product dynamically? We want to use Prefect to orchestrate each company’s data/ETL pipeline, and we have the configuration for the pipeline (which system they’re integrating with, etc.) stored in our database. The ideal would be to dynamically create flows per company, each on its own schedule. Is that possible? Am I thinking about this the correct way?
Kevin Kho
Hey @Ethan Veres, we definitely have some people doing this. There are a few ways to achieve it, so let me walk you through what does NOT work first. Some people try to have one flow and then add clocks to the schedule, each clock overriding the default parameters. This is fine for a small number of parameter sets, but it doesn’t scale once you have more than 10 companies to create runs for. For any given flow, the Prefect scheduler only schedules the next 10 flow runs, and that scheduling job runs on a 90-second interval. So if you have 15 runs scheduled for 8 AM with different companies, 10 will be scheduled and 5 will disappear. You can work around this by staggering them, say 5 flow runs at 8 AM, 5 at 8:10 AM, and 5 at 8:20 AM, but at that point you may as well look into other options.
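To make the clocks idea concrete, here’s roughly what it looks like in 1.0 (an untested sketch; the cron strings and company names are just illustrative):
```python
# Prefect 1.0: one flow, many clocks, each clock overriding the
# "company" parameter -- this is the approach that does NOT scale
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

schedule = Schedule(clocks=[
    CronClock("0 8 * * *", parameter_defaults={"company": "company1"}),
    CronClock("0 8 * * *", parameter_defaults={"company": "company2"}),
    # ...one clock per company, which is what breaks down at scale
])
# attached with: flow.schedule = schedule
```
So the end goal is to create one Flow and register it for each company with different parameter values. For this you have a function to register flows: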
```python
from prefect import Flow, Parameter

def registration_func(company):
    # Each call builds and registers a fresh copy of the flow for one company
    with Flow(...) as flow:
        company_param = Parameter("company", default=company)
        ...
    flow.schedule = ...  # per-company schedule goes here
    flow.register()
```
and then you call this whenever you want to register a new company’s flow. If you use the default pickle-based storage, which serializes the flow, the whole thing gets serialized and a copy is uploaded (to S3 or GCS, for example). That makes each company’s flow file independent of changes to any other company’s. And then if the flow inside `registration_func` changes, just re-register all of them with some kind of loop for the change to take effect.
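Something like this (a sketch; `get_companies_from_db` is a hypothetical helper for however you load your per-company config):
```python
# Re-register every company's flow after the shared flow code changes
for company in get_companies_from_db():
    registration_func(company)
```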
Ethan Veres
Thanks for the detailed answer @Kevin Kho! To spell it out even further, once I wrap the flow in its own function and make that callable, how do I deploy it?
Kevin Kho
It’s a bit open-ended depending on your CI/CD process, but as long as you have a script that imports this and calls it with the appropriate parameters, it will work. For example, I could see you having a registration script and calling it like:
```
python registration_flow.py --arg1 company1 --arg2 parameter2
```
I don’t know the exact syntax offhand, but you can accept arguments to a script through the command line. You could even go so far as to build your own CLI, and then you can call this from CI/CD, right? There are a lot of possibilities and this is just one.
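A minimal sketch of such a script using Python’s built-in argparse (the import path and flag name are illustrative, not from this thread):
```python
# registration_flow.py -- thin CLI wrapper around registration_func
import argparse

from flows.registration import registration_func  # hypothetical module path

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Register one company's flow")
    parser.add_argument("--company", required=True, help="company identifier")
    args = parser.parse_args()
    registration_func(args.company)
```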
Ethan Veres
I like that. This seems really straightforward. Thank you and appreciate your time!
Kevin Kho
Of course!
d
@Kevin Kho "For any given flow, the Prefect scheduler only schedules the next 10 flow runs. This job runs on a 90 seconds interval. So if you have 15 jobs schedules for 8 AM with different companies, 10 will be schedules and 5 will disappear." Can you deploy a flow and then send multiple api requests to schedule flow runs (of that flow) at the same time. For example, can you send 20 api requests to schedule a flow run for 10am? If so, will they all execute or will 10 be missed?
Kevin Kho
You can do it with `create_flow_run`, because those are not runs created by the scheduler service; those are runs created to “run now”. This is all for 1.0 though, not Orion.
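Roughly like this with the 1.0 Client (a sketch; `companies` and `flow_id` are placeholders, and I believe `create_flow_run` is also available as a task in `prefect.tasks.prefect`):
```python
# Create many runs of one registered flow, all targeting 10 AM (Prefect 1.0)
import pendulum
from prefect import Client

client = Client()
start = pendulum.today("UTC").add(hours=10)  # 10 AM UTC today

for company in companies:  # however you load your company list
    client.create_flow_run(
        flow_id=flow_id,  # the ID returned by flow.register()
        parameters={"company": company},
        scheduled_start_time=start,
    )
```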
d
that’s cool, is the plan to also have this for Orion?
or does Orion already behave in this way?
Kevin Kho
I believe Orion’s scheduler is already more generous with run creation, and the concept of multiple clocks on one schedule doesn’t map 1:1 to Orion; I think the equivalent is multiple deployments with different default parameters. It’s actually a lot easier to code this setup there because the flow logic is decoupled from the scheduling and parameters, since those live in the deployment file.
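The deployment API was still evolving at the time of this thread, but assuming the early `DeploymentSpec` interface, per-company deployments might look something like this (a sketch only; the file paths, names, and schedule are illustrative):
```python
# deployment.py -- one Orion deployment per company, same flow code
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import CronSchedule

for company in ["company1", "company2"]:
    DeploymentSpec(
        name=f"etl-{company}",
        flow_location="./etl_flow.py",
        parameters={"company": company},
        schedule=CronSchedule(cron="0 8 * * *"),
    )
```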
Ethan Veres
@Kevin Kho Finally getting around to implementing this and I’m running into this warning:
```
UserWarning: No result handler was specified on your Flow. Cloud features such as input caching and resuming task runs from failure may not work properly.
```
any ideas?
Kevin Kho
Is that 1.0 or 2.0? I think it’s 1.0, and that’s just a warning. It’s saying you can attach results to the Flow to support things like checkpointing.
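In 1.0 that looks something like this (a sketch; `LocalResult` is just for illustration, S3Result/GCSResult work the same way):
```python
# Prefect 1.0: attach a Result to the flow so checkpointing and
# restart-from-failure can persist task outputs
from prefect import Flow
from prefect.engine.results import LocalResult

with Flow("etl", result=LocalResult(dir="/tmp/prefect-results")) as flow:
    ...
```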