    iñigo

    7 months ago
    Hello, I've been working with Prefect for the last few months to see whether it can replace some of our scripts. I'm having an issue with one of Prefect's built-in tasks, the Postgres one. The flow I'm trying to build is pretty simple: we just need to execute a query to gather some data, then transform it and send it to another DB. My problem isn't using the built-in task itself but connecting it with the rest of my tasks, as they don't appear in the UI as tasks... Thank you
    Anna Geller

    7 months ago
    Can you explain the problem you are facing in a bit more detail? In case you just need an example, here is one:
    from prefect import Flow, Parameter, task
    from prefect.tasks.postgres import PostgresExecute, PostgresExecuteMany, PostgresFetch
    from prefect.tasks.secrets import PrefectSecret
    
    
    postgres_fetch_task = PostgresFetch(
        db_name="postgres",
        user="postgres",
        host="localhost",
        port=5432,
        commit=True,
        fetch="all",
    )
    
    
    @task
    def define_query_from_param(table_name: str):
        return f"SELECT * FROM {table_name};"
    
    
    @task(log_stdout=True)
    def print_data(x):
        print(x)  # prints a list of tuples
    
    
    with Flow("postgres_example") as flow:
        postgres_pwd = PrefectSecret("POSTGRES_PASSWORD")
        table_name = Parameter("table_name", default="stage.customers")
        query = define_query_from_param(table_name)
        final_data = postgres_fetch_task(password=postgres_pwd, query=query)
        print_data(final_data)
    
    if __name__ == "__main__":
        flow.run()
    iñigo

    7 months ago
    Thank you very much @Anna Geller. I feel a bit dumb because it was a silly error that I just couldn't see. With your example it's clear now, and it helped me fix my code. After connecting and gathering the data, I'm never sure how best to handle it. I always use Pandas, since working with large DataFrames is easier. Once I have all the data in a DataFrame and want to update values in a DB, I iterate with df.iterrows() inside a task and, row by row, check the value in the DB and update it where needed. I've seen that Prefect has a loop feature for this, but I'm not sure whether it is more efficient or not... Thanks in advance, and thanks a lot for your example. PS: I hope this is explained well.
    Anna Geller

    7 months ago
    If you want to use Pandas, it's even easier.
    As for what is more efficient for fetching a large amount of data, it's certainly the Pandas option, as it has optimizations that allow grabbing the data in chunks.
    It would be great to understand what your specific question is and what you are struggling with here. Is your question answered now? 🙂
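    For reference, a minimal sketch of reading query results in chunks with Pandas. The in-memory SQLite database, the customers table, and the chunk size are assumptions chosen so the snippet runs without a Postgres server; with Postgres you would pass your own connection instead.

```python
import sqlite3

import pandas as pd

# Assumption: in-memory SQLite stands in for Postgres so the sketch is self-contained.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO customers (id, name) VALUES (?, ?)",
    [(i, f"customer_{i}") for i in range(1, 11)],
)
conn.commit()

# With chunksize set, read_sql_query returns an iterator of DataFrames
# instead of loading the whole result set into memory at once.
chunk_sizes = []
for chunk in pd.read_sql_query(
    "SELECT * FROM customers ORDER BY id", conn, chunksize=4
):
    chunk_sizes.append(len(chunk))  # process each chunk here

total_rows = sum(chunk_sizes)
conn.close()
```

    Each chunk is a regular DataFrame, so the same transformation code can run on every chunk in turn.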
    iñigo

    7 months ago
    Thanks a lot.
    I'm probably mixing two ideas. The Postgres task issue is resolved 🙂 On the other hand, I'm asking about the way loops are handled in Prefect. What I do is take the query results and load them into a DataFrame, using the column names as df.columns and the list of tuples from the query as the data.
    What I then want to know is which is more efficient for going through a DataFrame: df.iterrows() with a normal for loop inside a task, or Prefect's own loop mechanism.
    I'm not sure whether I'm mixing things up.
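    To make the setup concrete, a small sketch of loading the fetched tuples into a DataFrame. The rows and column names here are made-up placeholders, not the real query output.

```python
import pandas as pd

# Hypothetical query result: a list of tuples, like the one printed in the example flow.
rows = [(1, "Ana", 10.0), (2, "Luis", 12.5)]
# Assumed column names; in practice they come from the query or the table definition.
col_names = ["id", "name", "amount"]

# The tuples become rows and the names become df.columns.
df = pd.DataFrame(rows, columns=col_names)
```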
    Anna Geller

    7 months ago
    I’d say you can use df.iterrows() 🙂 Task looping is meant for a somewhat different use case.
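    A rough sketch of the df.iterrows() approach, written as a plain function that would carry the @task decorator in a Prefect flow. The orders table, its columns, and the SQLite connection are assumptions made so the snippet runs on its own; update_changed_rows is a hypothetical helper name.

```python
import sqlite3

import pandas as pd


def update_changed_rows(conn, df):
    """Compare each DataFrame row with the DB and update only rows that differ.

    Hypothetical helper; in a Prefect flow it would be decorated with @task.
    """
    updated = 0
    for _, row in df.iterrows():
        row_id, new_amount = int(row["id"]), float(row["amount"])
        current = conn.execute(
            "SELECT amount FROM orders WHERE id = ?", (row_id,)
        ).fetchone()
        # Update only when the row exists and its stored value differs.
        if current is not None and current[0] != new_amount:
            conn.execute(
                "UPDATE orders SET amount = ? WHERE id = ?", (new_amount, row_id)
            )
            updated += 1
    conn.commit()
    return updated


# Assumption: in-memory SQLite stands in for the target database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)", [(1, 10.0), (2, 20.0), (3, 30.0)]
)

df = pd.DataFrame([(1, 10.0), (2, 25.0), (3, 35.0)], columns=["id", "amount"])
updated_count = update_changed_rows(conn, df)
amounts = [r[0] for r in conn.execute("SELECT amount FROM orders ORDER BY id")]
```

    Because the whole loop lives inside one function, it would run as a single task rather than one task run per row.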
    iñigo

    7 months ago
    OK, thanks a lot. I was also wondering about using Docker agents, but I don't find the examples easy to understand. What do I need in order to deploy a container with the Python libraries and the command that launches the Docker agent? Thanks, and apologies for the many questions... Is there an example of how to do this with a Docker agent?