# prefect-community
c
Still a Prefect NooB! I am failing to find a simple example of what I am trying to do with Prefect. It seems like it shouldn't take much code! Firstly, I want to run a query in a file against a Snowflake database. Secondly, I'd like to take the data returned by the Snowflake query and insert it into a Postgres table. Seems simple, right?
...and yet, I am not sure where to begin! I've done lots of SQL in my life but am completely new to both Prefect & Python. But I like to think I can be taught! 😜
k
Just to make sure I understood correctly, you are writing the workflow in Python right?
c
Yeah, I think that is the plan. That's what Prefect is all about, right? Running workflows written in Python?
k
that’s right
I would suggest that you have your functions written in Python first. Then read this getting started tutorial
c
Hmmm. Yeah, we are currently on 1.2.4; not sure when an upgrade will happen
b
If I can add two cents, Prefect is much easier to learn than database connectors, which can be a tricky topic in their own right! Try getting started with psycopg2 for Postgres and the snowflake-connector for Snowflake.
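Before Prefect even enters the picture, the raw-connector version of "query one database, insert into another" can be sketched roughly like this. Every connection parameter, file name, and table name below is a hypothetical placeholder, and the connector calls are only reached when the script is run directly, not when it is imported:

```python
# Sketch: copy rows from Snowflake into Postgres using the two libraries
# mentioned above. All credentials, file names, and table names are
# placeholders -- adapt them to your environment.

def build_insert(table, columns):
    """Build a parameterised INSERT suitable for psycopg2's executemany()."""
    placeholders = ", ".join(["%s"] * len(columns))
    cols = ", ".join(columns)
    return f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"

def copy_snowflake_to_postgres():
    # Requires: pip install snowflake-connector-python psycopg2-binary
    import snowflake.connector
    import psycopg2

    # 1. Run the query (stored in a file) against Snowflake.
    sf = snowflake.connector.connect(
        account="my_account",      # placeholder
        user="my_user",            # placeholder
        password="my_password",    # placeholder
        warehouse="my_warehouse",
        database="my_database",
        schema="public",
    )
    try:
        cur = sf.cursor()
        cur.execute(open("my_query.sql").read())
        rows = cur.fetchall()
        columns = [c[0] for c in cur.description]
    finally:
        sf.close()

    # 2. Insert the returned rows into a Postgres table.
    pg = psycopg2.connect("dbname=target user=loader password=secret")  # placeholder DSN
    try:
        with pg, pg.cursor() as cur:
            cur.executemany(build_insert("target_table", columns), rows)
    finally:
        pg.close()

if __name__ == "__main__":
    copy_snowflake_to_postgres()
```

Prefect's role is then mainly to wrap those two steps in tasks so you get scheduling, retries, and observability around them.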
c
Billy, thanks, I appreciate the advice. But rather than diving down rabbit holes, I really wish I could see a simple script that does what I described: run a query against one database, insert the results into a table in a different database. It seems like it would be a pretty common type of task, but the simple Prefect examples I have seen mostly just manipulate data defined in the example script itself rather than hitting a database as a source.
b
Respectfully, I think this request is beyond the scope of the community slack, and it's a very busy time at Prefect HQ. That being said, here is an incomplete implementation that would do something like what you have requested. The tricky part of this is going to be connecting securely to your respective databases (I've used custom classes which encapsulate my implementation). You may also run into some performance issues on the insert, if the number of records is large. It's tough to give an example simply because there are so many ways to do this, and the implementation details will be highly specific to your environment.
```python
import pendulum
from my_private_package import PostgresClient, SnowflakeClient, slack_failure_notification
from prefect import Flow, Parameter, task
from prefect.schedules import CronSchedule

# The SQL lives in files, as described above; these paths are illustrative.
get_data_sql_statement = open("get_data.sql").read()
load_data_sql_statement = open("load_data.sql").read()


@task
def get_data_from_snowflake(project_name, report_date):
    # SnowflakeClient is a custom wrapper around the Snowflake connector;
    # it handles credentials and returns the query's result rows.
    with SnowflakeClient() as snowflake:
        data = snowflake.execute(
            sql_statement=get_data_sql_statement,
            params={
                "project_name": project_name,
                "report_date": report_date,
            },
        )
    return data


@task
def load_data_to_postgres(data):
    # PostgresClient is a custom wrapper around psycopg2; the load
    # statement is a parameterised INSERT into the target table.
    with PostgresClient() as pg:
        pg.execute(sql_statement=load_data_sql_statement, values=data)


with Flow(
    "Snowflake to PostgreSQL",
    schedule=CronSchedule("0 8 * * 1-5", start_date=pendulum.now("America/New_York")),
    state_handlers=[slack_failure_notification],
) as flow:
    project_name = Parameter("project")
    report_date = Parameter("report_date")
    data = get_data_from_snowflake(project_name=project_name, report_date=report_date)
    load_data_to_postgres(data)
```
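On the performance point Billy raised: one common mitigation for large result sets is batching the insert instead of sending every row in one call. A minimal chunking helper, independent of any database library (the names here are illustrative), might look like:

```python
def chunked(rows, size):
    """Yield successive batches of at most `size` rows.

    Feeding batches like these to psycopg2's executemany() (or, better,
    psycopg2.extras.execute_values) keeps memory use and round-trips
    bounded when the Snowflake result set is large.
    """
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

# Example: 5 rows in batches of 2 -> three batches of sizes 2, 2, 1.
batches = list(chunked(list(range(5)), 2))
```

For serious volumes, Postgres's COPY protocol (via psycopg2's `copy_expert`) is usually faster still, but batched inserts are the simpler first step.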