# ask-community
j
Hi everyone, I’m new to Prefect and trying to figure out if it would help us improve our data pipeline. Currently we use a proprietary orchestration architecture, but we would like to move to something open-source with wide adoption. I have looked through the docs, but our pipeline has a particular situation that I am unsure how to integrate with Prefect. We have some platforms outside of our own cloud resources that collect data, with a subsequent ETL process running on our own resources. In many cases these collections may take hours, and because they don’t happen inside the executor itself, they seem atypical of most of the examples I have found in the docs. Really what I need is for the Prefect flow to start a data collection via an API call to our outside service and then stay in some sort of ‘running’ state until it can be signaled by our outside service (maybe via an API call?) that the collection is complete. Then the subsequent task in the flow would download the collected data from S3 and trigger the rest of our ETL process. Is there any way to ‘push’ a notification to a Prefect task to signal it to complete? I read through the docs and came up with the code below, which is more of a polling/listening solution (if I have even done it correctly at all). Would this work? Is there a better way?
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.signals import RETRY, FAIL, PAUSE
from datetime import datetime, timedelta


@task
def start_collection(timeout_interval, collection_name):
    # kick off the external collection and compute an absolute deadline
    invoke_outside_service(collection_name)
    timeout_time = datetime.now() + timedelta(seconds=timeout_interval)
    return timeout_time


# Can I use code to define max_retries? Ideally max_retries=timeout_interval // 300 + 2
# so it retries every 5 minutes until the timeout period ends.
@task(max_retries=50, retry_delay=timedelta(seconds=300))
def check_collection_status(timeout_time, collection_name):
    status = poll_outside_service_status(collection_name)
    if status == 'complete':
        return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
        ).strftime('%Y-%m-%d') + '.csv'
    elif status == 'failed':
        raise FAIL(message='Collection failed.')
    elif status == 'running':
        if datetime.now() > timeout_time:
            raise PAUSE(message='Collection timed out.')
        else:
            raise RETRY(message='Collection still running.')


@task
def load_data(path):
    data = download_data_from_s3(path)
    return data


@task
def transform_task(data):
    data = transform(data)
    return data


@task
def save_data(data):
    save_to_database(data)


with Flow("ETL with Outside Service Data Collection") as flow:
    timeout_interval = Parameter("timeout_interval", default=3600)
    collection_name = Parameter("collection_name", default="collection_1")

    timeout_time = start_collection(timeout_interval, collection_name)

    path = check_collection_status(timeout_time, collection_name)

    data = load_data(path)

    data = transform_task(data)

    save_data(data)
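For local testing I’m assuming I can kick this off with parameter overrides (if I’m reading the docs right, flow.run accepts a parameters dict in Prefect 1.x):

# quick local test run, overriding the Parameter defaults defined above
flow.run(parameters={"timeout_interval": 7200, "collection_name": "collection_1"})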
k
Hi @James Brink! Just a friendly reminder to put the long code blocks in the thread to keep the main channel clean. I know what you’re going for. My suggestion is to define an inner function inside check_collection_status. If the status != 'complete', recursively call the inner function until it is complete, then raise SUCCESS. What do you think of this?
No need to dynamically set the retries to some arbitrary limit. Just define the polling time for the recursion.
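(For reference, since the arguments to @task are evaluated when the module is imported, you *could* compute max_retries from a module-level constant if you really wanted to. The catch is that a Parameter’s value only exists at runtime, so it can’t feed the decorator. A minimal sketch, assuming a fixed TIMEOUT_INTERVAL constant:

from datetime import timedelta
from prefect import task

TIMEOUT_INTERVAL = 3600  # seconds; a plain constant, not a runtime Parameter

# decorator arguments are ordinary Python expressions, evaluated once at
# definition time: one retry every 5 minutes until the timeout, plus slack
@task(max_retries=TIMEOUT_INTERVAL // 300 + 2, retry_delay=timedelta(seconds=300))
def check_collection_status(timeout_time, collection_name):
    ...

But as I said, the recursion approach sidesteps all of this.)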
j
Sorry... I don’t really ever use Slack... Can you give me an example of what this would look like? Just replace raise RETRY with a call that executes the function again recursively?
Is raising FAIL/PAUSE signals like this the proper way to do it in Prefect?
k
@task()
def check_collection_status(timeout_time, collection_name):

    def recursive_call():
        # check the deadline first; PAUSE leaves the task resumable from the UI
        if datetime.now() > timeout_time:
            raise PAUSE(message='Collection timed out.')

        status = poll_outside_service_status(collection_name)

        if status == 'complete':
            return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
            ).strftime('%Y-%m-%d') + '.csv'
        elif status == 'failed':
            raise FAIL(message='Collection failed.')

        time.sleep(some_interval_here)  # your polling interval, in seconds
        return recursive_call()

    return recursive_call()
Yes, that’s right! And you get your messages in the UI by doing so. I’m just avoiding the task retries.
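One caveat with the recursion: if your polling interval is short relative to the timeout, you could eventually hit Python’s default recursion limit (roughly 1000 frames). The same logic as a plain loop avoids that; this sketch assumes the same placeholder helpers as above:

@task()
def check_collection_status(timeout_time, collection_name):
    # same logic as the recursive version, but the call stack stays flat
    # no matter how many times we poll
    while datetime.now() <= timeout_time:
        status = poll_outside_service_status(collection_name)
        if status == 'complete':
            return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
            ).strftime('%Y-%m-%d') + '.csv'
        elif status == 'failed':
            raise FAIL(message='Collection failed.')
        time.sleep(some_interval_here)  # placeholder polling interval
    raise PAUSE(message='Collection timed out.')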
j
OK... so the whole flow would look like the below, correct? And this polling approach is the only way to accomplish this, correct? There is no way to ‘push’ to a task, right? The best we can do is listen. I moved the timeout test after the ‘if complete’ block so that if something completes after the timeout, the paused task can easily be restarted manually (that’s something you can do in the UI, right?).
import time
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.signals import FAIL, PAUSE
from datetime import datetime, timedelta


@task
def start_collection(timeout_interval, collection_name):
    invoke_outside_service(collection_name)
    timeout_time = datetime.now() + timedelta(seconds=timeout_interval)
    return timeout_time


@task()
def check_collection_status(timeout_time, collection_name):

    def recursive_call():
        status = poll_outside_service_status(collection_name)
        if status == 'complete':
            return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
            ).strftime('%Y-%m-%d') + '.csv'
        elif status == 'failed':
            raise FAIL(message='Collection failed.')
        else:
            # timeout check moved after the 'complete' check so a late
            # completion can still be picked up after a manual resume
            if datetime.now() > timeout_time:
                raise PAUSE(message='Collection timed out.')
            time.sleep(3600)
            return recursive_call()

    return recursive_call()


@task
def load_data(path):
    data = download_data_from_s3(path)
    return data


@task
def transform_task(data):
    data = transform(data)
    return data


@task
def save_data(data):
    save_to_database(data)


with Flow("ETL with Outside Service Data Collection") as flow:
    timeout_interval = Parameter("timeout_interval", default=3600)
    collection_name = Parameter("collection_name", default="collection_1")

    timeout_time = start_collection(timeout_interval, collection_name)

    path = check_collection_status(timeout_time, collection_name)

    data = load_data(path)

    data = transform_task(data)

    save_data(data)
k
Yes, it can be continued from the UI.
It looks good to me.
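On the ‘push’ question: you’re right that a running task can’t receive a push. The closest approximation I know of is to have the outside service write a small marker object to S3 when it finishes, and poll for just that marker instead of hitting the service’s API. A sketch, assuming boto3 is available and a made-up bucket/key convention:

import boto3
from botocore.exceptions import ClientError


def collection_is_complete(collection_name):
    # hypothetical convention: the outside service writes
    # _markers/<collection_name> to the bucket when collection finishes
    s3 = boto3.client('s3')
    try:
        s3.head_object(Bucket='nitorum-data', Key='_markers/' + collection_name)
        return True
    except ClientError as exc:
        if exc.response['Error']['Code'] == '404':
            return False
        raise

It’s still polling under the hood, but only against S3, so the outside service never needs a callback into Prefect.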
j
thanks for the help!
👍 1