Hey Guys, Can someone help with a ValueError("Coul...
# ask-community
c
Hey Guys, Can someone help with a ValueError("Could not infer an active Flow context.") I have a few sql statements that need to be run in order. So im using for to loop through each sql statement. Here is a quick snippet of for loop. I can share more also!
Copy code
POINTER = None
for statement in sql_list:

    task = BillingIcebergTask(statement=statement)

    # Pointer represents the previously set task
    if not POINTER:
        flow.add_task(task)
    else:
        flow.set_dependencies(task=task, upstream_tasks=[POINTER])

    # Set pointer
    POINTER = task
n
Hi @curtis smiley - is
BillingIcebergTask
a class that you're instantiating first? Or is it a decorated task?
c
It is a class thats instantiated
n
Ok cool - would you mind sharing the rest of the code?
c
Sure. What's. the best way to share that with you?
n
If you'd prefer not to share it in the public thread, feel free to DM it to me (we'll keep the conversation in this thread though)
c
Copy code
from datetime import date
from dotenv import load_dotenv
from prefect import Flow
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from src.create_monthly_dates import get_monthly_dates, format_dates
from src.create_quarterly_dates import get_quarterly_dates
from src.tasks import BillingIcebergTask

load_dotenv()

today_Date = date.today()

monthly = format_dates(get_monthly_dates, today_Date, "monthly", True, True)

sql_list = [
    """SELECT DATE_TRUNC(MONTH, PAYMENT_ON_LOCAL) AS PAYMENT_ON_LOCAL, COUNT(1) AS CNT
    FROM BILLING.CONTRACT_FEE_RECKONING
    GROUP BY DATE_TRUNC(MONTH, PAYMENT_ON_LOCAL) ORDER BY 1 DESC;""",
    """SELECT COUNT(1) AS CNT FROM BILLING.CONTRACT_FEE_RECKONING
    WHERE PAYMENT_ON_LOCAL >= '{}'
    AND PAYMENT_ON_LOCAL <  '{}';""".format(
        monthly[0], monthly[1]
    ),
    """BEGIN NAME JWL_TRX;""",
    """SELECT CURRENT_TRANSACTION();""",
    """DELETE FROM BILLING.CONTRACT_FEE_RECKONING WHERE PAYMENT_ON_LOCAL >= '{}'
    AND PAYMENT_ON_LOCAL <  '{}';""".format(
        monthly[0], monthly[1]
    ),
    """COMMIT;""",
    """SELECT SUBSTR(INVOICE_YYYYMM, 1, 6) AS BILL_MONTH, SCHEDULED, COUNT(1) AS CNT
    GROUP BY SUBSTR(INVOICE_YYYYMM, 1, 6), SCHEDULED ORDER BY 1 DESC, 2;""",
    """SELECT COUNT(1) AS CNT FROM BILLING.ONDEMAND_INVOICE
    WHERE SUBSTR(INVOICE_YYYYMM, 1, 6) = '{}'
    AND SCHEDULED = 'Monthly';""".format(
        monthly_no_day[0]
    )
#... More Sql Statments
]

# Instantiate `prefect.Flow` object
flow = Flow(name="Billing Daily Run")

# flow.schedule = Schedule(clocks=[CronClock("30 * * * *")])

POINTER = None
for statement in sql_list:

    task = BillingIcebergTask(statement=statement)

    # Pointer represents the previously set task
    if not POINTER:
        flow.add_task(task)
    else:
        flow.set_dependencies(task=task, upstream_tasks=[POINTER])

    # Set pointer
    POINTER = task
and here is what the BillingIcebergTask code is
Copy code
import os
from datetime import timedelta

from prefect import Task
from prefect.tasks.snowflake import SnowflakeQuery


class BillingIcebergTask(Task):
    """Reusable Task class for Iceberg statements.

    Attributes
    ----------
    statement : `str`
        Query to use for billing update.
    """

    def __init__(self, statement: str):
        """Initialize a billing Iceberg task.

        Parameters
        ----------
        statement : `str`
            Defintion of billing procedure step.
        """
        self.statement = statement
        super().__init__(
            name=statement, max_retries=3, retry_delay=timedelta(seconds=30)
        )

    def run(self):  # pragma: no cover
        """Runs a billing update step."""

        connection = SnowflakeQuery(
            account=os.environ["SNOWFLAKE_ACCOUNT"],
            user=os.environ["SNOWFLAKE_USERNAME"],
            password=os.environ["SNOWFLAKE_PASSWORD"],
            database=os.environ["SNOWFLAKE_DATABASE"],
            schema=os.environ["SNOWFLAKE_SCHEMA"],
            warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        )

        try:
            connection(query=self.statement)
        except Exception as some_error:
            raise some_error
Hey i figured it out
so the billingicebergtask was calling prefect built in task and i just consolidated it to only. use Snowflakequery instead
n
Ahhh ok that makes sense!
Glad you got it sorted 🙂
👍 1