Joël Luijmes

10/30/2020, 2:55 PM
Hey, I’m evaluating both Airflow and Prefect in order to monitor and build data pipelines. As a proof of concept I want to make a sync from PostgreSQL -> BigQuery. With Airflow this was pretty straightforward (see attached snippet). But I’m struggling to implement this with Prefect, any suggestions? Specifically, the question is: how can I change the query based on the result of a previous task? Additional question: can I export all the results to JSONL format (or something similar) directly from the Postgres task? Or should I do it manually in the code? This obviously fails:
from prefect import task
from prefect import Flow
from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch


with Flow("PostgreSQL -> BigQuery Sync") as flow:
    max_product_id = PostgresFetch(
        'manager', 
        'postgres',
        'localhost',
        15432,
        # query='SELECT MAX(id) from products'
        query='SELECT 188000'
    )

    query_products = PostgresFetch(
        'manager',
        'postgres',
        'localhost',
        15432,
        fetch='all',
        # query=f'SELECT * FROM products WHERE id > {maximum_id} LIMIT 10'
    )

    products = query_products(query=f'SELECT * FROM products WHERE id > {max_product_id} LIMIT 10')



state = flow.run()
print(state.result[max_product_id].result)
print(state.result[query_products].result)
print(state.result[products].result)

Chris White

10/30/2020, 3:51 PM
Hi Joel, you’re almost there - you need to remember that because Prefect allows you to exchange data between tasks, there’s a very stark difference between what’s available at build time vs. run time. In your case, I see you’re using an f-string, which Python will evaluate immediately. You instead want to defer the formatting of that string using a new task:
from prefect import Flow, task
from prefect.tasks.postgres.postgres import PostgresFetch


@task
def query_formatter(max_id):
    # runs at flow run time, once the upstream result is available
    return f'SELECT * FROM products WHERE id > {max_id} LIMIT 10'


# moving this initialization step outside of Flow for clarity
max_product_id = PostgresFetch(
    'manager',
    'postgres',
    'localhost',
    15432,
    # query='SELECT MAX(id) from products'
    query='SELECT 188000'
)

query_products = PostgresFetch(
    'manager',
    'postgres',
    'localhost',
    15432,
    fetch='all',
    # query=f'SELECT * FROM products WHERE id > {maximum_id} LIMIT 10'
)

with Flow("PostgreSQL -> BigQuery Sync") as flow:
    query = query_formatter(max_product_id)
    products = query_products(query=query)
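To make the build-time versus run-time distinction concrete, here is a minimal sketch in plain Python. `StubTask` is a hypothetical stand-in, not a Prefect class. It only mimics the fact that, while the flow is being defined, `max_product_id` is a task object rather than the value it will eventually return.

```python
class StubTask:
    """Hypothetical stand-in for a task object; not part of Prefect."""

    def __init__(self, name, value):
        self.name = name
        self._value = value  # what the task would return when run

    def __repr__(self):
        return f"<Task: {self.name}>"

    def run(self):
        return self._value


def format_query(max_id):
    # Deferred formatting: only called once the upstream value is known.
    return f"SELECT * FROM products WHERE id > {max_id} LIMIT 10"


max_product_id = StubTask("PostgresFetch", 188000)

# Build time: the f-string is evaluated immediately and embeds the task's repr.
eager_query = f"SELECT * FROM products WHERE id > {max_product_id} LIMIT 10"

# Run time: the upstream task has produced a value, so formatting works.
deferred_query = format_query(max_product_id.run())

print(eager_query)
print(deferred_query)
```

The eager string contains the object's repr instead of a number, which is why the formatting has to move into its own task.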

Dylan

10/30/2020, 3:52 PM
Hi @Joël Luijmes! Welcome! If you’d like to use the PostgresFetch task from the Task Library, you could introduce an intermediate task to form the query string with the result of the previous task.
from prefect import task
from prefect import Flow
from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch

@task
def form_query_string(max_product_id):
  return f'SELECT * FROM products WHERE id > {max_product_id} LIMIT 10'


with Flow("PostgreSQL -> BigQuery Sync") as flow:
    max_product_id = PostgresFetch(
        'manager', 
        'postgres',
        'localhost',
        15432,
        # query='SELECT MAX(id) from products'
        query='SELECT 188000'
    )

    query = form_query_string(max_product_id)

    query_products = PostgresFetch(
        'manager',
        'postgres',
        'localhost',
        15432,
        fetch='all',
        # query=query
    )

    # pass the output of form_query_string instead of an eagerly evaluated f-string
    products = query_products(query=query)


state = flow.run()
print(state.result[max_product_id].result)
print(state.result[products].result)
I didn’t run the above but that’s the general gist

Zanie

10/30/2020, 3:53 PM
Because I think it’s funny that all 3 of us modified your code at the same time, here’s my stab at it as well 🙂 (which runs)
from typing import Any
from prefect import task, Flow, Task
from prefect.utilities.tasks import defaults_from_attrs
from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch

class FakeFetch(Task):
    """
    A fake fetching task that allows us to see the query and force a return value
    for testing
    """
    def __init__(self, query: str = None, returns: Any = None, **kwargs):
        self.query = query
        self.returns = returns
        super().__init__(**kwargs)

    @defaults_from_attrs("query", "returns")
    def run(self, query: str = None, returns: Any = None):
        print(f"QUERYING: {query}")
        return returns

@task
def create_products_query(maximum_id: int) -> str:
    return f'SELECT * FROM products WHERE id > {maximum_id} LIMIT 10'


query_products = FakeFetch(
    # set your config options for using this repeatedly
    returns=["Fake product return"]
)

max_product_id = FakeFetch(
    query='SELECT MAX(id) from products',
    returns=18000,
)


with Flow("PostgreSQL -> BigQuery Sync") as flow:
    products = query_products(query=create_products_query(max_product_id))

state = flow.run()
print(state.result[max_product_id].result)
print(state.result[products].result)
As far as the JSON goes, if you use the PrefectResult backend, we’ll serialize the result into JSON and store it: see https://docs.prefect.io/core/advanced_tutorials/using-results.html. If you’re just trying to pass JSON downstream, it may make sense to convert it to JSON within your task.
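For the "convert it to JSON within your task" route, a rough sketch using only the standard library might look like the following. It assumes the fetch task hands back a list of row tuples (as a psycopg2-style cursor would). The `rows_to_jsonl` helper, the column names, and the sample rows are all made up for illustration.

```python
import json
from datetime import date
from decimal import Decimal


def rows_to_jsonl(rows, columns):
    """Serialize row tuples as JSON Lines, one object per line."""
    lines = []
    for row in rows:
        record = dict(zip(columns, row))
        # default=str is a simple fallback for types json cannot encode
        # natively (dates, Decimal); adjust if you need exact numeric types.
        lines.append(json.dumps(record, default=str))
    return "\n".join(lines)


# Made-up sample rows shaped like a fetch-all result (a list of tuples).
rows = [
    (188001, "Widget", Decimal("9.99"), date(2020, 10, 30)),
    (188002, "Gadget", Decimal("19.50"), date(2020, 10, 31)),
]
columns = ["id", "name", "price", "created_at"]  # hypothetical column names

jsonl = rows_to_jsonl(rows, columns)
print(jsonl)
```

In a flow, the body of `rows_to_jsonl` could live inside a `@task` function that receives the fetch result as its argument.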

Joël Luijmes

10/31/2020, 8:00 AM
Haha amazing to get 3 immediate replies, thanks all! 😄
“there’s a very stark difference between what’s available at build time vs. run time”
Yes that definitely makes sense, I kinda got to your example on one of my tries, but then failed to pass the result of that new task into Postgres.
Okay great, I’ll take a look at the results thing. Is that the preferred method, or what’s conventional? I basically want the Postgres results in a bucket so that I can import the data into BigQuery. The alternative is probably writing a local file and uploading that to GCS manually, I suppose?

Dylan

10/31/2020, 5:53 PM
@Joël Luijmes that’s what I do for my flows but we do have a GCS task you can use
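The "write a local file, then upload it" approach could be sketched roughly as below. This is an illustration under assumptions, not a Prefect or GCS API: `stage_jsonl`, `sync`, the file name, and `fake_upload` are hypothetical, and the uploader is passed in as a plain callable so that a real GCS upload step can be swapped in.

```python
import json
import os
import tempfile


def stage_jsonl(rows, columns, directory):
    """Write rows as newline-delimited JSON and return the file path."""
    path = os.path.join(directory, "products.jsonl")  # hypothetical file name
    with open(path, "w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(dict(zip(columns, row)), default=str))
            handle.write("\n")
    return path


def sync(rows, columns, upload):
    """Stage rows locally, then pass the file path to the uploader."""
    with tempfile.TemporaryDirectory() as directory:
        path = stage_jsonl(rows, columns, directory)
        return upload(path)


def fake_upload(path):
    # Placeholder: a real uploader would send the file to a GCS bucket here.
    with open(path, encoding="utf-8") as handle:
        return len(handle.readlines())


uploaded_lines = sync([(1, "Widget"), (2, "Gadget")], ["id", "name"], fake_upload)
print(uploaded_lines)
```

Staging in a temporary directory keeps the file around only for the duration of the upload. Newline-delimited JSON is used here because BigQuery can load that format.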