Alexandru Anghel
05/18/2022, 4:34 PM
RuntimeError: Tasks cannot be called from within tasks. Did you mean to call this task in a flow?
I'm not sure what I'm missing here, as I run the tasks inside the flow. Could you please assist me?
Thanks! Awesome project, by the way!
Zanie
The functions from prefect_gcp are tasks.
Zanie
You can remove the @task decorators from your own tasks so they're just functions that are preparing the underlying task to be called, or you can call the underlying function for the gcp tasks to avoid invoking a task from within a task.
Zanie
bigquery_insert_stream.fn(dataset=…
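The two options above can be sketched with a toy stand-in for the task decorator. This is not Prefect's implementation: ToyTask, toy_task, insert_stream and the three caller functions are hypothetical names made up for illustration, and the only behaviour mimicked is the "no task inside a task" rule plus a .fn attribute holding the undecorated function.

```python
class ToyTask:
    """Toy stand-in for a task object (an assumption, not Prefect's code)."""

    _running = False  # True while any toy task body is executing

    def __init__(self, fn):
        self.fn = fn  # the undecorated function, reachable as task.fn

    def __call__(self, *args, **kwargs):
        cls = type(self)
        if cls._running:
            raise RuntimeError("Tasks cannot be called from within tasks.")
        cls._running = True
        try:
            return self.fn(*args, **kwargs)
        finally:
            cls._running = False


def toy_task(fn):
    # Hypothetical decorator that wraps a function in ToyTask.
    return ToyTask(fn)


@toy_task
def insert_stream(records):
    # Stands in for a library task such as the ones shipped by prefect_gcp.
    return len(records)


@toy_task
def broken_insert():
    # A task calling another task: this raises RuntimeError.
    return insert_stream([{"number": 1}])


def insert_plain():
    # Option 1: no decorator, so this is an ordinary helper function
    # that only prepares the call to the library task.
    return insert_stream([{"number": 1}])


@toy_task
def insert_via_fn():
    # Option 2: stay a task, but call the underlying function through .fn.
    return insert_stream.fn([{"number": 1}])
```

In this sketch insert_plain() and insert_via_fn() both succeed, while broken_insert() raises the RuntimeError. With option 2 the inner call is just a function call, so it does not get its own task run.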
Kevin Kho
Alexandru Anghel
05/18/2022, 6:38 PM
from prefect import flow, task
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table
from prefect_gcp.bigquery import bigquery_insert_stream
from google.cloud.bigquery import SchemaField

gcp_credentials = GcpCredentials()

@task(name="create table")
def example_bigquery_create_table_flow(dataset, table_name):
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
        SchemaField("bool", field_type="BOOLEAN")
    ]
    result = bigquery_create_table(
        dataset=dataset,
        table=table_name,
        schema=schema,
        #time_partitioning="HOUR",
        gcp_credentials=gcp_credentials
    )
    return result

@task(name="insert")
def example_bigquery_insert_stream_flow():
    # gcp_credentials = GcpCredentials()
    records = [
        {"number": 1, "text": "abc", "bool": True},
        {"number": 2, "text": "def", "bool": False},
    ]
    result = bigquery_insert_stream(
        dataset="kafka_landing",
        table="prefect_table",
        records=records,
        gcp_credentials=gcp_credentials
    )
    return result

@flow(name="bigquery_data_pipeline")
def run_flow(dataset: str, table_name: str):
    a = example_bigquery_create_table_flow(dataset, table_name)
    b = example_bigquery_insert_stream_flow(wait_for=[a])

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner

DeploymentSpec(
    name="bq",
    flow_location="./bq.py",
    flow_name = "bigquery_data_pipeline",
    #tags=['tutorial','test'],
    parameters={'dataset':'kafka_landing', 'table_name': 'prefect_table'},
    flow_runner=KubernetesFlowRunner(service_account_name = 'kube-runner-sa', namespace='prefect', image="eu.gcr.io/gcp-project/project/prefect:v0.0.1", image_pull_policy='Always') #, image_pull_secrets=["gcr"]
)
Alexandru Anghel
05/18/2022, 6:40 PM
Kevin Kho
Alexandru Anghel
05/23/2022, 12:23 PM
TypeError: insert_into_table() got an unexpected keyword argument 'wait_for'
I am using the documentation from here.
Any reason for this error?
Thanks!
Kevin Kho
wait_for
here.
Kevin Kho
result = bigquery_create_table(
    dataset=dataset,
    table=table_name,
    schema=schema,
    #time_partitioning="HOUR",
    gcp_credentials=gcp_credentials
)
result.wait()
also
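For reference, the TypeError above is what Python raises when a plain function receives a keyword it does not declare. The sketch below assumes insert_into_table is an ordinary, undecorated function (the thread does not show its definition). It contrasts that with a toy wrapper that accepts wait_for itself and never forwards it. ToyTask and call_plain_with_wait_for are hypothetical names, and the wrapper is not Prefect's implementation.

```python
def insert_into_table():
    # Plain function with no task decorator: it declares no keyword arguments.
    return "inserted"


class ToyTask:
    """Toy wrapper (an assumption, not Prefect's code) that consumes wait_for."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, wait_for=None, **kwargs):
        # wait_for is taken by the wrapper and is not passed on to self.fn.
        return self.fn(*args, **kwargs)


def call_plain_with_wait_for():
    # Passing wait_for straight to the plain function fails with TypeError.
    try:
        insert_into_table(wait_for=[])
    except TypeError as exc:
        return str(exc)
    return None


# Wrapping the same function makes the wait_for keyword acceptable.
wrapped_insert = ToyTask(insert_into_table)
```

Here call_plain_with_wait_for() returns the "unexpected keyword argument" message, while wrapped_insert(wait_for=[]) returns normally. Once a function is no longer a task, ordering has to be enforced some other way, for example by waiting on the upstream result inside the function as in the result.wait() snippet above.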