Hi All, I want to schedule a prefect flow every minute and within the flow have a check to determine...
p
Hi All, I want to schedule a Prefect flow every minute and, within the flow, check whether a file in S3 has changed. If the file has changed, a bunch of tasks are executed; otherwise the flow exits. For this I need to maintain state across flow runs. How can I do that? Thanks a ton.
a
@Prasanth Kothuri To schedule your flow every minute, you need to attach a schedule and then register your flow, e.g.
from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule

@task
def say_hello():
    print("Hello, world!")

schedule = IntervalSchedule(interval=timedelta(minutes=1))

with Flow("Hello", schedule) as flow:
    say_hello()
To implement conditional logic that checks, e.g., whether a file has changed, you can leverage the case statement: https://docs.prefect.io/core/idioms/conditional.html#using-conditional-logic-in-a-flow Alternatively, you can implement a sensor-style task that keeps retrying until the file arrives in S3, e.g.:
import pendulum
import awswrangler as wr
from prefect import task
from prefect.engine.signals import RETRY


def check_if_file_arrived_in_s3():
    return wr.s3.does_object_exist("s3://bucket/example_file.csv")


@task
def s3_sensor(**kwargs):
    bool_s3_object_arrived = check_if_file_arrived_in_s3()
    if bool_s3_object_arrived is False:
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )
p
@Anna Geller thank you. In my case the file is always there, and I need to figure out whether it has changed since the last run, so something like current_checksum_of_the_file != previous_checksum_of_the_file. The question is where to store that info across flow runs.
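The comparison described here can be sketched in plain Python. This is only a minimal sketch and not Prefect-specific: compute_checksum and has_changed are hypothetical helper names, and hashing the file bytes with SHA-256 is an assumption (any stable fingerprint of the file would serve the same purpose). Where the previous checksum is stored is left open.

```python
import hashlib
from typing import Optional


def compute_checksum(data: bytes) -> str:
    """Return the SHA-256 hex digest of the file contents."""
    return hashlib.sha256(data).hexdigest()


def has_changed(current_checksum: str, previous_checksum: Optional[str]) -> bool:
    """Report a change when the checksums differ.

    A missing previous checksum (e.g. the very first run) is treated
    as a change, which is an assumption of this sketch.
    """
    return previous_checksum is None or current_checksum != previous_checksum
```

Inside a flow, the boolean returned by has_changed could drive the conditional branch that decides whether the downstream tasks run.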
a
@Prasanth Kothuri The best option to do that would be the KV Store available in Prefect Cloud. Here is an example of how it can be used in a flow: https://docs.prefect.io/orchestration/concepts/kv_store.html#using-key-value-pairs-in-flows
p
@Anna Geller currently we are running on-prem. Is this feature available on-prem as well?
a
Yes, as long as you use Prefect Cloud. You can sign up for Prefect Cloud and deploy your agents on-prem. But if you use on-prem Prefect Server, this feature is not available. You would then have to store this information somewhere else, perhaps Redis?
p
Yes, I can set up a KV store, as long as there is a way for Prefect to store and query from this store. Is that possible?
I would love to use Prefect Cloud, but the bureaucracy at my current place doesn't allow it 😉
a
"as long as there is a way for prefect to store and query from this store"
You would have to write the logic to write to and read from this data store yourself. You can write e.g. a function and wrap it with the @task decorator. Or you can use a data store for which there is a Prefect task, e.g. with Redis you have those at your disposal: https://docs.prefect.io/api/latest/tasks/redis.html
There are RedisSet and RedisGet tasks to set and retrieve key-value pairs. But of course you would have to manage this Redis instance yourself.
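A hand-written version of that read/write logic might look like the sketch below. It assumes a client object exposing get(key) and set(key, value), as the redis-py client does; ChecksumStore, FakeClient and the key name "last_checksum" are hypothetical names chosen for illustration, and the in-memory fake only stands in for a real Redis instance.

```python
from typing import Optional


class ChecksumStore:
    """Persist the last seen checksum in a key-value backend.

    `client` is assumed to offer get(key) and set(key, value),
    e.g. a redis-py client; any object with those methods works.
    """

    def __init__(self, client, key: str = "last_checksum"):
        self.client = client
        self.key = key

    def load(self) -> Optional[str]:
        """Return the stored checksum, or None if nothing was stored yet."""
        value = self.client.get(self.key)
        # Redis clients typically return bytes unless configured to decode.
        if isinstance(value, bytes):
            value = value.decode("utf-8")
        return value

    def save(self, checksum: str) -> None:
        """Overwrite the stored checksum with the latest one."""
        self.client.set(self.key, checksum)


class FakeClient:
    """In-memory stand-in for a Redis client, for local testing only."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        # Store bytes to mimic what a Redis client would hand back.
        self._data[key] = value.encode("utf-8")
```

The load and save methods could each be wrapped with the @task decorator, so that one task reads the previous checksum at the start of a run and another writes the new one at the end.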
a
Since you're reading from S3, perhaps you could write to it as well? It would be simpler than managing Redis just for this. Whether or not you trust AWS's claim of strong consistency, at minute-level intervals I don't think eventual-consistency problems should occur.
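Keeping the state in S3 could be sketched roughly as follows. This assumes a boto3-style S3 client is passed in (get_object, put_object, and the client's exceptions.NoSuchKey); the function names, bucket and state key are hypothetical. Note that, depending on bucket permissions, a missing object may surface as a different error than NoSuchKey, which this sketch does not handle.

```python
from typing import Optional


def read_previous_checksum(s3_client, bucket: str, key: str) -> Optional[str]:
    """Return the checksum stored in S3, or None if no state object exists yet."""
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
    except s3_client.exceptions.NoSuchKey:
        # First run: nothing has been written yet.
        return None
    return response["Body"].read().decode("utf-8")


def write_checksum(s3_client, bucket: str, key: str, checksum: str) -> None:
    """Store the latest checksum as a small text object in S3."""
    s3_client.put_object(Bucket=bucket, Key=key, Body=checksum.encode("utf-8"))
```

Passing the client in as an argument keeps both functions easy to test with a stand-in object and avoids creating a new client on every call.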