Jeff Kehler
05/23/2022, 2:57 AM
I am trying to use the BigQueryTask from prefect.tasks.gcp. I want to pass a Parameter value into this task but I am unable to do so. It appears the request being sent to BigQuery contains the following, which generates a JSON error:
{'value': <Parameter: min_date>}
The above value is not valid JSON, which explains why the Google API won't accept this request.

Kevin Kho
min_date is a Parameter (and therefore a Task), so its value comes in at runtime, but your usage is evaluated at build time because you pass it to the init of BigQueryTask.
BigQueryTask has an init method and a run method. The run is the deferred method. You are passing the Parameter to the init, which is evaluated at build time.
Instead, you want to do something like:
query_task = BigQueryTask()(
    query=sql,
    query_params=[("min_date", "DATE", min_date)],
    project=prefect.context.get("BQ_PROJECT_ID"),
    location=prefect.context.get("BQ_LOCATION")
)
because the first set of parentheses is the init and the second is the run.
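Put differently, here is a minimal sketch of the same call split across the two phases (reusing sql and min_date from the snippet above; Prefect 1.x):

# __init__: runs at build/registration time and only configures the task object
query_task = BigQueryTask()

with Flow("example") as flow:
    # __call__: binds the run() arguments; run() itself is deferred to flow runtime
    result = query_task(
        query=sql,
        query_params=[("min_date", "DATE", min_date)],
        project=prefect.context.get("BQ_PROJECT_ID"),
        location=prefect.context.get("BQ_LOCATION"),
    )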
A couple of other things that need to be changed:
default=pendulum.now(tz="UTC").subtract(hours=24).to_date_string() will be evaluated at registration time and then fixed. The Parameter default can't be a callable. Instead, it needs to be something like the string "now", and you can then run the callable inside a task (sketched below).
You shouldn't need flow.add_task(min_date) or flow.add_task(query_task); calling them inside the flow block is enough.
The with open(...) snippet is executed once during registration. That might be fine here; I'm not sure of the intent.
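On the first point, a minimal sketch of the difference (illustrative only):

# Evaluated once, at registration time -- the default is frozen to whatever the date was at that moment:
min_date = Parameter(
    "min_date",
    default=pendulum.now(tz="UTC").subtract(hours=24).to_date_string(),
)

# Deferred alternative: register a plain-string sentinel and compute the real date inside a task at runtime.
min_date = Parameter("min_date", default="now")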
Jeff Kehler
05/23/2022, 3:12 AM
import prefect
from prefect import Flow
from prefect.tasks.gcp import BigQueryTask
from prefect.core.parameter import Parameter
import pendulum
import os

with Flow("Refresh dim_users") as flow:
    min_date = Parameter(
        "min_date",
        default=pendulum.now(tz="UTC").subtract(hours=24).to_date_string(),
        required=False,
    )
    flow.add_task(min_date)

    with open(os.path.join(os.path.dirname(__file__), "sql/dim_users.sql"), "r") as f:
        sql = f.read()

    query_task = BigQueryTask(
        query=sql,
        query_params=[("min_date", "DATE", min_date)],
        project=prefect.context.get("BQ_PROJECT_ID"),
        location=prefect.context.get("BQ_LOCATION")
    )
    query_task.set_upstream(task=min_date)
    flow.add_task(query_task)
Note how I am passing min_date into the BigQueryTask task. How can I resolve this issue?

Kevin Kho
from prefect import task  # in addition to the imports above

@task
def get_min_date(x):
    if x == "now":
        return pendulum.now(tz="UTC").subtract(hours=24).to_date_string()
    return x  # pass an explicit date string straight through

query_task = BigQueryTask()

@task
def get_sql():
    with open(os.path.join(os.path.dirname(__file__), "sql/dim_users.sql"), "r") as f:
        sql = f.read()
    return sql

with Flow("Refresh dim_users") as flow:
    min_date = Parameter(
        "min_date",
        default="now",
        required=False,
    )
    min_date2 = get_min_date(min_date)
    sql = get_sql()  # the file read now happens in a task
    query_task(
        query=sql,
        query_params=[("min_date", "DATE", min_date2)],
        project=prefect.context.get("BQ_PROJECT_ID"),
        location=prefect.context.get("BQ_LOCATION")
    )
Jeff Kehler
05/23/2022, 3:14 AM

Kevin Kho
Jeff Kehler
05/23/2022, 3:15 AM
So if nothing is passed for min_date, then:
@task
def resolve_min_date(min_date):
    if min_date is None:
        return pendulum.now(tz="UTC").subtract(hours=24).to_date_string()
    return min_date
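Wired into the flow, that would look roughly like this (a sketch reusing query_task and get_sql from the earlier snippet; the None default is an assumption):

with Flow("Refresh dim_users") as flow:
    min_date = Parameter("min_date", default=None, required=False)
    min_date2 = resolve_min_date(min_date)
    query_task(
        query=get_sql(),
        query_params=[("min_date", "DATE", min_date2)],
        project=prefect.context.get("BQ_PROJECT_ID"),
        location=prefect.context.get("BQ_LOCATION"),
    )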
Kevin Kho
Jeff Kehler
05/23/2022, 3:17 AM

Kevin Kho
About prefect.context.get("BQ_PROJECT_ID"): if using Prefect Cloud, I think it's hard to inject context. I would suggest Parameterizing those too, honestly.
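A sketch of that suggestion (the parameter names and defaults are illustrative, and resolve_min_date, get_sql, and query_task are reused from the snippets above):

with Flow("Refresh dim_users") as flow:
    bq_project = Parameter("bq_project", default="my-gcp-project")  # illustrative default
    bq_location = Parameter("bq_location", default="US")            # illustrative default
    min_date = Parameter("min_date", default=None, required=False)
    query_task(
        query=get_sql(),
        query_params=[("min_date", "DATE", resolve_min_date(min_date))],
        project=bq_project,
        location=bq_location,
    )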