
iñigo

02/18/2022, 1:57 PM
Hello, I've been working with Prefect for the last few months to see whether it can replace some of our scripts, and I'm having an issue with one of Prefect's built-in tasks: Postgres. The flow I'm trying to build is pretty simple: we just need to execute a query to gather some data, then transform it and send it to another DB. Using the built-in task itself isn't the problem; the problem is connecting it to the rest of my tasks, because they don't appear in the UI as tasks... Thank you

Anna Geller

02/18/2022, 2:10 PM
Can you explain the problem you are facing in a bit more detail? In case you just need an example, here is one:
from prefect import Flow, Parameter, task
from prefect.tasks.postgres import PostgresExecute, PostgresExecuteMany, PostgresFetch
from prefect.tasks.secrets import PrefectSecret


postgres_fetch_task = PostgresFetch(
    db_name="postgres",
    user="postgres",
    host="localhost",
    port=5432,
    commit=True,
    fetch="all",
)


@task
def define_query_from_param(table_name: str):
    return f"SELECT * FROM {table_name};"


@task(log_stdout=True)
def print_data(x):
    print(x)  # prints a list of tuples


with Flow("postgres_example") as flow:
    postgres_pwd = PrefectSecret("POSTGRES_PASSWORD")
    table_name = Parameter("table_name", default="stage.customers")
    query = define_query_from_param(table_name)
    final_data = postgres_fetch_task(password=postgres_pwd, query=query)
    print_data(final_data)

if __name__ == "__main__":
    flow.run()

iñigo

02/18/2022, 7:02 PM
Thank you very much @Anna Geller. I feel a bit dumb, because it was a silly error that I just couldn't see. With your example it's clear now, and it helped me fix my code. After connecting and gathering the data, I always have doubts about how to process it. I always use Pandas, since working with large DataFrames is easier that way. Once I've gathered all the data into a DataFrame, if I want to update values in a DB, I use df.iterrows() and, one row at a time inside a task, check the value in the DB and update it where needed. But I've seen that Prefect also has a loop feature, and I'm not sure whether that would be more efficient or not... Thanks in advance, and thanks a lot for your example. P.S.: I hope this is well explained.

Anna Geller

02/18/2022, 7:17 PM
If you want to use Pandas, it's even easier.
When it comes to fetching a large amount of data efficiently, the pandas option is certainly better, as it can grab the data in chunks (for example, via the chunksize argument of pandas.read_sql) instead of loading everything at once.
It would be great to understand your specific question and what you are struggling with here. Is your question answered now? 🙂
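A minimal sketch of chunked reading with pandas.read_sql, assuming an in-memory SQLite database as a stand-in for Postgres (with Postgres you would pass a SQLAlchemy engine or a DBAPI connection instead). The customers table and the total_balance helper are hypothetical. With chunksize set, read_sql returns an iterator of DataFrames, so each chunk can be processed before the next one is read:

```python
import sqlite3

import pandas as pd

# Stand-in source DB (assumption): in-memory SQLite instead of Postgres.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, balance REAL)")
conn.executemany(
    "INSERT INTO customers (id, name, balance) VALUES (?, ?, ?)",
    [(i, f"customer_{i}", i * 10.0) for i in range(1, 11)],
)
conn.commit()


def total_balance(connection, chunksize):
    """Hypothetical helper: sum the balance column chunk by chunk."""
    total = 0.0
    n_chunks = 0
    # With chunksize set, read_sql yields DataFrames of at most
    # `chunksize` rows each instead of one large DataFrame.
    for chunk in pd.read_sql(
        "SELECT id, name, balance FROM customers", connection, chunksize=chunksize
    ):
        total += float(chunk["balance"].sum())
        n_chunks += 1
    return total, n_chunks


print(total_balance(conn, chunksize=4))
```

Only one chunk is held in memory at a time here; concatenating all chunks into a single DataFrame would give up that benefit.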

iñigo

02/18/2022, 7:31 PM
Thanks a lot.
In fact, I'm probably mixing two ideas. The Postgres task issue is resolved 🙂 On the other hand, I'm asking about how loops are handled in Prefect. What I do is put the query results into a DataFrame, using the column names as df.columns and the list of tuples returned by the query as the data.
But then I want to know which is more efficient for going through a DataFrame: df.iterrows() with a normal for loop inside a task, or Prefect's task looping (LOOP).
I'm not sure if I'm mixing some things up.
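The conversion described above (column names as df.columns, the list of tuples as the data) can be sketched as follows. As an assumption, an in-memory SQLite database stands in for Postgres; with a plain DBAPI cursor the column names can be read from cursor.description, while a fetch task that returns only a list of tuples would need the names supplied separately. The rows_to_dataframe helper is hypothetical:

```python
import sqlite3

import pandas as pd


def rows_to_dataframe(rows, col_names):
    """Hypothetical helper: build a DataFrame from a fetch-all result."""
    return pd.DataFrame.from_records(rows, columns=col_names)


# Stand-in DB (assumption): in-memory SQLite instead of Postgres.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO customers VALUES (?, ?)", [(1, "ana"), (2, "bob")])

cur = conn.execute("SELECT id, name FROM customers ORDER BY id")
rows = cur.fetchall()  # list of tuples, e.g. [(1, 'ana'), (2, 'bob')]
# The first element of each cursor.description entry is the column name.
col_names = [desc[0] for desc in cur.description]

df = rows_to_dataframe(rows, col_names)
print(df)
```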

Anna Geller

02/18/2022, 7:45 PM
I'd say you can use df.iterrows() 🙂 Task looping is a different use case.
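A minimal sketch of the df.iterrows() approach, written as plain Python that would sit inside a single Prefect task. It assumes an in-memory SQLite table as a stand-in for the target DB and a hypothetical customers(id, balance) schema. As a design choice that is not from the thread, it reads the current DB values once and sends all changed rows in one executemany call, instead of issuing a SELECT and an UPDATE per row:

```python
import sqlite3

import pandas as pd


def update_changed_rows(connection, df):
    """Hypothetical helper: update DB rows whose balance differs from df.

    Assumes a table customers(id INTEGER PRIMARY KEY, balance REAL).
    Rows in df whose id is not in the table are skipped.
    """
    # One query for the current values instead of one SELECT per row.
    current = dict(connection.execute("SELECT id, balance FROM customers").fetchall())
    updates = []
    for _, row in df.iterrows():
        # iterrows upcasts int and float columns to float, so cast id back to int.
        row_id = int(row["id"])
        db_value = current.get(row_id)
        if db_value is not None and db_value != row["balance"]:
            updates.append((float(row["balance"]), row_id))
    # Send all changed rows in one batched call.
    connection.executemany("UPDATE customers SET balance = ? WHERE id = ?", updates)
    connection.commit()
    return len(updates)


# Stand-in target DB (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, balance REAL)")
conn.executemany("INSERT INTO customers VALUES (?, ?)", [(1, 10.0), (2, 20.0), (3, 30.0)])
conn.commit()

df = pd.DataFrame({"id": [1, 2, 3, 4], "balance": [10.0, 25.0, 30.0, 40.0]})
n_updated = update_changed_rows(conn, df)
print(n_updated)
```

For very large DataFrames, df.itertuples() is generally faster than df.iterrows(), but the overall pattern stays the same.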

iñigo

02/18/2022, 7:59 PM
OK, thanks a lot. I was also wondering about using Docker agents, but I don't find the examples easy to understand. What do I need in order to deploy a container with the Python libraries, and what is the command to launch a Docker agent? Thanks, and apologies for the many questions... Is there an example of how to do this with a Docker agent?