# ask-community
t
Hi, probably a noob question, but is there an equivalent to Airflow sensors in Prefect? In particular, suppose another team runs an ETL that updates a core table in a data warehouse, and I want my flow to start once that table has been updated. In Airflow I would use a sensor on that table. Is there something equivalent in Prefect?
s
Hey! I don't know of a sensor equivalent in Prefect, but I think you could use a Flow of Flows as a way to deal with your use case.
upvote 1
t
hey
I was thinking more of the case in which the upstream team is potentially on a different tech stack, e.g., they are not orchestrating their ETLs through prefect.
Or the case where we don't want to couple our flows across different teams as tightly as the flow-of-flows approach would require.
For context, we have about 10 ML teams that would probably want this sensor type functionality on a core data engineering pipeline, so I don't think we want all 10 teams integrating their flows into the data engineering flow in the form of flow of flows.
s
Oh yeah, that doesn't work very well then. According to this GitHub issue, Prefect does not include an equivalent to sensors, but you can use the API to react to a third-party event emitter.
t
ok, great.
thanks for the link!
s
You're welcome! Maybe someone from the Prefect team will have more to say, though.
a
@Thomas Furmston great question! Overall, there are a couple of ways you could tackle it:
1. Event-based: once the event you are looking for occurs, trigger a specific flow to run. If you are on AWS, the event could trigger a Lambda function that starts a FlowRun. The event could be a new file arriving in S3, or a new row streamed to DynamoDB/Kinesis/AWS Aurora (note that this is very AWS-specific).
2. API-based: since starting a FlowRun is just an API call, you can do it from any flow or even a task. There is a lot you could do with that, e.g. as you mentioned: one task detects that a table was updated, and a subsequent task triggers some other flow or makes an API call to take action.
3. State-based: this works in a similar way to Airflow sensors. You raise a RETRY signal if the condition you "poke" for is not met:
import pendulum
from prefect import task
from prefect.engine.signals import RETRY


@task
def my_sensor(**kwargs):
    # Check some state of the world, e.g. query the warehouse table
    if condition_not_met:
        raise RETRY(
            "Condition not met, retrying in 5 seconds.",
            start_time=pendulum.now().add(seconds=5),
        )
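For the API-based option (2), a rough sketch of what the request could look like: Prefect 1.x exposes flow-run creation through a GraphQL mutation, and this helper only builds the JSON payload. The mutation name `create_flow_run`, the endpoint URL, and the flow id are assumptions here; verify them against your own server's schema before using this.

```python
import json


def build_create_flow_run_payload(flow_id):
    """Build a GraphQL payload that asks the Prefect API to start a flow run.

    NOTE: the mutation name and input shape below are assumptions based on
    Prefect 1.x's GraphQL API; check your server's schema to confirm them.
    """
    mutation = """
    mutation($input: create_flow_run_input!) {
      create_flow_run(input: $input) { id }
    }
    """
    return json.dumps({
        "query": mutation,
        "variables": {"input": {"flow_id": flow_id}},
    })


# To actually fire it, POST the payload to your Prefect API endpoint
# (URL is a placeholder), e.g. with urllib.request:
#
#   req = urllib.request.Request(
#       "http://localhost:4200/graphql",
#       data=build_create_flow_run_payload("your-flow-id").encode(),
#       headers={"Content-Type": "application/json"},
#   )
#   urllib.request.urlopen(req)
```

Since it is just an HTTP call, the upstream data engineering pipeline could make it from any tech stack, not only from Prefect.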
t
ok, cool
Yes, I think option 3 is closest to an Airflow sensor. The other options are interesting too, though.
Thanks for the info. 🙂
👍 1
k
You can also loop inside a task to poll for a condition, and it's effectively the same. The RETRY signal Anna proposed does give you logging between attempts, though.
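The looping approach above can be sketched in plain Python. `check_condition` is a stand-in for your own warehouse check (a hypothetical callable, not a Prefect API):

```python
import time


def poll_until(check_condition, interval_seconds=5, timeout_seconds=300):
    """Block until check_condition() returns True, polling at a fixed interval.

    Raises TimeoutError if the condition is not met before the timeout.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if check_condition():
            return True
        time.sleep(interval_seconds)
    raise TimeoutError("Condition not met before timeout.")
```

Inside a Prefect task you would just call this in the task body. The trade-off versus the RETRY approach is that the retries are invisible to the scheduler: the task simply runs longer, so you lose the per-attempt logging and state transitions.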