Thread
#prefect-community
    Thomas Furmston

    11 months ago
    Hi, probably a noob question, but is there an equivalent to Airflow sensors in Prefect? In particular, suppose another team runs an ETL that updates a core table in a data warehouse, and I want my flow to start once that table has been updated. In Airflow I would put a sensor on that table. Is there something equivalent in Prefect?
    Sylvain Hazard

    11 months ago
    Hey! I don't know of a sensor equivalent in Prefect, but I think you could use Flow of Flows as a way to deal with your use case.
    Thomas Furmston

    11 months ago
    hey
    I was thinking more of the case in which the upstream team is potentially on a different tech stack, e.g., they are not orchestrating their ETLs through Prefect.
    Or we don't want to integrate our flows across different teams as tightly as the flow-of-flows approach would require.
    For context, we have about 10 ML teams that would probably want this sensor-type functionality on a core data engineering pipeline, so I don't think we want all 10 teams integrating their flows into the data engineering flow via flow of flows.
    Sylvain Hazard

    11 months ago
    Oh yeah, that doesn't work very well then. According to this GitHub issue, Prefect does not include an equivalent to sensors, but you can use the API to react to a third-party event emitter.
    Thomas Furmston

    11 months ago
    ok, great.
    thanks for the link!
    Sylvain Hazard

    11 months ago
    You're welcome! Maybe someone from the Prefect team will have more to add, though.
    Anna Geller

    11 months ago
    @Thomas Furmston great question! Overall, there are a couple of ways you could tackle it:
    1. Event-based: once the event you are looking for occurs, trigger a specific flow to run. If you are on AWS, the event could trigger a Lambda function that triggers a FlowRun. The event could be a new file arriving in S3 or a new row streamed to DynamoDB/Kinesis/AWS Aurora. Note that this is very AWS-specific.
    2. API-based: since starting a FlowRun is just an API call, you can make that call from any flow or even a task, so there is a lot you can do with it. E.g., as you mentioned, one task detects that the table was updated, and a subsequent task triggers some other flow or makes an API call to take action (see the sketch after the snippet below).
    3. State-based: this works in a similar way to Airflow sensors - you raise a RETRY signal if the condition you “poke” for is not met:
    import pendulum
    from prefect import task
    from prefect.engine.signals import RETRY


    @task
    def my_sensor(**kwargs):
        # check some state of the world, e.g. query the warehouse
        condition_not_met = True  # placeholder for the real check
        if condition_not_met:
            raise RETRY(
                "Condition not met, retrying in 5 seconds.",
                start_time=pendulum.now().add(seconds=5),
            )
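    For options 1 and 2, the trigger boils down to one API call. Here is a minimal sketch, assuming Prefect 1.x with Prefect Cloud/Server and an AWS Lambda wired to the upstream event (e.g. a new file in S3); the flow ID and event wiring are placeholders, not details from this thread:
    # Hedged sketch: start a Prefect flow run when an upstream event fires.
    # Assumes Prefect 1.x; the flow ID is a placeholder and the API key is
    # read from the environment (PREFECT__CLOUD__API_KEY).
    from prefect import Client


    def lambda_handler(event, context):
        client = Client()
        # start the downstream flow; parameters could carry details from the event
        client.create_flow_run(flow_id="<your-flow-id>")
    The same create_flow_run call can be made from inside a task (option 2); recent Prefect 1.x releases also ship a create_flow_run task in prefect.tasks.prefect that wraps it.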
    Thomas Furmston

    11 months ago
    ok, cool
    Yes, I think option 3 is closest to an Airflow sensor. The other options are interesting too, though.
    Thanks for the info. 🙂
    Kevin Kho

    11 months ago
    You can loop inside a task to poll for a condition (something like the sketch below) and it’s effectively the same. The RETRY Anna proposed does give logging, though.
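    A minimal sketch of that polling pattern, assuming Prefect 1.x; table_was_updated and the 30-second interval are placeholders for your own check:
    import time

    from prefect import task


    def table_was_updated():
        # placeholder: replace with a real query against the warehouse,
        # e.g. comparing a last-updated timestamp
        return False


    @task(timeout=3600)  # optional: stop polling after an hour
    def wait_for_table_update():
        # block until the upstream table has been refreshed, then let
        # downstream tasks run
        while not table_was_updated():
            time.sleep(30)  # check every 30 seconds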