Prasanth Kothuri
11/26/2021, 3:41 PM
Anna Geller
from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule

@task
def say_hello():
    print("Hello, world!")

schedule = IntervalSchedule(interval=timedelta(minutes=1))

with Flow("Hello", schedule) as flow:
    say_hello()
To implement conditional logic that checks, e.g., whether a file has changed, you can use the case statement: https://docs.prefect.io/core/idioms/conditional.html#using-conditional-logic-in-a-flow
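The "has the file changed" check itself is plain Python that you would write yourself. Here is a minimal sketch, assuming the file is available locally and that comparing SHA-256 digests is an acceptable definition of "changed". The names `file_fingerprint` and `file_has_changed` are hypothetical helpers, not part of Prefect; the boolean returned by the second one is the kind of value a conditional branch could act on.

```python
import hashlib
from typing import Optional


def file_fingerprint(path: str) -> str:
    """Return the SHA-256 hex digest of the file's contents."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # Read in chunks so large files do not need to fit in memory.
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def file_has_changed(path: str, previous_fingerprint: Optional[str]) -> bool:
    """Return True when the current digest differs from the last one seen.

    Passing None as previous_fingerprint (nothing seen yet) counts as changed.
    """
    return file_fingerprint(path) != previous_fingerprint
```

Either function could be wrapped with the @task decorator so that its result is available inside a flow.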
Alternatively, you can implement this conditional logic by retrying until the file arrives in S3, e.g.:
import pendulum
from prefect import task
from prefect.engine.signals import RETRY
import awswrangler as wr

def check_if_file_arrived_in_s3():
    # True once the object exists in the bucket
    return wr.s3.does_object_exist("s3://bucket/example_file.csv")

@task
def s3_sensor(**kwargs):
    bool_s3_object_arrived = check_if_file_arrived_in_s3()
    if bool_s3_object_arrived is False:
        # Ask Prefect to run this task again 20 seconds from now
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )
Prasanth Kothuri
11/26/2021, 4:05 PM
Anna Geller
Prasanth Kothuri
11/26/2021, 4:11 PM
Anna Geller
Prasanth Kothuri
11/26/2021, 4:23 PM
Prasanth Kothuri
11/26/2021, 4:24 PM
Anna Geller
"as long as there is a way for prefect to store and query from this store"
You would have to write the logic for writing to and reading from this data store yourself. For example, you can write a function and wrap it with the @task decorator. Or you can use a data store for which Prefect already provides tasks, e.g. for Redis you have these at your disposal: https://docs.prefect.io/api/latest/tasks/redis.html
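To make the "write it yourself" option concrete, here is a minimal sketch, assuming a local JSON file stands in for the data store. The file name `flow_state.json` and the functions `read_state` and `write_state` are hypothetical and not part of Prefect; a real deployment would more likely point at Redis, a database, or S3.

```python
import json
import os
from typing import Optional

STATE_PATH = "flow_state.json"  # hypothetical location of the store


def read_state(key: str, path: str = STATE_PATH) -> Optional[str]:
    """Return the stored value for key, or None if nothing was stored yet."""
    if not os.path.exists(path):
        return None
    with open(path, "r", encoding="utf-8") as handle:
        return json.load(handle).get(key)


def write_state(key: str, value: str, path: str = STATE_PATH) -> None:
    """Store value under key, keeping any other keys already in the file."""
    state = {}
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            state = json.load(handle)
    state[key] = value
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(state, handle)
```

Each function could be wrapped with @task, so that one flow run writes a value (for instance the last file it processed) and the next run reads it back before deciding what to do.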
Anna Geller
Amanda Wee
11/26/2021, 8:09 PM