# ask-community
t
Hey everyone, I have a huge problem with Prefect schedules: as far as I understand the docs, it's not possible to pass the schedule as a parameter. The problem I have run into while creating an ETL flow is that I have an extract, transform, load template that takes source, dest, and transform code as parameters. But I also need to accept a cron interval string as a parameter and monitor each flow run individually. How do I accomplish that?
EDIT 1: The whole scenario is this: users should be able to come to our website, enter the source, dest, transform code, and cron interval for their ETL, and on the backend we should do the rest.
EDIT 2: If it is possible to accomplish the above, how do I monitor each flow run? In the future I may want to pause and unpause a single flow run, not all the flow runs belonging to a particular flow id as shown here: https://docs.prefect.io/orchestration/flow-runs/scheduling.html#toggling-flow-schedules
j
There are a few ways to implement this:
1. Have a flow that runs every 5 minutes and checks if any jobs need to run (I do not recommend this approach; we have this in our company and it’s a nightmare to balance load and debug things).
2. You can create a simple interface that creates Prefect schedules using GraphQL based on inputs from users. You could even have a flow that creates the schedule for the ETL flow. So people have a UI for it.
t
You need to expand on the second point
k
In this case what you need is a “flow builder” kind of script and when they input their stuff (including schedule), you need to register a new Flow for them. The issue with coupling them in the same Flow is that a Flow only has one schedule. This means that if user A turns off their schedule, you don’t want to turn off user B’s flows necessarily. So what needs to happen is that they submit their stuff, you have an API that goes ahead and registers a new Flow with the Parameters.
def register_flow(param_A, sched):
    with Flow(..) as flow:
        a = Parameter("a", default=param_A)
        ...
    flow.schedule = ...
    flow.register()
flow.register() returns a flow_id, so you can keep track of which users own which flows (keeping that in your own database, separate from Prefect). So to pull up running flows for one of your users, you can use GraphQL and query for them given the flow_id. You can also use projects as your logical separation and query by that.
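The bookkeeping described here could look roughly like the sketch below. It assumes a SQLite table on your side; the table, column, and function names are hypothetical. The GraphQL text assumes the Hasura-style `where` filters of the Prefect 1.x API, so check the selected fields against the schema in the Interactive API before relying on it.

```python
import json
import sqlite3


def init_db(conn):
    # One row per (user, job); the flow_id is whatever flow.register() returned.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS user_flows ("
        "user_id TEXT NOT NULL, job_name TEXT NOT NULL, flow_id TEXT NOT NULL, "
        "PRIMARY KEY (user_id, job_name))"
    )


def record_flow(conn, user_id, job_name, flow_id):
    # Upsert, so re-registering a job overwrites the previously stored flow_id.
    conn.execute(
        "INSERT OR REPLACE INTO user_flows (user_id, job_name, flow_id) "
        "VALUES (?, ?, ?)",
        (user_id, job_name, flow_id),
    )
    conn.commit()


def flows_for_user(conn, user_id):
    rows = conn.execute(
        "SELECT flow_id FROM user_flows WHERE user_id = ? ORDER BY job_name",
        (user_id,),
    )
    return [row[0] for row in rows]


def flow_runs_query(flow_ids):
    # A JSON list of strings is also valid GraphQL list syntax.
    ids = json.dumps(list(flow_ids))
    return (
        "query { flow_run(where: {flow_id: {_in: %s}}) "
        "{ id name state flow_id } }" % ids
    )
```

The string from `flow_runs_query(flows_for_user(conn, user_id))` would then be sent to the Prefect API, for example through the Prefect client's `graphql` method.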
I think what Jacobo is suggesting is to purely use the GraphQL like this to mutate schedules and their default values. I personally suggest splitting these into different Flows though
j
Yeah given our extensive experience with this pattern, you’re much better off creating separate flows.
t
@Kevin Kho so when registering new flows, do you have to give them a new name each time?
def register_flow(param_A, sched):
    with Flow( NEW NAME  HERE ??) as flow:
        a = Parameter("a", default=param_A)
        ...
    flow.schedule = ...
    flow.register()
k
They can be the same name if they are in different projects. The project-name combination should be unique
t
Okay, now I think there are even bigger problems because we need this in production, e.g. with a Vertex agent (we are using GCP for sure) and Github as source storage (what do you suggest?). So how do we do it, considering that a flow file has imports from custom modules and things like that?
k
You likely need Docker but you can do Github Storage + VertexRun and then your pre-built image goes to the Vertex RunConfiguration
t
will flow builder logic integrate well ?
k
Ehhh……yes for the image and VertexRun. The issue with Github Storage is that you are responsible for pushing your code to Github. You can use GCS Storage instead; I think that uploads the Flow file for you, so that might be a lot easier.
Or you can find a way to automate pushing the Flow code to Github and that will be fine. I think using GCS Storage is a lot easier because you just upload the Flow script directly and there’s no additional steps (just the agent needs to be authenticated to pull it down)
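Putting the pieces from this thread together, a flow builder might look like the sketch below. It assumes Prefect 1.x (`Flow`, `Parameter`, `CronClock`, `GCS`, `VertexRun`). The function names, the naming scheme, and the `bucket` and `image` arguments are hypothetical, and the image is assumed to already contain your custom modules.

```python
import re


def flow_name_for(user_id: str, job_name: str) -> str:
    """Derive a stable flow name per user job (hypothetical naming scheme)."""
    slug = re.sub(r"[^a-z0-9]+", "-", f"{user_id}-{job_name}".lower()).strip("-")
    return f"etl-{slug}"


def build_flow(user_id, job_name, source, dest, transform_code, cron, bucket, image):
    # Prefect 1.x imports, kept local so the naming helper works without Prefect.
    from prefect import Flow, Parameter, task
    from prefect.run_configs import VertexRun
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    from prefect.storage import GCS

    @task
    def run_etl(source, dest, transform_code):
        ...  # placeholder for the shared extract/transform/load template

    with Flow(flow_name_for(user_id, job_name)) as flow:
        # The user's inputs become Parameter defaults, so scheduled runs use them.
        run_etl(
            Parameter("source", default=source),
            Parameter("dest", default=dest),
            Parameter("transform_code", default=transform_code),
        )

    flow.schedule = Schedule(clocks=[CronClock(cron)])  # one schedule per flow
    flow.storage = GCS(bucket=bucket)                   # uploaded at registration
    flow.run_config = VertexRun(image=image)            # pre-built image
    return flow


def register_flow(project_name, **spec):
    # Returns the flow_id, which you would store against the user.
    return build_flow(**spec).register(project_name=project_name)
```

Because each user job gets its own flow, pausing one user's schedule leaves everyone else's flows untouched.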
t
Are you saying I have to create a new file each time I want to register a flow with a different schedule, with 95% of the code remaining the same? The reason I'm asking is that if I have to make a small change, it will be difficult to update it everywhere for all the flows.
k
Whether or not there is a separate file, the fact is that you will need to re-register all of the Flows anyway for those changes to take effect. Once you update the Flow builder code (just one place) and re-register everything, that replaces those files in storage. So yes, you have different files, but no, you don’t have to maintain each one.
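The "update the builder once, then re-register everything" step can be sketched as a loop over the job specs you already keep in your own database. This is only an illustration: `reregister_all` and the spec keys are hypothetical, and `register_fn` stands in for whatever function wraps flow.register() in your builder.

```python
from typing import Callable, Dict, Iterable, Mapping, Tuple


def reregister_all(
    specs: Iterable[Mapping[str, str]],
    register_fn: Callable[..., str],
) -> Dict[Tuple[str, str], str]:
    """Re-register every stored job and collect the flow ids that come back."""
    new_ids = {}
    for spec in specs:
        key = (spec["user_id"], spec["job_name"])
        # Each call rebuilds the flow from the single shared builder.
        new_ids[key] = register_fn(**spec)
    return new_ids
```

The returned mapping is what you would write back to your own database, since registration may hand back a different flow id than the one stored before.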
t
How do we tell the agent that there are new files in GCS and that it needs to pull them all? With reference to https://prefect-community.slack.com/archives/CL09KU1K7/p1638485038270600?thread_ts=1638483576.263700&cid=CL09KU1K7
k
When you register, Prefect saves the metadata information. When a flow run starts, the agent pulls the metadata (including storage location) and fetches the Flow and executes it. It’s the registration process that tells the agent where to pull the file from. The agent won’t just pull all the files automatically. For each flow run, it will pull the relevant Flow file.
When you re-register, the file in storage is replaced or updated so the agent will know to pull the latest registered version