Carl
02/24/2021, 12:20 PMPostgresExecuteMany
task but keep running into a “ValueError(’Could not infer an active Flow context.’)” error. Any ideas what I’m doing wrong here? Thx
from prefect import task, Flow
import prefect.tasks.postgres.postgres as pg
DB_NAME = 'blah'
DB_USER = 'blah'
DB_HOST = 'localhost'
sql_insert = pg.PostgresExecuteMany(db_name=DB_NAME, user=DB_USER, host=DB_HOST)
@task()
def extract_data():
vals = ['aaa', 'bbb']
return vals
@task()
def load(data):
insert_stmt = """INSERT INTO "table_a" ("COL_A", "COL_B") VALUES (%s, %s)"""
ret = sql_insert(query=insert_stmt, data=data, commit=True)
return ret
def build_flow():
with Flow('Test ETL') as f:
data = extract_data()
load(data)
return f
flow = build_flow()
flow.run()
Amanda Wee
02/24/2021, 12:29 PM@task()
decorator from load
so that it becomes a function that invokes a prefect task?Carl
02/24/2021, 12:41 PMAmanda Wee
02/24/2021, 12:56 PM