Thread
#prefect-community
    James Brink

    1 year ago
    Hi Everyone, I’m new to Prefect and trying to figure out if it would help us improve our data pipeline. Currently we use a proprietary orchestration architecture, but we want to move to something open-source with wide adoption. I have looked through the docs, but there is a particular aspect of how our pipeline works that I am unsure how to integrate with Prefect. We have some platforms outside of our own cloud resources that collect data, with a subsequent ETL process on our own resources. In many cases these collections may take hours, and because they don’t happen inside the executor itself, it seems different from most of the examples I have found in the docs. Really what I need is for the Prefect flow to start a data collection via an API call to our outside service and then stay in some sort of ‘running’ state until it is signaled by our outside service (maybe via API call?) that the collection is complete. Then the subsequent task in the flow would download that collected data from S3 and trigger the subsequent tasks for our ETL process. Is there any way to 'push' a notification to a Prefect task to signal it to complete? I read through the docs and came up with the code below, which is more of a polling/listening solution (if I have even done it correctly at all). Would this work? Is there a better way?
    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.signals import RETRY, FAIL, PAUSE
    from datetime import datetime, timedelta
    
    
    @task
    def start_collection(timeout_interval, collection_name):
        invoke_outside_service(collection_name)
        timeout_time = datetime.now() + timedelta(seconds=timeout_interval)
        return timeout_time
    
    
    # can I use code to define max_retries? ideally 'max_retries=timeout_interval//300 + 2' to retry every 5 minutes until the timeout period ends
    @task(max_retries=50, retry_delay=timedelta(seconds=300))
    def check_collection_status(timeout_time, collection_name):
        status = poll_outside_service_status(collection_name)
        if status == 'complete':
            return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
            ).strftime('%Y-%m-%d') + '.csv'
        elif status == 'failed':
            raise FAIL(message='Collection failed.')
        elif status == 'running':
            if datetime.now() > timeout_time:
                raise PAUSE(message='Collection timed out.')
            else:
                raise RETRY(message='Collection still running.')
    
    
    @task
    def load_data(path):
        data = download_data_from_s3(path)
        return data
    
    
    @task
    def transform_task(data):
        data = transform(data)
        return data
    
    
    @task
    def save_data(data):
        save_to_database(data)
    
    
    with Flow("ETL with Outside Service Data Collection") as flow:
        timeout_interval = Parameter("timeout_interval", default=3600)
    collection_name = Parameter("collection_name", default="collection_1")
    
        timeout_time = start_collection(timeout_interval, collection_name)
    
        path = check_collection_status(timeout_time, collection_name)
    
        data = load_data(path)
    
        data = transform_task(data)
    
        save_data(data)
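    On the question in the code comment above: max_retries can be derived in code, with the caveat that decorator arguments are evaluated once at import time, so they can come from a module-level constant but not from a runtime Parameter value. A minimal sketch, with hypothetical POLL_SECONDS and MAX_WAIT_SECONDS constants:

    # Sketch: decorator args are evaluated at import time, so they can be
    # computed from module-level constants, but not from a runtime Parameter.
    # POLL_SECONDS and MAX_WAIT_SECONDS are hypothetical names.
    from datetime import timedelta
    from prefect import task

    POLL_SECONDS = 300        # retry every 5 minutes
    MAX_WAIT_SECONDS = 3600   # give up after an hour

    @task(max_retries=MAX_WAIT_SECONDS // POLL_SECONDS + 2,
          retry_delay=timedelta(seconds=POLL_SECONDS))
    def check_collection_status(timeout_time, collection_name):
        ...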
    Kevin Kho

    1 year ago
    Hi @James Brink! Just a friendly reminder to put the long code blocks in the thread to keep the main channel clean. I know what you’re going for. My suggestion is to define an inner function inside check_collection_status. If the status != 'complete', recursively call the inner function until it is complete, then raise SUCCESS. What do you think of this?
    No need to dynamically set the retries to some arbitrary limit. Just define the polling time for the recursion
    James Brink

    1 year ago
    Sorry... I don’t really use Slack ever... Can you give me an example of what this would look like? Just replace the raise RETRY with a call to execute the function again recursively?
    Is raising FAIL/PAUSE like this proper for Prefect?
    Kevin Kho

    1 year ago
    import time

    @task()
    def check_collection_status(timeout_time, collection_name):

        def recursive_call():
            if datetime.now() > timeout_time:
                raise PAUSE(message='Collection timed out.')

            status = poll_outside_service_status(collection_name)

            if status == 'complete':
                return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
                ).strftime('%Y-%m-%d') + '.csv'
            elif status == 'failed':
                raise FAIL(message='Collection failed.')

            time.sleep(some_interval_here)
            return recursive_call()

        return recursive_call()
    Yes, it’s right! And you get your messages in the UI by doing so. I’m just avoiding the task retries.
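    For the raise SUCCESS variant suggested above, a minimal sketch of the complete branch, assuming the Prefect 1.x signals forward keyword arguments such as result to the resulting Success state:

    # Sketch of the raise-SUCCESS variant: end the task by raising the signal,
    # assuming 1.x signals forward kwargs such as result to the Success state.
    # poll_outside_service_status is the thread's own assumed helper.
    from datetime import datetime
    from prefect import task
    from prefect.engine.signals import SUCCESS

    @task
    def check_collection_status(timeout_time, collection_name):
        status = poll_outside_service_status(collection_name)
        if status == 'complete':
            path = ('s3://nitorum-data/' + collection_name + '/'
                    + datetime.now().strftime('%Y-%m-%d') + '.csv')
            raise SUCCESS(message='Collection complete.', result=path)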
    James Brink

    1 year ago
    ok... so the whole flow would look like the below, correct? This polling approach is the only way to accomplish this, correct? There is no way to 'push' to a task, right? The best we can do is listen. I moved the timeout check after the 'if complete' block so that if a collection completes after the timeout, the paused task could easily be restarted manually (that is something you can do in the UI, right?).
    import time

    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.signals import RETRY, FAIL, PAUSE
    from datetime import datetime, timedelta


    @task
    def start_collection(timeout_interval, collection_name):
        invoke_outside_service(collection_name)
        timeout_time = datetime.now() + timedelta(seconds=timeout_interval)
        return timeout_time


    @task()
    def check_collection_status(timeout_time, collection_name):

        def recursive_call():
            status = poll_outside_service_status(collection_name)
            if status == 'complete':
                return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
                ).strftime('%Y-%m-%d') + '.csv'
            elif status == 'failed':
                raise FAIL(message='Collection failed.')
            else:
                if datetime.now() > timeout_time:
                    raise PAUSE(message='Collection timed out.')
                time.sleep(3600)
                return recursive_call()

        return recursive_call()


    @task
    def load_data(path):
        data = download_data_from_s3(path)
        return data


    @task
    def transform_task(data):
        data = transform(data)
        return data


    @task
    def save_data(data):
        save_to_database(data)


    with Flow("ETL with Outside Service Data Collection") as flow:
        timeout_interval = Parameter("timeout_interval", default=3600)
        collection_name = Parameter("collection_name", default="collection_1")

        timeout_time = start_collection(timeout_interval, collection_name)

        path = check_collection_status(timeout_time, collection_name)

        data = load_data(path)

        data = transform_task(data)

        save_data(data)
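    On the 'push' question above: a task itself can only listen, but one approximation in Prefect 1.x is to split the pipeline in two and let the outside service's completion callback start the downstream ETL flow through the Prefect API. A rough sketch, assuming a separately registered ETL flow; the flow ID and the handler function below are hypothetical placeholders:

    # Rough sketch of an approximate 'push': split the ETL into its own
    # registered flow, and have the outside service's completion webhook start
    # it via the API. on_collection_complete and flow_id are placeholders.
    from prefect import Client

    def on_collection_complete(path):
        client = Client()
        client.create_flow_run(
            flow_id="<etl-flow-id>",       # hypothetical registered flow's ID
            parameters={"path": path},
        )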
    Kevin Kho

    1 year ago
    Yes it can be continued from the UI
    It looks good to me
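    Alongside resuming from the UI, a paused task run can in principle be resumed through the client as well; a sketch assuming a Prefect 1.x backend, with placeholder IDs (the exact set_task_run_state signature varies across 1.x releases):

    # Sketch: resuming a PAUSED task run programmatically. Assumes a 1.x
    # backend; task_run_id and version are placeholders, and the
    # set_task_run_state signature may differ across 1.x releases.
    from prefect import Client
    from prefect.engine.state import Resume

    client = Client()
    client.set_task_run_state(
        task_run_id="<task-run-id>",
        version=0,
        state=Resume(message='Collection finished after the timeout; resuming.'),
    )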
    James Brink

    1 year ago
    Thanks for the help!