# prefect-server
n
Hi Prefect Team, I have been using Airflow for some time. I heard about Prefect and gave it a try. Awesome product! I wanted some help with the following, as it is an essential requirement for us:
• I have an ETL flow which runs every minute. It loads the data available in Kafka and processes it.
• Sometimes the volume of data in Kafka is very high and the flow runs for longer than 1 minute. This leads to a pile-up of scheduled flow runs.
• Is it possible to not start the next scheduled flow run if the previous run of the flow is still running?
I think the Cloud concurrency feature is one solution. Unfortunately, my system has very patchy internet (which is why the data load fluctuates), so Cloud is not an option at present. Thanks.
j
Hi Nitin, hope you are well. NB: I'm pretty new to Prefect too, but I think (yes) that what you ask for is possible. Note: there may be a better/nicer way to do this than what I suggest below :)
• Add an initial task to your flow that checks whether a previous run is still running, and skips if so.
• You can query the backend about flows, either through the existing Python methods or through GraphQL directly.
  ◦ Have a look at `src/prefect/backend/flow.py` (github link).
• I think (haven't tried) that `from_flow_group_id` will probably give your test task the flow that it itself is part of (rather than any previous one),
  ◦ but the internal (static) method `_query_for_flows` should help,
  ◦ either to use directly, or as a basis for generating your own query.
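A minimal sketch of the "generate your own query" route, using only the Python standard library. It assumes a self-hosted Prefect Server (1.x) whose GraphQL endpoint is `http://localhost:4200/graphql`, and a Hasura-style `flow_run` table exposing `id`, `flow_id` and `state`; check both against your own server before relying on this. `build_request`, `parse_run_ids` and `running_run_ids` are hypothetical helper names. Inside a Prefect 1.x task, the flow ID would come from `prefect.context.get("flow_id")`.

```python
import json
import urllib.request
from typing import Any, Dict, List

# Assumption: endpoint of a self-hosted Prefect Server (1.x); adjust to your setup.
GRAPHQL_URL = "http://localhost:4200/graphql"

# Assumption: Hasura-style schema with a flow_run table exposing id, flow_id and state.
RUNNING_RUNS_QUERY = """
query ($flow_id: uuid!) {
  flow_run(where: {flow_id: {_eq: $flow_id}, state: {_eq: "Running"}}) {
    id
  }
}
"""


def build_request(flow_id: str, url: str = GRAPHQL_URL) -> urllib.request.Request:
    """Build a POST request asking for all Running runs of one flow."""
    body = json.dumps(
        {"query": RUNNING_RUNS_QUERY, "variables": {"flow_id": flow_id}}
    ).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def parse_run_ids(payload: Dict[str, Any]) -> List[str]:
    """Extract run IDs from a GraphQL response, failing loudly on GraphQL errors."""
    if payload.get("errors"):
        raise RuntimeError(f"GraphQL error: {payload['errors']}")
    return [run["id"] for run in payload["data"]["flow_run"]]


def running_run_ids(flow_id: str, url: str = GRAPHQL_URL) -> List[str]:
    """Send the query to the server and return IDs of runs currently Running."""
    with urllib.request.urlopen(build_request(flow_id, url), timeout=10) as resp:
        return parse_run_ids(json.load(resp))
```

The returned list also contains the run doing the check (it is Running too), so the caller still has to filter out its own ID, e.g. `prefect.context.get("flow_run_id")`. If you prefer to reuse Prefect's own configuration, `prefect.Client().graphql(...)` in Prefect 1.x could replace the raw HTTP call.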
k
Hey @Nitin Madhavan, John is pretty much right here. In general, I think a 1-minute schedule is the shortest interval Prefect can handle reliably. You can use the GraphQL API to query for existing flow runs, and if there are any still running, just SKIP the rest of the flow.
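The check-and-skip step described in both replies could be sketched as follows. To keep it runnable without Prefect installed, `SkipThisRun` is a hypothetical stand-in for Prefect 1.x's `prefect.engine.signals.SKIP`, and `list_running_run_ids` is a hypothetical callable that returns the IDs of this flow's runs currently in the Running state (for example, by sending a GraphQL query to your Prefect Server). The current run's own ID is filtered out because the run doing the check is itself Running.

```python
from typing import Callable, Iterable


class SkipThisRun(Exception):
    """Hypothetical stand-in for Prefect 1.x's prefect.engine.signals.SKIP,
    so that this sketch runs without Prefect installed."""


def guard_against_overlap(
    this_run_id: str, list_running_run_ids: Callable[[], Iterable[str]]
) -> None:
    """Raise SkipThisRun if any run other than this one is still Running.

    list_running_run_ids is a hypothetical callable that asks the backend
    for the IDs of this flow's runs currently in the Running state.
    """
    # Ignore our own run: it is Running too, so it always shows up in the results.
    others = sorted(rid for rid in list_running_run_ids() if rid != this_run_id)
    if others:
        raise SkipThisRun(f"previous run(s) still running: {', '.join(others)}")
```

In an actual Prefect 1.x flow you would make this the first task, raise `SKIP` from it, and have the ETL tasks depend on it; with the default `skip_on_upstream_skip=True`, the downstream tasks are then skipped as well. Two runs that start at almost the same moment can race (both may skip, or neither may see the other), so treat this as a best-effort guard rather than a strict lock.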