Scott Hood
12/10/2021, 5:21 PMAnna Geller
import pendulum
from prefect.engine.signals import RETRY
import awswrangler as wr
def check_if_file_arrived_in_s3():
return wr.s3.does_object_exist("<s3://bucket/example_file.csv>")
@task
def s3_sensor(**kwargs):
bool_s3_object_arrived = check_if_file_arrived_in_s3()
if bool_s3_object_arrived is False:
raise RETRY(
"File not available yet, retrying in 20 seconds.",
start_time=pendulum.now().add(seconds=20),
)
Additionally, when it comes to using specific cloud resources, many libraries such as AWS’s boto3 include waiters for similar use cases, which make sensors almost superfluous.Anna Geller
Scott Hood
12/10/2021, 7:49 PMAnna Geller
create_flow_run
GraphQL mutation. You don’t even need to have Prefect installed to trigger a flow run, here are some examples using requests:
#1 Creating a flow run with a version group ID that will remain the same regardless of new versions later:
import requests
query = """
mutation {
create_flow_run(input: { version_group_id: "fb919cc4-7f74-4ed7-9f3c-71bdd9be69e8" }) {
id
}
}
"""
url = "<https://api.prefect.io>"
response = <http://requests.post|requests.post>(
url, json={"query": query}, headers={"authorization": "Bearer XXX"}
)
print(response.status_code)
print(response.text)
#2 Or using a specific flow_id:
import requests
query = """
mutation {
create_flow_run(input: { flow_id: "e9827ce5-f686-4183-b3e9-c629d63d7510" }) {
id
}
}
"""
url = "<https://api.prefect.io>"
response = <http://requests.post|requests.post>(
url, json={"query": query}, headers={"authorization": "Bearer XXX"}
)
print(response.status_code)
print(response.text)
#3 To do the same with Parameters:
import requests
create_mutation = """
mutation($input: createFlowRunInput!){
createFlowRun(input: $input){
flow_run{
id
}
}
}
"""
inputs = dict(
versionGroupId="339c86be-5c1c-48f0-b8d3-fe57654afe22", parameters=dict(x=6)
)
response = <http://requests.post|requests.post>(
url="<https://api.prefect.io>",
json=dict(query=create_mutation, variables=dict(input=inputs)),
headers=dict(authorization=f"Bearer {API_KEY}"),
)
print(response.status_code)
print(response.text)
Scott Hood
12/10/2021, 8:24 PMAnna Geller
Anna Geller
Scott Hood
12/10/2021, 8:46 PM