Thread
#prefect-community
    Jeff Kehler

    4 months ago
    I'm having an issue using the `BigQueryTask` from `prefect.tasks.gcp`. I want to pass a `Parameter` value into this task, but I am unable to do so. The request being sent to BigQuery appears to contain the following, which generates a JSON error:
    {'value': <Parameter: min_date>}
    The above value is not valid JSON, which explains why the Google API won't accept the request.
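To make the failure concrete: JSON has no representation for an arbitrary Python object, so serializing a request body that still holds the unresolved Parameter object fails. This is a minimal stdlib sketch; `ParameterPlaceholder` and `to_request_body` are hypothetical stand-ins, and the real Google client's serialization path may differ in detail.

```python
import json


class ParameterPlaceholder:
    """Hypothetical stand-in for a Parameter object that was never resolved to a value."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"


def to_request_body(value) -> str:
    """Serialize a query-parameter value into a JSON request body (simplified)."""
    return json.dumps({"value": value})


# A plain string serializes fine.
ok = to_request_body("2021-01-01")

# The placeholder object does not: json.dumps raises TypeError.
error = None
try:
    to_request_body(ParameterPlaceholder("min_date"))
except TypeError as exc:
    error = str(exc)
```

Here `ok` is `'{"value": "2021-01-01"}'`, while `error` holds a "not JSON serializable" message.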
    Kevin Kho

    4 months ago
    Hey Jeff, could you move the code to the thread when you get a chance, to keep the main channel a bit neater? There is a bit of confusion happening here. The Flow block constructs the DAG during registration time, and tasks are the mechanism for deferring logic to runtime. What is happening is that `min_date` is a Parameter (and therefore a Task), so its value only arrives at runtime, but it is used at build time because you pass it to the `__init__` of `BigQueryTask`. `BigQueryTask` has an `__init__` and a `run` method, and `run` is the deferred one. Because you pass the parameter to `__init__`, the Parameter object itself, not its value, is captured during build time. Instead, you want to do something like:
    query_task = BigQueryTask()(
        query=sql,
        query_params=[("min_date", "DATE", min_date)],
        project=prefect.context.get("BQ_PROJECT_ID"),
        location=prefect.context.get("BQ_LOCATION"),
    )
    The first set of parentheses calls `__init__` and the second calls `run`. A couple of other things need to change:
    `default=pendulum.now(tz="UTC").subtract(hours=24).to_date_string()` is evaluated once at registration time and then stays fixed. A Parameter default can't be a callable; instead, it needs to be something simple like the string "now", and then you can run the date logic inside a task.
    You shouldn't need `flow.add_task(min_date)` or `flow.add_task(query_task)`; calling the tasks in the flow block is enough.
    The `with open(...)` snippet is executed once during registration. That might be fine here; I'm not sure what the intent is.
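The registration-time pitfall can be reproduced in plain Python. In this hypothetical sketch, `make_clock` stands in for the system clock and advances one day per call so that each "run" happens on a later day; `run_frozen` mimics a default computed once at build time, and `run_deferred` mimics the "now" sentinel resolved inside a task. Stdlib `datetime` is swapped in for pendulum.

```python
from datetime import date, timedelta
from typing import Callable, Iterator


def make_clock(start: date) -> Callable[[], date]:
    """Hypothetical clock that advances one day per call (stands in for 'now')."""
    days: Iterator[date] = (start + timedelta(days=i) for i in range(365))
    return lambda: next(days)


clock = make_clock(date(2021, 1, 1))

# Build/registration time: this expression is evaluated exactly once, right here.
FROZEN_DEFAULT = (clock() - timedelta(days=1)).isoformat()


def run_frozen(min_date: str = FROZEN_DEFAULT) -> str:
    # Every run sees the date computed at build time.
    return min_date


def run_deferred(min_date: str = "now") -> str:
    # The sentinel is resolved inside the run, so each run computes its own date.
    if min_date == "now":
        return (clock() - timedelta(days=1)).isoformat()
    return min_date
```

Calling `run_frozen()` keeps returning "2020-12-31" no matter how many runs happen, whereas successive `run_deferred()` calls return "2021-01-01" and then "2021-01-02".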
    Jeff Kehler

    4 months ago
    Below is the code for my Flow.
    import prefect
    from prefect import Flow
    from prefect.tasks.gcp import BigQueryTask
    from prefect.core.parameter import Parameter
    import pendulum
    import os
    
    with Flow("Refresh dim_users") as flow:
        min_date = Parameter(
            "min_date",
            default=pendulum.now(tz="UTC").subtract(hours=24).to_date_string(),
            required=False,
        )
        flow.add_task(min_date)
    
        with open(os.path.join(os.path.dirname(__file__), "sql/dim_users.sql"), "r") as f:
            sql = f.read()
        
        query_task = BigQueryTask(
            query=sql,
            query_params=[("min_date", "DATE", min_date)],
            project=prefect.context.get("BQ_PROJECT_ID"),
            location=prefect.context.get("BQ_LOCATION")
        )
        query_task.set_upstream(task=min_date)
        flow.add_task(query_task)
    Note how I am passing `min_date` into the `BigQueryTask`. How can I resolve this issue?
    Kevin Kho

    4 months ago
    This should be a lot better:
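    import os
    
    import pendulum
    import prefect
    from prefect import Flow, Parameter, task
    from prefect.tasks.gcp import BigQueryTask
    
    @task
    def get_min_date(x):
        # Resolve the "now" sentinel at runtime; pass explicit dates through unchanged
        if x == "now":
            return pendulum.now(tz="UTC").subtract(hours=24).to_date_string()
        return x
    
    # Instantiate once (the __init__ step); arguments go to the call below (the run step)
    query_task = BigQueryTask()
    
    @task
    def get_sql():
        # Read the query inside a task instead of in the flow block
        with open(os.path.join(os.path.dirname(__file__), "sql/dim_users.sql"), "r") as f:
            return f.read()
    
    with Flow("Refresh dim_users") as flow:
        min_date = Parameter(
            "min_date",
            default="now",
            required=False,
        )
        min_date2 = get_min_date(min_date)
        sql = get_sql()
    
        query_task(
            query=sql,
            query_params=[("min_date", "DATE", min_date2)],
            project=prefect.context.get("BQ_PROJECT_ID"),
            location=prefect.context.get("BQ_LOCATION"),
        )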
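To see why moving the arguments from the constructor to the call fixes the problem, here is a toy model of the two-phase pattern in plain Python. It is not how Prefect implements tasks internally; `ToyParameter`, `ToyTask`, `ToyFlow`, and `resolve` are hypothetical names. The flow only records calls at build time and resolves placeholders (including ones nested in lists of tuples, like `query_params`) when it executes.

```python
from typing import Any, Dict, List, Tuple


class ToyParameter:
    """Placeholder whose real value is only known at run time."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"


class ToyTask:
    """Toy two-phase task: __init__ runs at build time, run() runs later."""

    def __init__(self, **defaults: Any) -> None:
        # Stored exactly as given; nothing is resolved here.
        self.defaults = defaults

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        # Call-time arguments override constructor defaults.
        return {**self.defaults, **kwargs}


def resolve(value: Any, params: Dict[str, Any]) -> Any:
    """Replace ToyParameter placeholders (also inside lists/tuples) with run-time values."""
    if isinstance(value, ToyParameter):
        return params[value.name]
    if isinstance(value, list):
        return [resolve(v, params) for v in value]
    if isinstance(value, tuple):
        return tuple(resolve(v, params) for v in value)
    return value


class ToyFlow:
    """Records task calls at build time and executes them later."""

    def __init__(self) -> None:
        self.calls: List[Tuple[ToyTask, Dict[str, Any]]] = []

    def call(self, task: ToyTask, **kwargs: Any) -> None:
        # Build time: only remember the call and its (unresolved) arguments.
        self.calls.append((task, kwargs))

    def execute(self, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Run time: resolve call arguments, then run each task.
        results = []
        for task, kwargs in self.calls:
            resolved = {k: resolve(v, params) for k, v in kwargs.items()}
            results.append(task.run(**resolved))
        return results


min_date = ToyParameter("min_date")
flow = ToyFlow()
# Like BigQueryTask(query_params=...): the placeholder is frozen into __init__.
flow.call(ToyTask(query_params=[("min_date", "DATE", min_date)]))
# Like BigQueryTask()(query_params=...): the placeholder goes through the deferred call.
flow.call(ToyTask(), query_params=[("min_date", "DATE", min_date)])
wrong, right = flow.execute({"min_date": "2021-01-01"})
```

In `wrong`, the third element of the query-parameter tuple is still the placeholder object (the `<Parameter: min_date>` from the original error), while `right` carries the actual date string.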
    Jeff Kehler

    4 months ago
    Thanks @Kevin Kho, I tried your suggestion and it worked. As for the default date, I think you are suggesting making a separate task to resolve it? The only purpose of the Parameter was to add the ability to run the job manually if some other flows had failed.
    Kevin Kho

    Kevin Kho

    4 months ago
    Yes, but Parameter values pass through an API call, so they arrive as JSON, which can't represent Python callables. You need to handle that logic inside a task based on some simpler input.
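A minimal stdlib sketch of that constraint: any default that `json.dumps` can't handle (such as a function) couldn't survive a trip through a JSON API, while a simple sentinel string like "now" round-trips unchanged. `non_json_safe_defaults` is a hypothetical helper for illustration, not part of Prefect.

```python
import json
from typing import Any, Dict, List


def non_json_safe_defaults(defaults: Dict[str, Any]) -> List[str]:
    """Return the names of defaults that cannot be serialized to JSON."""
    bad = []
    for name, value in defaults.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            # TypeError: unsupported type (e.g. a function); ValueError: circular reference
            bad.append(name)
    return bad
```

For example, `{"min_date": "now"}` passes the check and survives `json.loads(json.dumps(...))` unchanged, whereas a default holding a function is reported by name.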
    Jeff Kehler

    4 months ago
    Would something like this work for `min_date` then?
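    import pendulum
    from prefect import task
    
    @task
    def resolve_min_date(min_date):
        # Fall back to yesterday's date (UTC) when no date is supplied
        if min_date is None:
            return pendulum.now(tz="UTC").subtract(hours=24).to_date_string()
        return min_date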
    Kevin Kho

    4 months ago
    Yes, that would work.
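For reference, here is a stdlib-only translation of the `resolve_min_date` body (stdlib `datetime` swapped in for pendulum, and the `@task` decorator dropped so it can be called directly). The optional `now` argument is an addition for deterministic testing, not something the original task has.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def resolve_min_date(min_date: Optional[str], now: Optional[datetime] = None) -> str:
    """Return min_date unchanged, or the UTC date 24 hours ago as 'YYYY-MM-DD' when it is None."""
    if min_date is not None:
        return min_date
    # `now` is injectable for tests; otherwise use the current UTC time.
    current = now if now is not None else datetime.now(timezone.utc)
    return (current - timedelta(hours=24)).date().isoformat()
```

With `now=datetime(2021, 3, 1, 0, 30, tzinfo=timezone.utc)`, a `None` input resolves to "2021-02-28", while an explicit date string such as "2021-06-01" is passed through as-is.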
    Jeff Kehler

    4 months ago
    Thanks for your help, @Kevin Kho, much appreciated!
    Kevin Kho

    4 months ago
    I'm not sure about `prefect.context.get("BQ_PROJECT_ID")`. Like everything else in the flow block, it is evaluated when the flow is built, and if you're using Prefect Cloud, I think it's hard to inject context. Honestly, I would suggest turning those into Parameters too.
    And of course!
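A toy illustration of why build-time lookups are fragile: if the key isn't in the context when the flow block runs, `.get()` returns None, and that None is what gets baked into the task's arguments even if the context is populated later. A value supplied as a parameter is looked up at run time instead. `context`, `build_flow`, and `run_flow` are hypothetical stand-ins, not Prefect APIs.

```python
from typing import Any, Dict, Optional

# Hypothetical stand-in for a context that is only populated once a run starts.
context: Dict[str, Any] = {}


def build_flow() -> Dict[str, Optional[str]]:
    # Executed at build time, like code in a `with Flow(...)` block:
    # .get() is evaluated now and its result is captured.
    return {"project": context.get("BQ_PROJECT_ID")}


def run_flow(task_kwargs: Dict[str, Optional[str]], params: Dict[str, str]) -> Dict[str, Optional[str]]:
    # Run time: the captured context value is stale, the parameter is fresh.
    return {"from_context": task_kwargs["project"], "from_parameter": params["project"]}


built = build_flow()
context["BQ_PROJECT_ID"] = "my-gcp-project"  # populated only after the flow was built
result = run_flow(built, {"project": "my-gcp-project"})
```

Here `result["from_context"]` is `None` while `result["from_parameter"]` is "my-gcp-project".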