I'm having an issue using the `BigQueryTask` from ...
# ask-community
j
I'm having an issue using the `BigQueryTask` from `prefect.tasks.gcp`. I want to be able to pass a `Parameter` value into this task but I am unable to do so. It appears the request being sent to BigQuery contains the following, which generates a JSON error:
`{'value': <Parameter: min_date>}`
The above value is not valid JSON, which explains why the Google API won't accept this request.
k
Hey Jeff, could you move the code to the thread when you get a chance, to keep the main channel a bit neater? There is a bit of confusion happening here. The `Flow` block constructs the DAG at registration time. Tasks are the mechanism to defer logic to runtime. What is happening is that `min_date` is a `Parameter` (and a Task), so it comes in at runtime, but it is being evaluated at build time because you pass it to the `init` of `BigQueryTask`. `BigQueryTask` has an `init` and a `run` method; `run` is the deferred one. You are passing a Parameter to the `init`, which is evaluated at build time. Instead, you want to do something like:
Copy code
query_task = BigQueryTask()(
    query=sql,
    query_params=[("min_date", "DATE", min_date)],
    project=prefect.context.get("BQ_PROJECT_ID"),
    location=prefect.context.get("BQ_LOCATION"),
)
because the first set of parentheses is the `init` and the second is the `run` (see the split-out sketch after the list below). A couple of other things need to change:
- `default=pendulum.now(tz="UTC").subtract(hours=24).to_date_string()` will be evaluated at registration time and then fixed. The Parameter default can't be a callable; it needs to be something simple like the string "now", and then you can run the callable inside a task.
- You shouldn't need `flow.add_task(min_date)` or `flow.add_task(query_task)`; calling them in the flow block will work.
- The `with open(...)` snippet is executed once during registration. That might be fine here; I'm not sure of the intent.
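Splitting the two calls makes that build-time/runtime boundary explicit. Here is a minimal sketch of the same pattern (the flow name and query are just placeholders):
Copy code
from prefect import Flow
from prefect.tasks.gcp import BigQueryTask

bq_task = BigQueryTask()  # init: configured once at build/registration time

with Flow("example") as flow:
    # calling the task inside the flow block binds these arguments to the deferred run
    result = bq_task(query="SELECT 1")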
j
Below is the code for my Flow.
Copy code
import prefect
from prefect import Flow
from prefect.tasks.gcp import BigQueryTask
from prefect.core.parameter import Parameter
import pendulum
import os

with Flow("Refresh dim_users") as flow:
    min_date = Parameter(
        "min_date",
        default=pendulum.now(tz="UTC").subtract(hours=24).to_date_string(),
        required=False,
    )
    flow.add_task(min_date)

    with open(os.path.join(os.path.dirname(__file__), "sql/dim_users.sql"), "r") as f:
        sql = f.read()
    
    query_task = BigQueryTask(
        query=sql,
        query_params=[("min_date", "DATE", min_date)],
        project=prefect.context.get("BQ_PROJECT_ID"),
        location=prefect.context.get("BQ_LOCATION")
    )
    query_task.set_upstream(task=min_date)
    flow.add_task(query_task)
Note how I am passing `min_date` into the `BigQueryTask` task. How can I resolve this issue?
k
This should be a lot better:
Copy code
@task  # requires: from prefect import task
def get_min_date(x):
    # resolve the simple string default into an actual date at runtime
    if x == "now":
        return pendulum.now(tz="UTC").subtract(hours=24).to_date_string()
    return x

query_task = BigQueryTask()

@task
def get_sql():
    with open(os.path.join(os.path.dirname(__file__), "sql/dim_users.sql"), "r") as f:
        sql = f.read()
    return sql

with Flow("Refresh dim_users") as flow:
    min_date = Parameter(
        "min_date",
        default="now",
        required=False,
    )
    min_date2 = get_min_date(min_date)
    
    query_task(
        query=get_sql(),
        query_params=[("min_date", "DATE", min_date2)],
        project=prefect.context.get("BQ_PROJECT_ID"),
        location=prefect.context.get("BQ_LOCATION")
    )
j
Thanks @Kevin Kho, I tried your suggestion and it worked. As for the default date, I think you are just suggesting to make a separate task to resolve it? The only purpose of the Parameter was to add the ability to run the job manually if some other flows had failed.
k
Yes, but Parameters pass through an API call, so the values come in as JSON, which can't support Python callables. You need to handle that logic inside a task based on some simpler input.
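For example, a plain string survives that round trip, while a datetime or a callable has no JSON representation (a quick illustration, not Prefect-specific):
Copy code
import json

import pendulum

print(json.dumps({"min_date": "2022-01-01"}))  # works: a plain string is valid JSON

try:
    json.dumps({"min_date": pendulum.now(tz="UTC")})
except TypeError as exc:
    # DateTime objects (and callables) have no JSON representation, so they can't be Parameter values
    print(exc)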
j
Would something like this work for the `min_date` then?
Copy code
@task
def resolve_min_date(min_date):
    if min_date is None:
        return pendulum.now(tz="UTC").subtract(hours=24).to_date_string()
    return min_date
k
Yes that would work.
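Wired into the flow it would look roughly like this (a sketch reusing `query_task` and `get_sql` from the snippet above; the `None` default matches your task):
Copy code
with Flow("Refresh dim_users") as flow:
    min_date = Parameter("min_date", default=None, required=False)

    # resolve_min_date fills in "24 hours ago" when no value is passed at runtime
    query_task(
        query=get_sql(),
        query_params=[("min_date", "DATE", resolve_min_date(min_date))],
        project=prefect.context.get("BQ_PROJECT_ID"),
        location=prefect.context.get("BQ_LOCATION"),
    )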
j
Thanks for your help @Kevin Kho much appreciated!
k
I don't know about the `prefect.context.get("BQ_PROJECT_ID")`. If you're using Prefect Cloud, I think it's hard to inject context. I would suggest parameterizing those too, honestly.
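i.e. declare them as Parameters in the flow block and pass them straight into the task call (a sketch; the defaults are placeholders):
Copy code
bq_project = Parameter("bq_project", default="my-gcp-project")  # placeholder default
bq_location = Parameter("bq_location", default="US")            # placeholder default

query_task(
    query=get_sql(),
    query_params=[("min_date", "DATE", resolve_min_date(min_date))],
    project=bq_project,    # replaces prefect.context.get("BQ_PROJECT_ID")
    location=bq_location,  # replaces prefect.context.get("BQ_LOCATION")
)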
And of course!