c

    curtis smiley

    1 year ago
    Hey guys, can someone help with a ValueError("Could not infer an active Flow context.")? I have a few SQL statements that need to be run in order, so I'm using a for loop to go through each SQL statement. Here is a quick snippet of the for loop. I can share more too!
    # Chain one task per SQL statement so they execute strictly in list order.
    # POINTER holds the task created on the previous iteration (None before the first).
    POINTER = None
    for statement in sql_list:
        task = BillingIcebergTask(statement=statement)

        # Compare against None explicitly rather than relying on the
        # truthiness of a Task object.
        if POINTER is None:
            # First statement: nothing upstream, just register it on the flow.
            flow.add_task(task)
        else:
            # Later statements run only after the previous one.
            flow.set_dependencies(task=task, upstream_tasks=[POINTER])

        # Remember this task as the upstream for the next statement.
        POINTER = task
    nicholas

    nicholas

    1 year ago
    Hi @curtis smiley - is `BillingIcebergTask` a class that you're instantiating first? Or is it a decorated task?
    c

    curtis smiley

    1 year ago
    It is a class that's instantiated.
    nicholas

    nicholas

    1 year ago
    Ok cool - would you mind sharing the rest of the code?
    c

    curtis smiley

    1 year ago
    Sure. What's the best way to share that with you?
    nicholas

    nicholas

    1 year ago
    If you'd prefer not to share it in the public thread, feel free to DM it to me (we'll keep the conversation in this thread though)
    c

    curtis smiley

    1 year ago
    from datetime import date
    from dotenv import load_dotenv
    from prefect import Flow
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    from src.create_monthly_dates import get_monthly_dates, format_dates
    from src.create_quarterly_dates import get_quarterly_dates
    from src.tasks import BillingIcebergTask
    
    # Load settings from a .env file into the process environment (python-dotenv).
    load_dotenv()

    today_Date = date.today()

    # Date bounds substituted into the statements below; only elements [0] and [1]
    # are used here (lower bound inclusive, upper bound exclusive in the SQL).
    monthly = format_dates(get_monthly_dates, today_Date, "monthly", True, True)

    # NOTE(review): `monthly_no_day` is used by the last statement below but is not
    # defined anywhere in this snippet - it must be assigned before this list is
    # built, otherwise this raises NameError. Confirm where it comes from.

    # Billing statements, executed one task per entry in exactly this order.
    sql_list = [
        """SELECT DATE_TRUNC(MONTH, PAYMENT_ON_LOCAL) AS PAYMENT_ON_LOCAL, COUNT(1) AS CNT
        FROM BILLING.CONTRACT_FEE_RECKONING
        GROUP BY DATE_TRUNC(MONTH, PAYMENT_ON_LOCAL) ORDER BY 1 DESC;""",
        """SELECT COUNT(1) AS CNT FROM BILLING.CONTRACT_FEE_RECKONING
        WHERE PAYMENT_ON_LOCAL >= '{}'
        AND PAYMENT_ON_LOCAL <  '{}';""".format(
            monthly[0], monthly[1]
        ),
        """BEGIN NAME JWL_TRX;""",
        """SELECT CURRENT_TRANSACTION();""",
        """DELETE FROM BILLING.CONTRACT_FEE_RECKONING WHERE PAYMENT_ON_LOCAL >= '{}'
        AND PAYMENT_ON_LOCAL <  '{}';""".format(
            monthly[0], monthly[1]
        ),
        """COMMIT;""",
        # NOTE(review): this statement had no FROM clause. BILLING.ONDEMAND_INVOICE
        # is assumed because the next statement reads the same columns from that
        # table - confirm.
        """SELECT SUBSTR(INVOICE_YYYYMM, 1, 6) AS BILL_MONTH, SCHEDULED, COUNT(1) AS CNT
        FROM BILLING.ONDEMAND_INVOICE
        GROUP BY SUBSTR(INVOICE_YYYYMM, 1, 6), SCHEDULED ORDER BY 1 DESC, 2;""",
        """SELECT COUNT(1) AS CNT FROM BILLING.ONDEMAND_INVOICE
        WHERE SUBSTR(INVOICE_YYYYMM, 1, 6) = '{}'
        AND SCHEDULED = 'Monthly';""".format(
            monthly_no_day[0]
        ),
    #... More SQL statements
    ]
    
    # Instantiate `prefect.Flow` object
    flow = Flow(name="Billing Daily Run")

    # flow.schedule = Schedule(clocks=[CronClock("30 * * * *")])

    # Chain one task per SQL statement so they execute strictly in list order.
    # POINTER holds the task created on the previous iteration (None before the first).
    POINTER = None
    for statement in sql_list:
        task = BillingIcebergTask(statement=statement)

        # Compare against None explicitly rather than relying on the
        # truthiness of a Task object.
        if POINTER is None:
            # First statement: nothing upstream, just register it on the flow.
            flow.add_task(task)
        else:
            # Later statements run only after the previous one.
            flow.set_dependencies(task=task, upstream_tasks=[POINTER])

        # Remember this task as the upstream for the next statement.
        POINTER = task
    And here is the code for BillingIcebergTask:
    import os
    from datetime import timedelta
    
    from prefect import Task
    from prefect.tasks.snowflake import SnowflakeQuery
    
    
    class BillingIcebergTask(Task):
        """Reusable Task class for Iceberg statements.

        Each instance wraps a single SQL statement and is named after it. The
        task is configured with up to 3 retries, 30 seconds apart.

        Attributes
        ----------
        statement : `str`
            Query to use for billing update.
        """

        def __init__(self, statement: str) -> None:
            """Initialize a billing Iceberg task.

            Parameters
            ----------
            statement : `str`
                Definition of billing procedure step.
            """
            self.statement = statement
            super().__init__(
                name=statement, max_retries=3, retry_delay=timedelta(seconds=30)
            )

        def run(self) -> None:  # pragma: no cover
            """Run a billing update step.

            Builds a `SnowflakeQuery` from the ``SNOWFLAKE_*`` environment
            variables and executes ``self.statement`` with it. Any error raised
            by the query propagates to the caller unchanged.

            Raises
            ------
            KeyError
                If one of the required ``SNOWFLAKE_*`` environment variables
                is not set.
            """
            # NOTE(review): a new SnowflakeQuery is built for every statement. If
            # each one opens its own connection, statements such as BEGIN/COMMIT
            # issued from separate tasks would not share a session - confirm.
            query_task = SnowflakeQuery(
                account=os.environ["SNOWFLAKE_ACCOUNT"],
                user=os.environ["SNOWFLAKE_USERNAME"],
                password=os.environ["SNOWFLAKE_PASSWORD"],
                database=os.environ["SNOWFLAKE_DATABASE"],
                schema=os.environ["SNOWFLAKE_SCHEMA"],
                warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
            )

            # Call ``run`` directly instead of ``query_task(...)``: calling a task
            # object tries to bind it to a flow, and no flow context is active
            # while another task is executing, which raises
            # ValueError("Could not infer an active Flow context.").
            query_task.run(query=self.statement)
    Hey, I figured it out.
    The BillingIcebergTask was calling a built-in Prefect task from inside its own run method, so I just consolidated it to use only SnowflakeQuery instead.
    nicholas

    nicholas

    1 year ago
    Ahhh ok that makes sense!
    Glad you got it sorted 🙂