# prefect-community
a
Hello, I am new to Prefect and I've started evaluating the Orion version for including it in a project I'm working on. What I am trying to do is create a basic BigQuery table and insert some dummy values into it (the tutorial from here: https://prefecthq.github.io/prefect-gcp/bigquery/). I can create the deployment, but when trying to run it I get this error:
```
RuntimeError: Tasks cannot be called from within tasks. Did you mean to call this task in a flow?
```
I'm not sure what I'm missing here, as I run the tasks inside the flow. Could you please assist me? Thanks! Awesome project, by the way!
z
The functions you are importing from `prefect_gcp` are tasks.
You can remove the `@task` decorators from your own tasks so they're just functions that prepare the underlying task to be called, or you can call the underlying function for the GCP tasks to avoid invoking a task from within a task.
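To make the first option concrete, here is a minimal sketch (the `insert_records` and `pipeline` names and the dataset/table values are hypothetical placeholders): the wrapper loses its `@task` decorator, so the collection's task is the only task being called, and it runs in the flow's context.
```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_insert_stream

# Plain function: no @task decorator, so calling the
# bigquery_insert_stream task here happens in the flow's
# run context rather than inside another task run.
def insert_records(dataset, table):
    records = [{"number": 1, "text": "abc", "bool": True}]
    return bigquery_insert_stream(
        dataset=dataset,
        table=table,
        records=records,
        gcp_credentials=GcpCredentials(),
    )

@flow
def pipeline():
    insert_records("kafka_landing", "prefect_table")
```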
e.g. for the second solution:
`bigquery_insert_stream.fn(dataset=…`
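Fleshed out, that call might look something like the sketch below (hypothetical wrapper name; it also assumes the `prefect_gcp` tasks are async functions, so the raw function behind `.fn` has to be awaited):
```python
from prefect import task
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_insert_stream

@task(name="insert")
async def insert_records(dataset, table):
    records = [{"number": 1, "text": "abc", "bool": True}]
    # .fn is the raw function behind the task, so no nested task
    # run is created; it is awaited here on the assumption that
    # the collection's tasks are async functions.
    return await bigquery_insert_stream.fn(
        dataset=dataset,
        table=table,
        records=records,
        gcp_credentials=GcpCredentials(),
    )
```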
k
Hey @Alexandru Anghel, could you move the code and error into the thread when you get a chance, to keep the main channel cleaner?
a
Moving code to thread:
```python
from prefect import flow, task
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table
from prefect_gcp.bigquery import bigquery_insert_stream
from google.cloud.bigquery import SchemaField


gcp_credentials = GcpCredentials()

@task(name="create table")
def example_bigquery_create_table_flow(dataset, table_name):
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
        SchemaField("bool", field_type="BOOLEAN")
    ]
    result = bigquery_create_table(
        dataset=dataset,
        table=table_name,
        schema=schema,
        #time_partitioning="HOUR",
        gcp_credentials=gcp_credentials
    )

    return result


@task(name="insert")
def example_bigquery_insert_stream_flow():
    # gcp_credentials = GcpCredentials()
    records = [
        {"number": 1, "text": "abc", "bool": True},
        {"number": 2, "text": "def", "bool": False},
    ]
    result = bigquery_insert_stream(
        dataset="kafka_landing",
        table="prefect_table",
        records=records, 
        gcp_credentials=gcp_credentials
    )

    return result

@flow(name="bigquery_data_pipeline")
def run_flow(dataset: str, table_name: str):
    a = example_bigquery_create_table_flow(dataset, table_name)
    b = example_bigquery_insert_stream_flow(wait_for=[a])

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner


DeploymentSpec(
    name="bq",
    flow_location="./bq.py",
    flow_name = "bigquery_data_pipeline",
    #tags=['tutorial','test'],
    parameters={'dataset':'kafka_landing', 'table_name': 'prefect_table'},
    flow_runner=KubernetesFlowRunner(service_account_name='kube-runner-sa', namespace='prefect', image="eu.gcr.io/gcp-project/project/prefect:v0.0.1", image_pull_policy='Always') #,  image_pull_secrets=["gcr"]
)
```
Thank you, @Zanie! Makes sense now, I will give it a try tomorrow.
k
Thanks for moving!
a
Hello @Zanie, I have removed the `@task` decorators, but now I am facing another error:
`TypeError: insert_into_table() got an unexpected keyword argument 'wait_for'`
I am using the documentation from here. Any reason for this error? Thanks!
k
Subflows block execution, so you might not need `wait_for` here.
You can also do:
```python
result = bigquery_create_table(
    dataset=dataset,
    table=table_name,
    schema=schema,
    #time_partitioning="HOUR",
    gcp_credentials=gcp_credentials
)
result.wait()
```
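Putting the thread together, a minimal sketch of the corrected pipeline under these suggestions (decorators removed, `wait_for` dropped, and `result.wait()` used to block until the table exists before inserting; names follow the snippets above, and this is illustrative rather than tested):
```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table, bigquery_insert_stream
from google.cloud.bigquery import SchemaField

gcp_credentials = GcpCredentials()

# Plain functions (no @task), so the prefect_gcp tasks below are
# called in the flow's run context.
def create_table(dataset, table_name):
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
        SchemaField("bool", field_type="BOOLEAN"),
    ]
    result = bigquery_create_table(
        dataset=dataset,
        table=table_name,
        schema=schema,
        gcp_credentials=gcp_credentials,
    )
    result.wait()  # block until the table-creation task run finishes

def insert_records(dataset, table_name):
    records = [
        {"number": 1, "text": "abc", "bool": True},
        {"number": 2, "text": "def", "bool": False},
    ]
    return bigquery_insert_stream(
        dataset=dataset,
        table=table_name,
        records=records,
        gcp_credentials=gcp_credentials,
    )

@flow(name="bigquery_data_pipeline")
def run_flow(dataset: str, table_name: str):
    create_table(dataset, table_name)   # blocks via result.wait()
    insert_records(dataset, table_name)
```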