# prefect-community
j
Heya! I’m developing a flow to run a bunch of materialised view refreshes with Prefect 2 (local agent + Cloud). These flows must never backfill as doing so yields no benefit. How can I tell my deployment to never backfill scheduled runs or is this something I should handle within the flow by checking the state (i.e. “late”) and have it report a successful flow run without running any tasks? It would be absolutely amazing, if someone can point me to a recipe or demo of any kind that I can lean on 🤷
s
Wdym by backfill
And you're using Postgres?
j
I’ve shut down my agent. Below is what the queue looks like at the moment. If I start the agent now, it will try to run all “Late” jobs, essentially performing needless refreshes. I’d like it to either:
• Only run the next scheduled run
• Run the last “late” run and forget about all previous ones
I strongly suspect this is something I need to handle within the flow, but I’m not sure where to start 😞
m
Hey @jpuris, I actually think an easier way to manage this might be to limit the number of scheduled runs that Prefect creates. By default it attempts to schedule out 100 runs up to 100 days in the future; you can limit this in the configuration to just 1, preventing a backlog of flow runs from building up: https://docs.prefect.io/concepts/schedules/
j
Oh! I had no idea this option existed! I’ll look into this ASAP. Thank you @Mason Menges! 🙏
m
No problem 😄 Admittedly this can depend on how many runs you'd want to schedule in the same day, but it might be worth looking into. From there you could also implement some logic in your flow to check the scheduled start date for the run and compare that to the current time; if it falls outside of some predetermined range, you could skip the rest of the tasks in the flow.
j
I see the 100 day / 100 runs default is for the scheduler service itself and not for the deployment 😞 I’m looking for a solution per deployment or within a flow. Also, since I’m using Prefect 2 (Cloud), I don’t think these scheduler service settings are available for me to alter.
I feel like I may have asked the question the wrong way.. let me rephrase ☀️ How can I tell my deployment / flow to not run any late schedules from the queue, only upcoming ones? I’m thinking of retrieving `expected_start_time` (A) from the flow context and comparing it with `current time - 5 minutes` (B); if A < B, then have the flow complete without running any tasks. There are a couple of issues with my idea and it feels fairly hacky 😞 What would be the recommended approach to achieve something like this?
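The comparison described above (A < B, where A is the expected start time and B is the current time minus 5 minutes) can be sketched in plain Python. This is only a sketch and not a Prefect API: the function name `is_stale_run` and the `tolerance` parameter are hypothetical, and it assumes both datetimes are timezone-aware.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale_run(
    expected_start_time: datetime,
    now: Optional[datetime] = None,
    tolerance: timedelta = timedelta(minutes=5),
) -> bool:
    """Return True when the run was expected to start more than `tolerance` ago.

    A = expected_start_time, B = now - tolerance; the run is stale if A < B.
    """
    if now is None:
        # Assumption: expected_start_time is timezone-aware, so compare in UTC.
        now = datetime.now(timezone.utc)
    return expected_start_time < now - tolerance


# Fixed reference time so the example is deterministic.
reference_now = datetime(2022, 10, 1, 12, 0, tzinfo=timezone.utc)
late_run = is_stale_run(reference_now - timedelta(minutes=10), now=reference_now)
on_time_run = is_stale_run(reference_now - timedelta(minutes=1), now=reference_now)
```

A flow could call such a helper first and return early when it reports a stale run, so the run completes without executing any tasks.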
m
I don't believe there's a way to control this from the deployment or within the UI. Implementing some logic around the expected start time as you suggested would definitely be one way to accomplish this. All that said, I think caching the task run states would also be a viable option. This wouldn't prevent the flow runs from triggering, but you'd be able to persist the results of your task runs so that, for some predetermined amount of time, when the flow triggers again it just uses the cached state of the tasks instead of recomputing anything. https://docs.prefect.io/concepts/tasks/#task-arguments https://discourse.prefect.io/t/how-can-i-cache-a-task-result-for-two-hours-to-prevent-re-computation/67
j
Thank you, @Mason Menges! I’ll look into cache as well. I’ve attempted to retrieve some of the context metadata, which sort of works for some keys, but not others..
from prefect import flow, context

@flow(name="Hello Flow")
def hello_world():
    
    ctx = context.get_run_context()
    print('================')
    print('----CONTEXT----')
    print(ctx)
    print('----CONTEXT, start_time ----')
    print(ctx.get().start_time)
    print('----CONTEXT, expected_start_time ----')
    print(ctx.get().expected_start_time)
    print('================')

if __name__ == '__main__':
    hello_world()
I found this out solely by experimentation. I suspect I’m querying the flow’s context incorrectly with `ctx.get().expected_start_time`, as it produces the following error (full run in attachment):
...
AttributeError: 'FlowRunContext' object has no attribute 'expected_start_time'
...
If you have any docs on the mechanics of the Context object in Prefect 2, please do point me in the right direction 🙏
m
No problem 😄 You should be able to access the context for the flow run this way
from prefect import context

test_context = context.FlowRunContext.get()

start_time = test_context.flow_run.expected_start_time
As for docs, this is a good reference: https://docs.prefect.io/api-ref/prefect/context/
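The attribute path shown above (`flow_run.expected_start_time` on the flow run context) can be read defensively, since the context may be missing when the code runs outside a flow run. The following is a sketch under that assumption: the helper name `expected_start_time_of` is hypothetical, and a `SimpleNamespace` stands in for the real context so the snippet runs without Prefect installed.

```python
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any, Optional


def expected_start_time_of(ctx: Any) -> Optional[datetime]:
    """Return ctx.flow_run.expected_start_time, or None if any part is missing."""
    if ctx is None:
        # Assumption: no active flow run context is available.
        return None
    flow_run = getattr(ctx, "flow_run", None)
    return getattr(flow_run, "expected_start_time", None)


# Inside a Prefect 2 flow this would be called roughly as (not executed here):
#     from prefect import context
#     expected = expected_start_time_of(context.FlowRunContext.get())

# Stand-in object mimicking only the attributes used above.
fake_ctx = SimpleNamespace(
    flow_run=SimpleNamespace(
        expected_start_time=datetime(2022, 10, 1, 12, 0, tzinfo=timezone.utc)
    )
)
found = expected_start_time_of(fake_ctx)
missing = expected_start_time_of(None)
```

Returning `None` instead of raising lets the calling flow decide what to do when no expected start time is available, for example by running normally.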
j
AH! I need new glasses 😄 It is in the `flow_run=FlowRun(..., estimated_run_time=foo)`. Thank you, thank you, thank you!
m
No problem at all 😄