• Aqib Fayyaz

    10 months ago
    Is the Prefect agent that gets deployed on a GKE cluster when deploying Prefect Server with the Helm chart the same as the agent we deploy on Kubernetes using this command?
    prefect agent kubernetes install -k API_KEY | kubectl apply --namespace=my-namespace -f -
    I have one agent deployed with Prefect Server on GKE, but I need to use Prefect Cloud. Is one agent enough for both Server and Cloud, or do I have to deploy a separate one for Cloud?
    Aqib Fayyaz
    Anna Geller
    2 replies
  • ek

    10 months ago
    I’m wondering how to remove completed Prefect jobs on k8s?
    ek
    Kevin Kho
    2 replies
  • Leon Kozlowski

    10 months ago
    Hi all - I am wondering what the best way is to determine which agent a flow is running on while it's in progress.
    Leon Kozlowski
    Anna Geller
    +1
    11 replies
  • Max Kolasinski

    10 months ago
    I’m having some trouble wrapping my head around designing a conditional flow based around Great Expectations Validation and was wondering if anyone had any advice. To summarize: we have an ETL Task, after which we want to run the Validation Task. If the Validation Task sets its state as Failed, we want to run a Notification Task that alerts us, but otherwise we want the subsequent tasks to continue. Where I’m getting stuck is that the routing tools I’ve found don’t quite fit:
    • Using Triggers doesn’t feel correct, because I don’t want to run my Notification Task on all_failed or any_failed - if the ETL Task fails, it shouldn’t run. What I believe I would need is something like an on_x_task_failed option - the available options seem way too broad to be useful.
    • I then looked into some of the ideas on the Conditional Logic page, but this seems clumsy for a few reasons. I need a Task specifically to check the State of the Validation Task, and then on our Schematic View we have additional Tasks showing up for each Case Task as well as the Merge Task.
    All combined, it makes our Schematic look like the image below, which seems crazy for what is effectively "if x do y". I feel like I have to be approaching this in completely the wrong way - if anyone had any ideas or suggestions I would be extremely grateful.
    Max Kolasinski
    Kevin Kho
    6 replies
  • Tom Shaffner

    10 months ago
    Theory question: Is it possible to have a (non-task) abstract class in which the methods are tasks? I have a flow structure that gets repeated frequently for different data sets, I'm trying to create an abstract class in which the repetitive tasks are methods with the necessary text inputs, and then the 1-2 non-repetitive methods are abstract methods in the parent class and are thus implemented in child classes. The problem though is that when I do this, the @task decorators are messing with calling the methods internally. I have to pass in "self" manually, and if I need to specify any non-data dependencies (e.g. add an upstream_tasks input for the decorator) the methods seem to break whether "self" goes before or after the decorator input. Is there a way to do this? Or will this approach just not work? Really hoping there is a way since this will enable me to MASSIVELY reduce my code replication.
    Tom Shaffner
    Kevin Kho
    2 replies
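    A minimal sketch of one way this could be structured: keep the methods undecorated and wrap the bound methods when the flow is built, so "self" is already attached and never has to be passed by hand. The names Pipeline, build, wrap and identity are hypothetical; wrap defaults to a no-op so the sketch runs without Prefect. Passing prefect.task as wrap inside a Flow context is an untested assumption, not a confirmed pattern.

```python
from abc import ABC, abstractmethod
from typing import Callable, List


def identity(fn: Callable) -> Callable:
    """Default wrapper: return the callable unchanged."""
    return fn


class Pipeline(ABC):
    """Shared steps live here; subclasses implement only the abstract ones."""

    def __init__(self, source: str):
        self.source = source

    @abstractmethod
    def extract(self) -> List[int]:
        """Dataset-specific step, implemented by each child class."""

    def transform(self, rows: List[int]) -> List[int]:
        return [r * 2 for r in rows]

    def load(self, rows: List[int]) -> str:
        return f"{self.source}: loaded {len(rows)} rows"

    def build(self, wrap: Callable[[Callable], Callable] = identity) -> str:
        # Wrap bound methods, so `self` is already bound before wrapping.
        extract = wrap(self.extract)
        transform = wrap(self.transform)
        load = wrap(self.load)
        return load(transform(extract()))


class Numbers(Pipeline):
    def extract(self) -> List[int]:
        return [1, 2, 3]


result = Numbers("demo").build()
```

    With the default wrapper, build simply runs the steps in order; the idea is that the same wiring could define task dependencies if wrap turned each bound method into a task.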
  • André Bonatto

    10 months ago
    Hi, I have a list of flows that are very similar and I wanted to create a factory function in order to make things easier to maintain. The code looks very similar to this:
    from typing import Callable, Dict

    from prefect import Flow, Parameter, task
    from etl import prepare, load
    from crawlers import crawler1, crawler2

    # Wrap the plain ETL functions as Prefect tasks.
    prepare = task(prepare)
    load = task(load)

    def make_standard_pipeline(flow_name: str, func: Callable, func_params: Dict):
        with Flow(flow_name) as flow:
            # One Parameter per keyword argument, defaulting to the given value.
            params = {k: Parameter(k, default=v) for k, v in func_params.items()}
            df = func(**params)
            df = prepare(df)
            df = load(df)
        return flow

    pipe1 = make_standard_pipeline('flow1', task(crawler1), {})
    pipe2 = make_standard_pipeline('flow2', task(crawler2), {'type': 'xxx'})
    Locally this code runs fine and I can also register these flows on the Prefect server. However, when I try to run the flows, only the first flow defined in the file runs successfully (I tested reordering the flows). For the other flows, I get a KeyError saying it couldn't find the task slug crawler2. Does anyone have hints on what could be causing this problem? Thank you.
    André Bonatto
    Anna Geller
    2 replies
  • Chen Di

    10 months ago
    Hi, I'm new to Prefect, but I've been using Airflow for a while. We hit an issue in Airflow, so I wonder whether Prefect could handle it better. We tried to run 40,000 tasks at the same time (very simple tasks, just print commands: 1000 DAGs, 10 DAG runs per DAG, 4 tasks per DAG run) and found that Airflow took a very long time to finish them (about 1 hour). Our server resources are good enough, so it looks like the scheduler is the bottleneck. Would Prefect be better than Airflow at handling many tasks at the same time? What's the maximum number of tasks you have seen running at the same time? How long does Prefect take to finish them? Thanks
    Chen Di
    1 reply
  • Manuel Gomes

    10 months ago
    Hi everyone, first question, please be gentle 🙂 I am sending a parameter called video to my "Process video" workflow. The first task in this workflow is to upload the video to an s3 bucket. The upload task is transcribed in the thread: My main problem seems to be that s3 requires a string filename, but my video is of the type <Parameter: video>, as illustrated by the error message
    ValueError: Filename must be a string
    So there is clearly some sort of.. unwrapping/unpacking that I'm missing? my flow is likewise in the thread, as is the invocation. So... would someone please tell me in which exact angle I should smack my forehead and the correct octave of the "duh!"... plus maybe what I should be doing instead?
    Manuel Gomes
    Jenny
    10 replies
  • Wilhelm Su

    10 months ago
    Hello everyone! I'm new here. I'm taking on a new DE role, and after trying the others (Airflow, Luigi, Dagster) I chose Prefect for the odd characteristic of being the most fun to use. It's absolutely amazing. Would anyone know how to capture the exact full Python traceback when a task fails? I've set up state_handlers, but State.message only captures an abbreviated form of the error (e.g. ZeroDivisionError). Thanks!
    Wilhelm Su
    Kevin Kho
    +1
    7 replies
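    As a rough illustration of the traceback part: once the exception object itself is in hand, the standard library can render the full traceback. The helper name format_full_traceback is hypothetical. That a failed task's state exposes the raised exception (for example as its result in Prefect 1.x) is an assumption to verify, and the traceback may be missing if the exception was serialized between processes.

```python
import traceback


def format_full_traceback(exc: BaseException) -> str:
    """Render an exception, including its stack frames, as one string."""
    return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))


def risky() -> float:
    return 1 / 0


try:
    risky()
except ZeroDivisionError as err:
    captured = err  # keep a reference; `err` is cleared after the except block

report = format_full_traceback(captured)
```

    A state handler could then send this string to logs or a notification in place of the abbreviated message.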
  • Anh Nguyen

    10 months ago
    Hi all, I want to create an API that can be called to execute a task in Prefect. How can I do that? Thanks, all.
    Anh Nguyen
    Anna Geller
    13 replies