curtis smiley
03/12/2021, 4:02 PMPOINTER = None
for statement in sql_list:
task = BillingIcebergTask(statement=statement)
# Pointer represents the previously set task
if not POINTER:
flow.add_task(task)
else:
flow.set_dependencies(task=task, upstream_tasks=[POINTER])
# Set pointer
POINTER = task
nicholas
BillingIcebergTask
a class that you're instantiating first? Or is it a decorated task?curtis smiley
03/12/2021, 4:05 PMnicholas
curtis smiley
03/12/2021, 4:17 PMnicholas
curtis smiley
03/12/2021, 4:23 PMfrom datetime import date
from dotenv import load_dotenv
from prefect import Flow
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from src.create_monthly_dates import get_monthly_dates, format_dates
from src.create_quarterly_dates import get_quarterly_dates
from src.tasks import BillingIcebergTask
load_dotenv()
today_Date = date.today()
monthly = format_dates(get_monthly_dates, today_Date, "monthly", True, True)
sql_list = [
"""SELECT DATE_TRUNC(MONTH, PAYMENT_ON_LOCAL) AS PAYMENT_ON_LOCAL, COUNT(1) AS CNT
FROM BILLING.CONTRACT_FEE_RECKONING
GROUP BY DATE_TRUNC(MONTH, PAYMENT_ON_LOCAL) ORDER BY 1 DESC;""",
"""SELECT COUNT(1) AS CNT FROM BILLING.CONTRACT_FEE_RECKONING
WHERE PAYMENT_ON_LOCAL >= '{}'
AND PAYMENT_ON_LOCAL < '{}';""".format(
monthly[0], monthly[1]
),
"""BEGIN NAME JWL_TRX;""",
"""SELECT CURRENT_TRANSACTION();""",
"""DELETE FROM BILLING.CONTRACT_FEE_RECKONING WHERE PAYMENT_ON_LOCAL >= '{}'
AND PAYMENT_ON_LOCAL < '{}';""".format(
monthly[0], monthly[1]
),
"""COMMIT;""",
"""SELECT SUBSTR(INVOICE_YYYYMM, 1, 6) AS BILL_MONTH, SCHEDULED, COUNT(1) AS CNT
GROUP BY SUBSTR(INVOICE_YYYYMM, 1, 6), SCHEDULED ORDER BY 1 DESC, 2;""",
"""SELECT COUNT(1) AS CNT FROM BILLING.ONDEMAND_INVOICE
WHERE SUBSTR(INVOICE_YYYYMM, 1, 6) = '{}'
AND SCHEDULED = 'Monthly';""".format(
monthly_no_day[0]
)
#... More Sql Statments
]
# Instantiate `prefect.Flow` object
flow = Flow(name="Billing Daily Run")
# flow.schedule = Schedule(clocks=[CronClock("30 * * * *")])
POINTER = None
for statement in sql_list:
task = BillingIcebergTask(statement=statement)
# Pointer represents the previously set task
if not POINTER:
flow.add_task(task)
else:
flow.set_dependencies(task=task, upstream_tasks=[POINTER])
# Set pointer
POINTER = task
curtis smiley
03/12/2021, 4:25 PMcurtis smiley
03/12/2021, 4:28 PMimport os
from datetime import timedelta
from prefect import Task
from prefect.tasks.snowflake import SnowflakeQuery
class BillingIcebergTask(Task):
"""Reusable Task class for Iceberg statements.
Attributes
----------
statement : `str`
Query to use for billing update.
"""
def __init__(self, statement: str):
"""Initialize a billing Iceberg task.
Parameters
----------
statement : `str`
Defintion of billing procedure step.
"""
self.statement = statement
super().__init__(
name=statement, max_retries=3, retry_delay=timedelta(seconds=30)
)
def run(self): # pragma: no cover
"""Runs a billing update step."""
connection = SnowflakeQuery(
account=os.environ["SNOWFLAKE_ACCOUNT"],
user=os.environ["SNOWFLAKE_USERNAME"],
password=os.environ["SNOWFLAKE_PASSWORD"],
database=os.environ["SNOWFLAKE_DATABASE"],
schema=os.environ["SNOWFLAKE_SCHEMA"],
warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
)
try:
connection(query=self.statement)
except Exception as some_error:
raise some_error
curtis smiley
03/12/2021, 4:36 PMcurtis smiley
03/12/2021, 4:37 PMnicholas
nicholas