Joël Luijmes
10/30/2020, 2:55 PM
from prefect import task
from prefect import Flow
from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch
# Quoted question from the thread. The indentation of the `with` body was lost
# in the paste: every statement up to and including the `products = ...` line
# belongs inside the `with Flow(...)` block.
with Flow("PostgreSQL -> BigQuery Sync") as flow:
max_product_id = PostgresFetch(
'manager',
'postgres',
'localhost',
15432,
# query='SELECT MAX(id) from products'
query='SELECT 188000'
)
query_products = PostgresFetch(
'manager',
'postgres',
'localhost',
15432,
fetch='all',
# query=f'SELECT * FROM products WHERE id > {maximum_id} LIMIT 10'
)
# BUG (the subject of this thread): this f-string is evaluated while the flow is
# being *built*. `max_product_id` is the PostgresFetch task object created above,
# so its text representation, not the id fetched at run time, is interpolated
# into the SQL. Formatting the query inside a @task that receives max_product_id
# defers the formatting to run time.
products = query_products(query=f'SELECT * FROM products WHERE id > {max_product_id} LIMIT 10')
state = flow.run()
# NOTE(review): max_product_id is only interpolated into a string and never
# passed to a task call. query_products is called, but the call presumably adds a
# bound copy (`products`) rather than the template itself. Neither is therefore
# likely to be a key of state.result, and the first two lookups may raise
# KeyError -- confirm.
print(state.result[max_product_id].result)
print(state.result[query_products].result)
print(state.result[products].result)
Chris White
@task
def query_formatter(max_id):
    """Build the incremental product query from the fetched maximum id.

    Because this is a task, ``max_id`` is the *run-time* result of the
    upstream fetch rather than the fetch task object itself.

    Args:
        max_id: Result of the upstream fetch. A one-element row such as
            ``(188000,)`` is unwrapped to its single value; a scalar is
            used as-is.

    Returns:
        SQL text selecting up to 10 products whose ``id`` exceeds ``max_id``.
    """
    # NOTE(review): PostgresFetch without fetch='all' presumably returns a single
    # row tuple (cursor.fetchone()). Formatted directly, that renders as
    # "(188000,)" and yields invalid SQL, so unwrap single-column rows.
    if isinstance(max_id, (tuple, list)) and len(max_id) == 1:
        max_id = max_id[0]
    return f'SELECT * FROM products WHERE id > {max_id} LIMIT 10'
# Task templates are initialized outside the Flow context for clarity; calling
# them inside the `with` block below is what wires them into the flow.
max_product_id = PostgresFetch(
    'manager',
    'postgres',
    'localhost',
    15432,
    # query='SELECT MAX(id) from products'
    query='SELECT 188000',
)
query_products = PostgresFetch(
    'manager',
    'postgres',
    'localhost',
    15432,
    fetch='all',
    # no query here: it is supplied at call time inside the flow
)

with Flow("PostgreSQL -> BigQuery Sync") as flow:
    # Passing the task itself (instead of formatting an f-string here) defers
    # building the SQL until max_product_id has actually run.
    query = query_formatter(max_product_id)
    products = query_products(query=query)
Dylan
from prefect import task
from prefect import Flow
from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch
@task
def form_query_string(max_product_id):
    """Build the incremental product query from the fetched maximum id.

    Because this is a task, ``max_product_id`` is the *run-time* result of
    the upstream fetch rather than the fetch task object itself.

    Args:
        max_product_id: Result of the upstream fetch. A one-element row such
            as ``(188000,)`` is unwrapped to its single value; a scalar is
            used as-is.

    Returns:
        SQL text selecting up to 10 products whose ``id`` exceeds the value.
    """
    # NOTE(review): PostgresFetch without fetch='all' presumably returns a single
    # row tuple (cursor.fetchone()). Formatted directly, that renders as
    # "(188000,)" and yields invalid SQL, so unwrap single-column rows.
    if isinstance(max_product_id, (tuple, list)) and len(max_product_id) == 1:
        max_product_id = max_product_id[0]
    return f'SELECT * FROM products WHERE id > {max_product_id} LIMIT 10'
