https://prefect.io logo
#prefect-community
Title
# prefect-community
j

James Phoenix

04/28/2022, 10:47 AM
Or how to make a task that runs after a new blob arrives in a google cloud storage bucket?
a

Anna Geller

04/28/2022, 10:54 AM
I'm so glad you mentioned your use case! It's always easier to help when we understand what do you want to accomplish rather than talking about specific features such as mutating run states with GraphQL You could leverage Google Cloud Functions to trigger a flow run once the file arrives in your storage bucket. Unfortunately., I don't have any examples for GCP, but we have some examples showing that pattern for AWS Lambda and S3 here. Alternatively, you could have a task repeatedly polling for a file in your GCS storage bucket and start processing downstream tasks only once this file arrives.
Copy code
import pendulum
from prefect.engine.signals import RETRY

def check_if_file_arrived_in_gcs():
    return gcs_does_object_exist("<gcs://bucket/example_file.csv>")

@task
def gcs_polling(**kwargs):
    bool_gcs_object_arrived = check_if_file_arrived_in_gcs()
    if bool_s3_object_arrived is False:
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )
j

James Phoenix

04/28/2022, 12:19 PM
Thank you ❤️
👍 1
40 Views