# ask-community
Scott Hood:
Hey all! Just learning about Prefect, really cool stuff so far! I have mostly been looking into the current options for event-driven flows. From the comments and documentation, it seems Prefect Core has nothing built in like Airflow Sensors (I have seen some of the AWS Lambda recommendations and blog posts). The Orion site suggests this is being added with the newest engine, though. Is there any documentation on that, or is it more a feature coming soon to Orion that isn't fully hashed out yet?
Anna:
@Scott Hood if you look at what Sensors are doing, they are only polling for some state of the world (e.g. a file arrived in S3/SFTP), and if the condition is not met, the sensor sleeps and tries again a few seconds or minutes later. If you want to implement something similar in Prefect, you can leverage the RETRY signal to implement such polling. Here is an example checking whether an `example_file.csv` file has arrived in S3:
```python
import pendulum
import awswrangler as wr
from prefect import task
from prefect.engine.signals import RETRY


def check_if_file_arrived_in_s3():
    return wr.s3.does_object_exist("s3://bucket/example_file.csv")


@task
def s3_sensor(**kwargs):
    if not check_if_file_arrived_in_s3():
        # Raising RETRY reschedules this task, effectively polling S3
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )
```
Additionally, when it comes to using specific cloud resources, many libraries such as AWS’s boto3 include waiters for similar use cases, which make sensors almost superfluous.
I would be interested to hear what you need this sensor pattern for, so I can give more specific recommendations.
Scott Hood:
Hey Anna! For the most part it would be things like listening to Azure Service Bus or Storage Account Events / Event Grid. Creating polling tasks and such is easily doable, but native integrations within Prefect would make it that much better / friendlier to use.
Anna:
Gotcha. Regarding event-based processing of data, you can always trigger a flow run upon request from your event grid using the `create_flow_run` GraphQL mutation. You don't even need to have Prefect installed to trigger a flow run. Here are some examples using requests: #1 Creating a flow run with a version group ID that will remain the same regardless of new versions later:
```python
import requests

query = """
mutation {
  create_flow_run(input: { version_group_id: "fb919cc4-7f74-4ed7-9f3c-71bdd9be69e8" }) {
    id
  }
}
"""

url = "https://api.prefect.io"
response = requests.post(
    url, json={"query": query}, headers={"authorization": "Bearer XXX"}
)
print(response.status_code)
print(response.text)
```
#2 Or using a specific `flow_id`:
```python
import requests

query = """
mutation {
  create_flow_run(input: { flow_id: "e9827ce5-f686-4183-b3e9-c629d63d7510" }) {
    id
  }
}
"""

url = "https://api.prefect.io"
response = requests.post(
    url, json={"query": query}, headers={"authorization": "Bearer XXX"}
)
print(response.status_code)
print(response.text)
```
#3 To do the same with Parameters:
```python
import requests

API_KEY = "XXX"  # your Prefect API key

create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""

inputs = dict(
    versionGroupId="339c86be-5c1c-48f0-b8d3-fe57654afe22", parameters=dict(x=6)
)
response = requests.post(
    url="https://api.prefect.io",
    json=dict(query=create_mutation, variables=dict(input=inputs)),
    headers=dict(authorization=f"Bearer {API_KEY}"),
)
print(response.status_code)
print(response.text)
```
Scott Hood:
Cool, thanks! In the scenario of utilizing Service Bus, I assume the recommendation would be the same as the previous answer: instead of calling S3, you'd have a retry loop listening for new messages?
Anna:
No no, with event-based processing you're simply triggering a flow run in response to an event. E.g. Azure Event Grid could send you an event about something (let's assume a new message arrived in the message bus). Then you could have a flow that processes this message (e.g. extracting some information from it and storing the end result in a database). So the process for this scenario would be:
1. A message arrives in the message bus
2. The message arrival triggers a new flow run of a flow called "Message Processor", perhaps passing the message to your flow as a string in a flow Parameter (see option #3 above for creating a flow run and passing a parameter from a request)
3. This flow is a normal Prefect flow that can have multiple normal tasks, e.g. one task that is a Parameter and another that does something with the parameter value
This is just one example, but the point is that you don't need any sensors or polling in this case because the flow run is triggered by the event. I don't know Azure well enough to give more specifics, but with AWS this could be implemented using:
1. An SNS topic
2. An AWS Lambda triggered from the SNS message event
3. A Prefect flow run triggered from the Lambda
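The AWS steps above (the Lambda handler in steps 2-3) can be sketched roughly like this; a non-authoritative sketch assuming hypothetical `VERSION_GROUP_ID` and `PREFECT_API_KEY` environment variables and a flow that accepts a `message` Parameter:

```python
import os

import requests

# createFlowRun mutation with parameters, as in option #3 above
CREATE_MUTATION = """
mutation($input: createFlowRunInput!) {
    createFlowRun(input: $input) { flow_run { id } }
}
"""


def build_payload(version_group_id: str, message: str) -> dict:
    # Pass the raw SNS message to the flow as its "message" Parameter
    # (the parameter name is an assumption for this sketch)
    return {
        "query": CREATE_MUTATION,
        "variables": {
            "input": {
                "versionGroupId": version_group_id,
                "parameters": {"message": message},
            }
        },
    }


def handler(event, context):
    # SNS delivers the message body under Records[0].Sns.Message
    message = event["Records"][0]["Sns"]["Message"]
    response = requests.post(
        "https://api.prefect.io",
        json=build_payload(os.environ["VERSION_GROUP_ID"], message),
        headers={"authorization": f"Bearer {os.environ['PREFECT_API_KEY']}"},
    )
    return {"status": response.status_code}
```

Wiring the Lambda to the SNS topic then gives a pure push model: each published message starts one flow run carrying that message as a parameter.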
cc @Scott Hood
Scott Hood:
Gotcha, yeah, a push model where cloud resources trigger the flows.