Alexandru Anghel
05/18/2022, 4:34 PM
RuntimeError: Tasks cannot be called from within tasks. Did you mean to call this task in a flow?
I'm not sure what I'm missing here, as I run the tasks inside the flow. Could you please assist me?
Thanks! Awesome project, by the way!
Zanie
05/18/2022, 5:03 PM
The functions in prefect_gcp are tasks. Remove the @task decorators from your own tasks so they’re just functions that prepare the underlying task to be called, or call the underlying function for the gcp tasks to avoid invoking a task from within a task.
bigquery_insert_stream.fn(dataset=…
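To make the two options concrete, here is a minimal toy model in plain Python. It is not Prefect's implementation: `ToyTask`, `toy_task`, `insert_stream` and the helper names are hypothetical stand-ins, and the only behavior borrowed from the thread is that a task refuses to run inside another task while the undecorated function stays reachable as `.fn`.

```python
import contextvars

# Toy stand-in for a task runner. This is NOT Prefect's implementation.
_inside_task = contextvars.ContextVar("inside_task", default=False)


class ToyTask:
    """Wraps a function and refuses to run when called from another ToyTask."""

    def __init__(self, fn):
        self.fn = fn  # the undecorated function, reachable as `.fn`

    def __call__(self, *args, **kwargs):
        if _inside_task.get():
            raise RuntimeError("Tasks cannot be called from within tasks.")
        token = _inside_task.set(True)
        try:
            return self.fn(*args, **kwargs)
        finally:
            _inside_task.reset(token)


def toy_task(fn):
    return ToyTask(fn)


@toy_task
def insert_stream(records):
    # Hypothetical stand-in for a task provided by a library.
    return len(records)


@toy_task
def nested_call(records):
    # A task calling a task: this is the pattern that raises.
    return insert_stream(records)


def plain_helper(records):
    # Option 1: no decorator, so this is just a function that calls the task.
    return insert_stream(records)


@toy_task
def call_underlying_fn(records):
    # Option 2: stay a task, but call the plain function behind the wrapper.
    return insert_stream.fn(records)
```

In this sketch `nested_call` raises the RuntimeError, while `plain_helper` and `call_underlying_fn` both return normally. The trade-off with the second option is that the inner call bypasses the wrapper, so whatever the wrapper would have done for that call is skipped.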
Kevin Kho
05/18/2022, 5:26 PM
Alexandru Anghel
05/18/2022, 6:38 PM
from prefect import flow, task
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table
from prefect_gcp.bigquery import bigquery_insert_stream
from google.cloud.bigquery import SchemaField
gcp_credentials = GcpCredentials()
@task(name="create table")
def example_bigquery_create_table_flow(dataset, table_name):
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
        SchemaField("bool", field_type="BOOLEAN")
    ]
    result = bigquery_create_table(
        dataset=dataset,
        table=table_name,
        schema=schema,
        #time_partitioning="HOUR",
        gcp_credentials=gcp_credentials
    )
    return result

@task(name="insert")
def example_bigquery_insert_stream_flow():
    # gcp_credentials = GcpCredentials()
    records = [
        {"number": 1, "text": "abc", "bool": True},
        {"number": 2, "text": "def", "bool": False},
    ]
    result = bigquery_insert_stream(
        dataset="kafka_landing",
        table="prefect_table",
        records=records,
        gcp_credentials=gcp_credentials
    )
    return result

@flow(name="bigquery_data_pipeline")
def run_flow(dataset: str, table_name: str):
    a = example_bigquery_create_table_flow(dataset, table_name)
    b = example_bigquery_insert_stream_flow(wait_for=[a])
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner

DeploymentSpec(
    name="bq",
    flow_location="./bq.py",
    flow_name="bigquery_data_pipeline",
    #tags=['tutorial','test'],
    parameters={'dataset': 'kafka_landing', 'table_name': 'prefect_table'},
    flow_runner=KubernetesFlowRunner(
        service_account_name='kube-runner-sa',
        namespace='prefect',
        image="eu.gcr.io/gcp-project/project/prefect:v0.0.1",
        image_pull_policy='Always'
    ) #, image_pull_secrets=["gcr"]
)
Kevin Kho
05/18/2022, 6:43 PM
Alexandru Anghel
05/23/2022, 12:23 PM
TypeError: insert_into_table() got an unexpected keyword argument 'wait_for'
I am using the documentation from here.
Any reason for this error?
Thanks!
Kevin Kho
05/23/2022, 3:27 PM
wait_for here.
result = bigquery_create_table(
    dataset=dataset,
    table=table_name,
    schema=schema,
    #time_partitioning="HOUR",
    gcp_credentials=gcp_credentials
)
result.wait()
also
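One possible reading of the TypeError above, offered as an assumption rather than something the thread confirms: if insert_into_table had become a plain function (for example after its @task decorator was removed), Python itself rejects the extra keyword, because `wait_for` is only understood by a task wrapper. The sketch below reproduces that with plain Python. `ToyTask` is a hypothetical wrapper, not Prefect's API, and the body of `insert_into_table` is invented for illustration.

```python
def insert_into_table():
    # Plain function: its signature has no `wait_for` parameter.
    return "inserted"


class ToyTask:
    """Hypothetical wrapper that consumes `wait_for` before calling the function."""

    def __init__(self, fn):
        self.fn = fn
        self.waited_on = []

    def __call__(self, *args, wait_for=None, **kwargs):
        # A real orchestrator would block on upstream work here; this toy
        # version only records what it was asked to wait for.
        self.waited_on = list(wait_for or [])
        return self.fn(*args, **kwargs)


def call_plain_with_wait_for():
    """Return the error message produced by passing wait_for to a plain function."""
    try:
        insert_into_table(wait_for=["upstream"])
    except TypeError as exc:
        return str(exc)
    return None
```

Calling `call_plain_with_wait_for()` returns a message about an unexpected keyword argument 'wait_for', while `ToyTask(insert_into_table)(wait_for=["upstream"])` runs normally because the wrapper strips the keyword before the function sees it. Under the same assumption, a plain function has to order its steps explicitly, which is what waiting on the result in the snippet above does.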