Venkat Reddy
08/08/2024, 1:30 PM
Marvin
08/08/2024, 1:56 PM
bash
docker run -d --name prefect-postgres \
-v prefectdb:/var/lib/postgresql/data \
-p 5432:5432 \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=yourTopSecretPassword \
-e POSTGRES_DB=prefect postgres:latest
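`docker run -d` returns as soon as the container starts, which can be before PostgreSQL accepts connections. Here is a minimal sketch for waiting on the mapped port before running any flow. It assumes the `-p 5432:5432` mapping above and uses only Python's standard library. The `wait_for_port` helper is hypothetical and not part of Prefect.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, or False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. ConnectionRefusedError)
            # until something is listening on the port
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("localhost", 5432)` blocks until the container's port is open. An open TCP port only means the server is listening. PostgreSQL's own `pg_isready` tool is a stricter readiness check.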
#### 3. Using SqlAlchemyConnector in Prefect
Here's a complete example of how to use the SqlAlchemyConnector to interact with your PostgreSQL database:
python
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        # :name and :address are bound from the parameters dict
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        # execute_many runs the statement once per parameter dict
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch_many calls with the same query return the next
            # batch of results instead of re-executing the query
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)  # each batch is stored as its own list
    return all_rows


@flow
def sqlalchemy_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    sqlalchemy_flow("YOUR_BLOCK_NAME")
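The `fetch_many` loop works because, per the prefect-sqlalchemy docs, repeated `fetch_*` calls with the same query return the next batch instead of re-running the query. The same batching pattern can be sketched with Python's built-in `sqlite3` module standing in for PostgreSQL. This is for illustration only: `SqlAlchemyConnector` does not use `sqlite3`, and `fetch_in_batches` is a hypothetical helper.

```python
import sqlite3
from typing import List, Tuple


def fetch_in_batches(conn: sqlite3.Connection, query: str, size: int = 2) -> List[List[Tuple]]:
    """Execute `query` once, then pull its rows `size` at a time."""
    cursor = conn.execute(query)
    batches = []
    while True:
        rows = cursor.fetchmany(size)
        if not rows:  # an empty batch means the result set is exhausted
            break
        batches.append(rows)
    return batches


# In-memory sqlite3 database as a stand-in for the PostgreSQL block
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar)")
# sqlite3 accepts the same :name-style named parameters as the SQL above
conn.execute(
    "INSERT INTO customers (name, address) VALUES (:name, :address)",
    {"name": "Marvin", "address": "Highway 42"},
)
conn.executemany(
    "INSERT INTO customers (name, address) VALUES (:name, :address)",
    [{"name": "Ford", "address": "Highway 42"}, {"name": "Unknown", "address": "Highway 42"}],
)
conn.commit()

batches = fetch_in_batches(conn, "SELECT name, address FROM customers", size=2)
print([len(b) for b in batches])  # [2, 1]
```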
1. Create and Configure the Block:
Create a SQLAlchemy Connector block in the Prefect UI with your PostgreSQL database credentials.
2. Run the Flow:
Replace `YOUR_BLOCK_NAME` with the name of the block you created in the Prefect UI.
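You can also pass the block name in at run time instead of editing the source. Below is a minimal sketch using `argparse`. The `--block` flag and the `BLOCK_NAME` environment variable are hypothetical choices for this example, not Prefect settings.

```python
import argparse
import os
from typing import Optional, Sequence

# Placeholder from the example above; BLOCK_NAME is a hypothetical env var
DEFAULT_BLOCK_NAME = "YOUR_BLOCK_NAME"


def resolve_block_name(argv: Optional[Sequence[str]] = None) -> str:
    """Return --block if given, else $BLOCK_NAME if set, else the placeholder."""
    parser = argparse.ArgumentParser(description="Run sqlalchemy_flow against a saved block.")
    parser.add_argument(
        "--block",
        # Read the env var at call time so changes to it are picked up
        default=os.environ.get("BLOCK_NAME", DEFAULT_BLOCK_NAME),
        help="name of the SQLAlchemy Connector block created in the Prefect UI",
    )
    return parser.parse_args(argv).block
```

With this in place, the entry point would call `sqlalchemy_flow(resolve_block_name())`, and you would run the script as `python your_script_name.py --block my-postgres-block`.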
For further details, refer to the Prefect documentation:
* Connecting to PostgreSQL
* Prefect SQLAlchemy Integration
Feel free to reach out if you have more questions!
Venkat Reddy
08/08/2024, 2:02 PM
Marvin
08/08/2024, 2:02 PM
bash
docker run -d --name prefect-postgres \
-v prefectdb:/var/lib/postgresql/data \
-p 5432:5432 \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=yourTopSecretPassword \
-e POSTGRES_DB=prefect postgres:latest
2. Set Up Prefect to Use Your PostgreSQL Database:
If you self-host the Prefect server, configure your Prefect profile so the server stores its own data in PostgreSQL (Prefect Cloud does not use this setting):
bash
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/prefect"
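If the password contains characters such as `@`, `:` or `/`, a URL like the one above will be misparsed. The SQLAlchemy documentation recommends percent-encoding such characters. Here is a small sketch of that, using a hypothetical `build_db_url` helper and a made-up example password.

```python
from urllib.parse import quote


def build_db_url(scheme: str, user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a 'dialect+driver://user:password@host:port/database' URL.

    The user and password are percent-encoded so characters such as '@', ':'
    and '/' are not mistaken for URL delimiters.
    """
    # safe="" tells quote() to encode '/' too (by default it leaves '/' alone)
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"{scheme}://{user_enc}:{password_enc}@{host}:{port}/{database}"


# "p@ss:w/rd" is a made-up password containing each problem character
url = build_db_url("postgresql+asyncpg", "postgres", "p@ss:w/rd", "localhost", 5432, "prefect")
print(url)  # postgresql+asyncpg://postgres:p%40ss%3Aw%2Frd@localhost:5432/prefect
```

You can then pass the resulting string to `prefect config set PREFECT_API_DATABASE_CONNECTION_URL=...`.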
#### Step 2: Create a Database Block in Prefect UI
1. Log in to Prefect Cloud:
Go to Prefect Cloud and log in to your account.
2. Create a New Block:
- Navigate to the Blocks section.
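- Create a new SQLAlchemy Connector block.
- Provide the connection details for your PostgreSQL database. Use a synchronous driver such as psycopg2 (installable with `pip install psycopg2-binary`), because the flow below opens the connector with a plain `with` block:
postgresql+psycopg2://postgres:yourTopSecretPassword@localhost:5432/your_database_name
- Save the block with an identifiable name, such as my-postgres-block.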
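The Step 3 flow opens the connector with a synchronous `with` block. Our understanding is that prefect-sqlalchemy expects `async with` when the block uses an async driver such as asyncpg, so the block needs a sync driver. A quick standard-library sanity check for the URL you paste into the block could look like this. `describe_url` and the `ASYNC_DRIVERS` set are illustrative assumptions, not a Prefect API.

```python
# Illustrative, not exhaustive: some SQLAlchemy async driver names
ASYNC_DRIVERS = {"asyncpg", "aiosqlite", "aiomysql", "asyncmy", "psycopg_async"}


def describe_url(url: str) -> dict:
    """Split a 'dialect+driver://...' URL and flag settings the sync flow code can't use."""
    scheme, sep, _ = url.partition("://")
    if not sep:
        raise ValueError(f"not a SQLAlchemy-style URL: {url!r}")
    dialect, _, driver = scheme.partition("+")
    return {
        "dialect": dialect,
        # None means SQLAlchemy picks its default driver for the dialect
        "driver": driver or None,
        "is_async": driver in ASYNC_DRIVERS,
        # SQLAlchemy 1.4+ accepts "postgresql", not the old "postgres" alias
        "legacy_dialect": dialect == "postgres",
    }
```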
#### Step 3: Write Your Prefect Flow
Here's a detailed example of how to set up your pipeline using Prefect and SQLAlchemy:
python
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_source_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS source_table (id SERIAL PRIMARY KEY, name VARCHAR);"
        )
        # Seeds three rows on every run, so rerunning the flow adds duplicates
        connector.execute(
            "INSERT INTO source_table (name) VALUES ('item1'), ('item2'), ('item3');"
        )


@task
def setup_destination_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS destination_table (id SERIAL PRIMARY KEY, name VARCHAR);"
        )


@task
def transfer_data(block_name: str) -> list:
    with SqlAlchemyConnector.load(block_name) as connector:
        data = connector.fetch_all("SELECT * FROM source_table;")
        for row in data:
            # Rows are SQLAlchemy Row objects: use attribute access (row.name);
            # string indexing like row['name'] fails on SQLAlchemy 2.x
            connector.execute(
                "INSERT INTO destination_table (name) VALUES (:name);",
                parameters={"name": row.name},
            )
    return data


@flow
def postgres_pipeline(block_name: str) -> None:
    setup_source_table(block_name)
    setup_destination_table(block_name)
    transferred_data = transfer_data(block_name)
    print(f"Data transferred: {transferred_data}")


if __name__ == "__main__":
    postgres_pipeline("my-postgres-block")
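`transfer_data` reads every row into Python and inserts the rows one at a time. When the source and destination tables live in the same database, a single `INSERT ... SELECT` statement can do the copy inside the database instead. Below is a sketch of that set-based alternative. It uses Python's built-in `sqlite3` as a stand-in for PostgreSQL, and `transfer_set_based` is a hypothetical helper. The SQL itself is standard and should also run through `connector.execute`, though it has not been tested against this setup.

```python
import sqlite3

# In-memory sqlite3 stand-in; INTEGER PRIMARY KEY plays the role of SERIAL PRIMARY KEY
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS source_table (id INTEGER PRIMARY KEY, name VARCHAR)")
conn.execute("CREATE TABLE IF NOT EXISTS destination_table (id INTEGER PRIMARY KEY, name VARCHAR)")
conn.executemany("INSERT INTO source_table (name) VALUES (?)", [("item1",), ("item2",), ("item3",)])


def transfer_set_based(conn: sqlite3.Connection) -> None:
    """Copy every source row with one INSERT ... SELECT instead of one INSERT per row."""
    with conn:  # commits on success, rolls back if the statement raises
        conn.execute(
            "INSERT INTO destination_table (name) SELECT name FROM source_table ORDER BY id"
        )


transfer_set_based(conn)
rows = conn.execute("SELECT name FROM destination_table ORDER BY id").fetchall()
print(rows)  # [('item1',), ('item2',), ('item3',)]
```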
#### Step 4: Run Your Flow
1. Run the Flow Locally:
Run the script directly with Python to confirm everything works:
bash
python your_script_name.py
2. Deploy the Flow to Prefect Cloud:
Use the following Python code to register a deployment with Prefect Cloud:
python
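# Prefect 2.x API: Deployment.build_from_flow is deprecated in later 2.x releases
# in favor of flow.deploy()/flow.serve(), and removed in Prefect 3
from prefect.deployments import Deployment

# Import the flow from the script that defines it (hypothetical module name)
from your_script_name import postgres_pipeline

deployment = Deployment.build_from_flow(
    flow=postgres_pipeline,
    name="postgres-pipeline-deployment",
)
deployment.apply()  # registers the deployment with the API your profile points to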
### Reference Documentation
Check out these links for more detailed instructions:
- Prefect SQLAlchemy Integration
- Host Quickstart Guide
Feel free to consult the Prefect documentation or reach out for further assistance!
Venkat Reddy
08/08/2024, 2:04 PM
Marvin
08/08/2024, 2:04 PM
Venkat Reddy
08/19/2024, 2:47 PM
Marvin
08/19/2024, 2:47 PM
Venkat Reddy
08/19/2024, 2:50 PM
Venkat Reddy
08/19/2024, 2:53 PM
Venkat Reddy
08/19/2024, 2:54 PM
Venkat Reddy
08/21/2024, 9:32 AM
Marvin
08/21/2024, 9:32 AM
Venkat Reddy
08/29/2024, 9:02 AM
Marvin
08/29/2024, 9:02 AM
Venkat Reddy
08/29/2024, 9:05 AM
Venkat Reddy
08/29/2024, 11:11 AM
Venkat Reddy
08/29/2024, 1:57 PM
Venkat Reddy
09/04/2024, 11:16 AM