with Flow("PostgreSQL -> BigQuery Sync") as flow:
    max_product_id = PostgresFetch(
        'manager',
        'postgres',
        'localhost',
        15432,
        # query='SELECT MAX(id) from products'
        query='SELECT 188000',
    )
    # Passing the fetch task into a task call makes it an upstream dependency;
    # the string is formatted at run time with the fetched value.
    query = form_query_string(max_product_id)
    query_products = PostgresFetch(
        'manager',
        'postgres',
        'localhost',
        15432,
        fetch='all',
    )
    # Use the `query` task's result. An f-string here would be evaluated at
    # build time and interpolate the PostgresFetch task object instead.
    products = query_products(query=query)

state = flow.run()
print(state.result[max_product_id].result)
# `query_products` is only a template: calling it above presumably adds a bound
# copy (`products`) to the flow, so print the formatted query instead.
print(state.result[query].result)
print(state.result[products].result)
Zanie
from typing import Any
from prefect import task, Flow, Task
from prefect.utilities.tasks import defaults_from_attrs
from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch
class FakeFetch(Task):
    """
    A fake fetching task that allows us to see the query and force a return value
    for testing.

    Args:
        query: Default query to "run" when none is passed to ``run``.
        returns: Value handed back from ``run`` regardless of the query.
        **kwargs: Forwarded unchanged to ``Task.__init__``.
    """

    def __init__(self, query: str = None, returns: Any = None, **kwargs):
        self.query = query
        self.returns = returns
        super().__init__(**kwargs)

    @defaults_from_attrs("query", "returns")
    def run(self, query: str = None, returns: Any = None):
        """Print the query that would be executed and return ``returns``.

        Arguments left as ``None`` fall back to the values stored on the
        instance at construction time (via ``defaults_from_attrs``).
        """
        print(f"QUERYING: {query}")
        return returns
@task
def create_products_query(maximum_id: Any) -> str:
    """Build the incremental product query from the fetched maximum id.

    Because this is a task, ``maximum_id`` is the *run-time* result of the
    upstream fetch rather than the fetch task object itself.

    Args:
        maximum_id: Result of the upstream fetch. A bare value (as the
            FakeFetch example returns) is used as-is; a one-element row such
            as ``(18000,)`` is unwrapped to its single value.

    Returns:
        SQL text selecting up to 10 products whose ``id`` exceeds the value.
    """
    # NOTE(review): a real PostgresFetch without fetch='all' presumably returns a
    # row tuple (cursor.fetchone()), which would render as "(18000,)" and yield
    # invalid SQL, so unwrap single-column rows.
    if isinstance(maximum_id, (tuple, list)) and len(maximum_id) == 1:
        maximum_id = maximum_id[0]
    return f'SELECT * FROM products WHERE id > {maximum_id} LIMIT 10'
query_products = FakeFetch(
    # set your config options for using this repeatedly
    returns=["Fake product return"],
)
max_product_id = FakeFetch(
    query='SELECT MAX(id) from products',
    returns=18000,
)

with Flow("PostgreSQL -> BigQuery Sync") as flow:
    # max_product_id is passed as an upstream task, so the query string is
    # built from its result at run time rather than at flow-build time.
    products = query_products(query=create_products_query(max_product_id))

state = flow.run()
print(state.result[max_product_id].result)
print(state.result[products].result)
[…] `PrefectResult` backend, we’ll serialize the result into JSON and store it (see https://docs.prefect.io/core/advanced_tutorials/using-results.html). If you’re just trying to pass JSON downstream, it may make sense to convert it to JSON within your task.
Joël Luijmes
10/31/2020, 8:00 AM
> there’s a very stark difference between what’s available at build time vs. run time

Yes, that definitely makes sense. I kind of got to your example on one of my tries, but then failed to pass the result of that new task into Postgres.
Dylan