
Alexandru Anghel

05/18/2022, 4:34 PM
Hello, I am new to Prefect and I've started evaluating the Orion version for including it in a project I'm working on. What I am trying to do is create a basic BigQuery table and insert some dummy values into it (the tutorial from here: https://prefecthq.github.io/prefect-gcp/bigquery/). I can create the deployment, but when trying to run it I get this error:
RuntimeError: Tasks cannot be called from within tasks. Did you mean to call this task in a flow?
I'm not sure what I'm missing here, as I run the tasks inside the flow. Could you please assist me? Thanks! Awesome project, by the way!

Zanie

05/18/2022, 5:03 PM
The functions you are importing from prefect_gcp are tasks.
You can remove the @task decorators from your own tasks so they're just plain functions that prepare and call the underlying prefect_gcp tasks, or you can call the underlying function of the gcp tasks to avoid invoking a task from within a task.
e.g. for the second solution:
bigquery_insert_stream.fn(dataset=…
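Or, for the first solution, something along these lines (a rough, untested sketch; the dataset/table names and the insert_records helper are just placeholders, not part of prefect-gcp):
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_insert_stream

gcp_credentials = GcpCredentials()

# plain function, no @task decorator, so the only task invoked here is the
# prefect-gcp one, and it runs in the context of the calling flow
def insert_records(records):
    return bigquery_insert_stream(
        dataset="my_dataset",
        table="my_table",
        records=records,
        gcp_credentials=gcp_credentials,
    )

@flow
def example_flow():
    insert_records([{"number": 1, "text": "abc", "bool": True}])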

Kevin Kho

05/18/2022, 5:26 PM
Hey @Alexandru Anghel, could you move the code and error into the thread when you get a chance to keep the main channel cleaner?

Alexandru Anghel

05/18/2022, 6:38 PM
Moving code to thread:
from prefect import flow, task
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table
from prefect_gcp.bigquery import bigquery_insert_stream
from google.cloud.bigquery import SchemaField


gcp_credentials = GcpCredentials()

@task(name="create table")
def example_bigquery_create_table_flow(dataset, table_name):
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
        SchemaField("bool", field_type="BOOLEAN")
    ]
    result = bigquery_create_table(
        dataset=dataset,
        table=table_name,
        schema=schema,
        #time_partitioning="HOUR",
        gcp_credentials=gcp_credentials
    )

    return result


@task(name="insert")
def example_bigquery_insert_stream_flow():
    # gcp_credentials = GcpCredentials()
    records = [
        {"number": 1, "text": "abc", "bool": True},
        {"number": 2, "text": "def", "bool": False},
    ]
    result = bigquery_insert_stream(
        dataset="kafka_landing",
        table="prefect_table",
        records=records, 
        gcp_credentials=gcp_credentials
    )

    return result

@flow(name="bigquery_data_pipeline")
def run_flow(dataset: str, table_name: str):
    a = example_bigquery_create_table_flow(dataset, table_name)
    b = example_bigquery_insert_stream_flow(wait_for=[a])

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner


DeploymentSpec(
    name="bq",
    flow_location="./bq.py",
    flow_name="bigquery_data_pipeline",
    #tags=['tutorial','test'],
    parameters={'dataset': 'kafka_landing', 'table_name': 'prefect_table'},
    flow_runner=KubernetesFlowRunner(service_account_name='kube-runner-sa', namespace='prefect', image="eu.gcr.io/gcp-project/project/prefect:v0.0.1", image_pull_policy='Always') #,  image_pull_secrets=["gcr"]
)
Thank you, @Zanie! Makes sense now, I will give it a try tomorrow.

Kevin Kho

05/18/2022, 6:43 PM
Thanks for moving!

Alexandru Anghel

05/23/2022, 12:23 PM
Hello @Zanie, I have removed the @task decorators, but now I am facing another error:
TypeError: insert_into_table() got an unexpected keyword argument 'wait_for'
I am using the documentation from here. Any reason for this error? Thanks!

Kevin Kho

05/23/2022, 3:27 PM
Subflows block execution, so you might not need wait_for here. You can also do:
result = bigquery_create_table(
    dataset=dataset,
    table=table_name,
    schema=schema,
    #time_partitioning="HOUR",
    gcp_credentials=gcp_credentials
)
result.wait()
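For what it's worth, a rough, untested sketch of how the whole flow might look after dropping the custom @task decorators and using .wait() as suggested above (same imports, schema, and parameters as in the snippet earlier in the thread; 2.0b-era API):
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table, bigquery_insert_stream
from google.cloud.bigquery import SchemaField

gcp_credentials = GcpCredentials()

@flow(name="bigquery_data_pipeline")
def run_flow(dataset: str, table_name: str):
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
        SchemaField("bool", field_type="BOOLEAN"),
    ]
    # calling the prefect-gcp tasks directly from the flow returns futures
    created = bigquery_create_table(
        dataset=dataset,
        table=table_name,
        schema=schema,
        gcp_credentials=gcp_credentials,
    )
    created.wait()  # block until the table exists before streaming rows into it
    records = [
        {"number": 1, "text": "abc", "bool": True},
        {"number": 2, "text": "def", "bool": False},
    ]
    bigquery_insert_stream(
        dataset=dataset,
        table=table_name,
        records=records,
        gcp_credentials=gcp_credentials,
    )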