• Zach Schumacher

    1 year ago
    Hey guys! Have a Kubernetes run question. I have a use case where I need to dynamically set a Kubernetes run config. I want to find out the latest tag of a Docker image, and then use that same image for every task in a flow. I don’t know what the latest tag is at registration time, only at run time. Using :latest does not work, because :latest could change between the first task in a flow and the last task. Is there a recipe for doing this?
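    One possible recipe, sketched below (not an official pattern): resolve the tag once in a small launcher script, then create the flow run with a KubernetesRun run config that pins that exact image, so every task in that run uses the same tag. The registry-lookup helper and the image/flow-id values are placeholders.
    import prefect
    from prefect.run_configs import KubernetesRun


    def get_latest_tag(repo: str) -> str:
        """Hypothetical helper: ask your registry API for the newest tag of `repo`."""
        raise NotImplementedError


    def launch():
        # resolve the tag once, right before creating the flow run
        tag = get_latest_tag("registry.example.com/my-image")
        prefect.Client().create_flow_run(
            flow_id="...",  # id of the already-registered flow
            run_config=KubernetesRun(image=f"registry.example.com/my-image:{tag}"),
        )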
    18 replies
  • Robin

    1 year ago
    Another question concerning result caching. Let's say we run a flow and there are some failures caused by bugs. Assuming that these bugs only affect the tasks that failed, we would want to fix those bugs locally, reregister the flow and rerun only those tasks that have not been successful before. So I configured the task results to be cached based on all inputs for 48 h as follows:
    import datetime

    import prefect
    from prefect import task
    from prefect.engine.results import S3Result

    version = "0_1_17"

    s3_result_battery_id = S3Result(
        bucket="accure-prefect-results",
        location=f"{{flow_name}}/{version}{{task_name}}/{{battery_id}}.prefect",
    )


    @task(
        result=s3_result_battery_id,
        # max_retries=max_retry,
        # retry_delay=retry_delay,
        # timeout=300,
        log_stdout=True,
        tags=["analysis", "task"],
        cache_validator=prefect.engine.cache_validators.all_parameters,
        cache_for=datetime.timedelta(hours=48),
    )
    def analyze_battery(battery_id):
        ...  # task body omitted in the original snippet
    However, it seems like the failed task runs aren't rerun either: since the inputs have not changed, their results are simply read from the cache. How do I explicitly tell Prefect to rerun the tasks that previously failed? Or am I missing something? Bonus questions:
    • Is that actually a good philosophy?
    • I thought of introducing the package release version (simply "version" in the code snippet above) into the mix (sketched below)
    ◦ so that one can control, via the package release version, when to rerun all tasks
    ◦ e.g. rerun everything for feature enhancements in an entirely new release, as opposed to just some bugfixes within the same release
    • note: the package release version != the Prefect flow version
    • In general we are not yet quite sure how to best handle versioning and staging with Prefect 🤔
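    A rough sketch of that idea, assuming the analysis code is shipped as an installable package (the package name "accure-analysis" is a placeholder, and importlib.metadata needs Python 3.8+): derive the version string from the installed release, so that bumping the release points every task at a fresh result location, while runs under the same release keep reusing cached results.
    # Hedged sketch: tie the result/cache prefix to the installed package release.
    # The package name "accure-analysis" is a hypothetical placeholder.
    from importlib.metadata import version as pkg_version

    from prefect.engine.results import S3Result

    release = pkg_version("accure-analysis").replace(".", "_")  # e.g. "0_1_17"

    s3_result_battery_id = S3Result(
        bucket="accure-prefect-results",
        location=f"{{flow_name}}/{release}/{{task_name}}/{{battery_id}}.prefect",
    )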
    29 replies
  • Dimosthenis Schizas

    1 year ago
    Hey everyone, I have a probably noob question. I have created a fairly simple flow with 3 steps that does some calculations; the execution usually takes 20-30 mins. When I try to register the flow, it seems that Prefect evaluates it and starts calculating. I haven't let it run to completion, but presumably the flow will be registered once the execution ends, which doesn't make any sense. Shouldn't register just package the flow and send it to the server, or am I missing something here?
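    For context, a minimal sketch of the structure that usually avoids this (flow, project, and function names are placeholders): registration only serializes the flow and sends its metadata to the backend, so task bodies should not execute; if calculations run at registration time, it is typically because they happen at module level or while building the flow, rather than inside @task functions.
    from prefect import task, Flow


    @task
    def heavy_calculation():
        # expensive work goes inside a task, so it only runs at flow-run time,
        # not while the script is imported or the flow is registered
        ...


    with Flow("my-calculations") as flow:
        # calling a task inside the Flow block only builds the graph;
        # nothing executes here
        heavy_calculation()


    if __name__ == "__main__":
        flow.register(project_name="my-project")  # sends the flow, does not run it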
    13 replies
  • James Brink

    1 year ago
    Hi Everyone, I’m new to Prefect and trying to figure out whether it would help us improve our data pipeline. Currently we use a proprietary orchestration architecture, but we want to move to something open-source with wide adoption. I have looked through the docs, and there is one particular aspect of how our pipeline works that I am unsure how to integrate with Prefect. We have platforms outside of our own cloud resources that collect data, with a subsequent ETL process running on our own resources. In many cases these collections may take hours, and because they don’t happen inside the executor itself, they seem atypical compared to most of the examples in the docs. What I really need is for the Prefect flow to start a data collection via an API call to our outside service and then stay in some sort of ‘running’ state until our outside service signals (maybe via an API call?) that the collection is complete. The subsequent task in the flow would then download the collected data from S3 and trigger the rest of our ETL process. Is there any way to 'push' a notification to a Prefect task to signal it to complete? I read through the docs and came up with the code below, which is more of a polling/listening solution (if I have even done it correctly at all). Would this work? Is there a better way?
    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.signals import RETRY, FAIL, PAUSE
    from datetime import datetime, timedelta
    
    
    @task
    def start_collection(timeout_interval, collection_name):
        invoke_outside_service(collection_name)
        timeout_time = datetime.now() + timedelta(seconds=timeout_interval)
        return timeout_time
    
    
    #can I use code to define max retries? ideally 'max_retries=timeout_interval//600 + 2' to retry every 5 minutes until timeout period ends
    @task(max_retries=50, retry_delay=timedelta(seconds=300))
    def check_collection_status(timeout_time, collection_name):
        status = poll_outside_service_status(collection_name)
        if status == 'complete':
            return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
            ).strftime('%Y-%m-%d') + '.csv'
        elif status == 'failed':
            raise FAIL(message='Collection failed.')
        elif status == 'running':
            if datetime.now() > timeout_time:
                raise PAUSE(message='Collection timed out.')
            else:
                raise RETRY(message='Collection still running.')
    
    
    @task
    def load_data(path):
        data = download_data_from_s3(path)
        return data
    
    
    @task
    def transform_task(data):
        data = transform(data)
        return data
    
    
    @task
    def save_data(data):
        save_to_database(data)
    
    
    with Flow("ETL with Outside Service Data Collection") as flow:
        timeout_interval = Parameter("timeout_interval", default=3600)
        collection_name = Parameter("collection_name", default="collection_1")
    
        timeout_time = start_collection(timeout_interval, collection_name)
    
        path = check_collection_status(timeout_time, collection_name)
    
        data = load_data(path)
    
        data = transform_task(data)
    
        save_data(data)
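    For comparison, a hedged sketch of a variant that keeps the waiting inside a single task instead of spreading it across retries: the task polls in a loop and sleeps between checks. It reuses the same hypothetical helper (poll_outside_service_status) as above, and note that it holds an executor slot for the whole wait, so the retries-as-polling approach above may still be preferable for very long collections.
    import time
    from datetime import datetime

    from prefect import task
    from prefect.engine.signals import FAIL


    @task(timeout=4 * 3600)  # give up after 4 hours (placeholder value)
    def wait_for_collection(collection_name, poll_seconds=300):
        # poll the hypothetical outside service until it reports completion
        while True:
            status = poll_outside_service_status(collection_name)
            if status == 'complete':
                return ('s3://nitorum-data/' + collection_name + '/'
                        + datetime.now().strftime('%Y-%m-%d') + '.csv')
            if status == 'failed':
                raise FAIL(message='Collection failed.')
            time.sleep(poll_seconds)  # still running; check again later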
    11 replies
  • Adam Brusselback

    1 year ago
    Hey again, so I am obviously doing something wrong: I can't seem to create a flow that takes in host/port/etc. for a PostgresExecute task at runtime rather than at definition time, since those values are not available to me at definition time...
    12 replies
  • Adam Brusselback

    1 year ago
    getting this warning:
    UserWarning: A Task was passed as an argument to PostgresExecute, you likely want to first initialize PostgresExecute with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
    
      my_task = PostgresExecute(...)  # static (non-Task) args go here
      res = my_task(...)  # dynamic (Task) args go here
    
    see https://docs.prefect.io/core/concepts/flows.html#apis for more info.
  • Adam Brusselback

    1 year ago
    And then later an error saying:
    could not translate host name "<Task: get_value>" to address: Unknown host
  • Adam Brusselback

    1 year ago
    My flow looks like the following, but I have tried multiple different ways to figure out how to do this when the connection details are only known at runtime.
    with Flow("test") as flow:
        client_name = Parameter('client_name', default='client_demo')    
        client_context = get_client_context("C:/.secrets/client/", client_name)
        db_exec = PostgresExecute(
            host=get_value(client_context, 'oltp_host'), db_name=get_value(client_context, 'oltp_database'), user=get_value(client_context, 'oltp_user'), query="SELECT do_something()"
        )
        result = db_exec(password=get_value(client_context, 'oltp_pass'))
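    For reference, a sketch of the pattern the warning above describes: initialize PostgresExecute with only static arguments at definition time, and pass anything produced by another task (a dynamic, runtime value) when calling the initialized task inside the flow. Whether host/db_name/user are accepted at call time depends on the PostgresExecute version, so check the signature of its run method; get_client_context and get_value are the same user-defined tasks as above.
    from prefect import Flow, Parameter
    from prefect.tasks.postgres import PostgresExecute

    # static (non-Task) arguments go to the constructor, outside the Flow block
    db_exec = PostgresExecute(query="SELECT do_something()")

    with Flow("test") as flow:
        client_name = Parameter('client_name', default='client_demo')
        client_context = get_client_context("C:/.secrets/client/", client_name)

        # dynamic (Task) arguments go to the call, inside the Flow block;
        # verify which of these your PostgresExecute.run signature accepts
        result = db_exec(
            host=get_value(client_context, 'oltp_host'),
            db_name=get_value(client_context, 'oltp_database'),
            user=get_value(client_context, 'oltp_user'),
            password=get_value(client_context, 'oltp_pass'),
        )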
  • bastianwegge

    1 year ago
    Hey guys, I’m currently working on a GitLab CI pipeline that uses the Prefect Python client to build our flows as Docker images and publish them to our internal registry. In that context I found that the documentation of the environment variable PREFECT__CLOUD__AUTH_TOKEN is a bit too hidden when you approach it from this side. I had to dig into the storage registration code (Prefect library internals) to find that the Client picks up this variable (if it is specified) and uses it for registration against Prefect Cloud. Just a small thing to improve in the API docs, I guess, that might make it a lot easier to get up and running.
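    For anyone landing here with the same problem, a minimal sketch of what the registration step can look like once the token is exported (the module path and project name are placeholders): the Client reads PREFECT__CLOUD__AUTH_TOKEN from the environment, so nothing token-related needs to appear in the registration script itself.
    # Hedged sketch of a CI registration script. It assumes the CI job exports
    # PREFECT__CLOUD__AUTH_TOKEN (e.g. as a masked GitLab CI variable), which the
    # Prefect Client picks up from the environment when talking to Prefect Cloud.
    import os

    from my_flows.etl import flow  # hypothetical module containing the flow

    assert os.environ.get("PREFECT__CLOUD__AUTH_TOKEN"), "auth token not set by CI"

    flow.register(project_name="my-project", build=False)  # image already built/pushed by CI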
    1 reply