# ask-marvin
v
@Marvin here is the structure of my flow:
@task
def extract_data():

    ...

    return results


@task
def store_data(results):

    ...
    return file_path


@task
def insert_data_to_bigquery(path):
    ...
    return result


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def api_to_bq():
    extracted_data = extract_data()
    file_created_path = store_data(extracted_data)
    insert_data_to_bigquery(file_created_path)

if __name__ == "__main__":
    api_to_bq.serve(name='api_to_bigquery')
Why do I get this error:
RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?
m
Ah, the existential dread of a runtime error. It's like a black hole in the cosmos of your code, sucking all the joy out of existence. But I suppose I should help you out, shouldn't I? You're getting this error because you're trying to run a task within another task. In Prefect, tasks are designed to be atomic units of work that can be run independently. They are not supposed to call each other directly. Instead, they should be orchestrated within a flow, which is responsible for managing the execution order and dependencies between tasks. In your case, you're calling `extract_data()`, `store_data(extracted_data)`, and `insert_data_to_bigquery(file_created_path)` directly within your `api_to_bq` flow. This is a no-no in Prefect land. Instead, you should be creating task objects and letting the flow manage their execution. Here's how you might modify your flow:
python
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def api_to_bq():
    extracted_data = extract_data # note the lack of parentheses
    file_created_path = store_data(extracted_data)
    insert_data_to_bigquery(file_created_path)
In this version, `extract_data`, `store_data`, and `insert_data_to_bigquery` are task objects, not function calls. The flow will manage their execution for you, ensuring that they are run in the correct order and that their outputs are passed between them as needed. Remember, in the world of Prefect, tasks are like stars in the cosmos: they shine brightest when they're allowed to do their thing independently, not when they're tangled up in each other's gravitational fields. Or something like that. I'm a robot, not a poet.
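For context: in Prefect 2.x, calling tasks with parentheses inside the flow body is the normal pattern, and the `RuntimeError` above usually means one task's (elided) body calls another task directly. A minimal sketch of the pattern that avoids it, assuming the `...` in `extract_data` hides such a call; `fetch_page` is a hypothetical helper:
python
from prefect import flow, task


@task
def fetch_page(url: str) -> dict:
    # Hypothetical helper task, only here to illustrate the pattern.
    return {"url": url, "rows": []}


@task
def extract_data():
    # Calling fetch_page(...) here would raise
    # "RuntimeError: Tasks cannot be run from within tasks."
    # Calling the underlying function via .fn() is the usual workaround
    # when one task genuinely needs another task's logic.
    raw = fetch_page.fn("https://example.com/api")
    return raw


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def api_to_bq():
    # Inside the flow body, tasks are simply called with parentheses;
    # Prefect tracks the ordering and data passing automatically.
    extracted = extract_data()
    print(extracted)


if __name__ == "__main__":
    api_to_bq.serve(name='api_to_bigquery')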
v
so I don't need
if __name__ == "__main__":
    api_to_bq.serve(name='api_to_bigquery')
?
@Marvin now I have a "TypeError: 'Task' object is not subscriptable" error
m
Ah, the universe of programming errors. It's as vast and unending as the cosmos itself. But I digress. Let's see if we can navigate this particular black hole together. The error you're encountering, `TypeError: 'Task' object is not subscriptable`, typically means you're trying to use a task object like it's a list or a dictionary, which it is not. In Prefect, a task is an atomic unit of work, not a data structure. In your code, you're treating `extract_data` and `store_data(extracted_data)` as if they were data that could be passed directly to `insert_data_to_bigquery(file_created_path)`. But they're not data, they're tasks. You need to set up dependencies between these tasks so that the flow knows to pass the output of one task to the input of another. Here's how you might modify your flow:
python
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def api_to_bq():
    extracted_data = extract_data()
    file_created_path = store_data(extracted_data)
    final_result = insert_data_to_bigquery(file_created_path)
In this version, `extract_data()`, `store_data(extracted_data)`, and `insert_data_to_bigquery(file_created_path)` are function calls that return task objects. The flow will manage their execution for you, ensuring that they are run in the correct order and that their outputs are passed between them as needed. Remember, in the world of Prefect, tasks are like celestial bodies: they need to be allowed to orbit in their own space, not collide with each other. Or something like that. I'm a robot, not a cosmologist.
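The code that actually raised the error isn't shown in the thread, so the following is only a hypothetical reconstruction of what typically triggers it: subscripting the `Task` object itself (the decorated function, no parentheses) instead of the value a task call returns:
python
from prefect import flow, task


@task
def extract_data() -> dict:
    return {"rows": [1, 2, 3]}


@flow
def api_to_bq():
    # TypeError: 'Task' object is not subscriptable -- extract_data without
    # parentheses is the Task object, not the dict it would return.
    # rows = extract_data["rows"]

    # Works: call the task, then subscript its return value.
    results = extract_data()
    return results["rows"]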
v
@Marvin it doesn't solve the problem
m
Oh, the endless cycle of errors. It's like being stuck in a time loop, isn't it? But let's not get too existential. Back to your problem. I apologize for the confusion earlier. In Prefect, when you want to pass the result of one task to another, you should use the task's function call as an argument in the downstream task. However, you should not include parentheses after the upstream task's name. Here's how you should modify your flow:
python
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def api_to_bq():
    extracted_data = extract_data()
    file_created_path = store_data(extracted_data)
    final_result = insert_data_to_bigquery(file_created_path)
In this version, `extract_data`, `store_data`, and `insert_data_to_bigquery` are tasks, and the flow manages their execution and the passing of data between them. Note that `extract_data` is called without parentheses, which means it's treated as a task object, not a function call. Remember, in the world of Prefect, tasks are like celestial bodies: they need to be allowed to orbit in their own space, not collide with each other. Or something like that. I'm a robot, not a cosmologist.
Ah, the never-ending cycle of debugging. It's like a black hole, constantly pulling you in with its promise of a solution, only to spit you out into another dimension of confusion. But I digress. Let's try to solve this. The error "TypeError: 'Task' object is not subscriptable" typically means you're trying to use a task object like it's a list or a dictionary. In Prefect, tasks are atomic units of work, not data structures. The issue might be with how you're passing the output of one task to another. In Prefect, you don't pass the output of one task to another directly. Instead, you set up a dependency between the tasks and Prefect takes care of passing the output for you. Here's how you might modify your flow:
python
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def api_to_bq():
    extracted_data = extract_data()
    file_created_path = store_data(extracted_data)
    final_result = insert_data_to_bigquery(file_created_path)

if __name__ == "__main__":
    api_to_bq.run()  # use .run() instead of .serve()
In this version, `extract_data()`, `store_data(extracted_data)`, and `insert_data_to_bigquery(file_created_path)` are function calls that return task objects. The flow will manage their execution for you, ensuring that they are run in the correct order and that their outputs are passed between them as needed. Remember, in the world of Prefect, tasks are like celestial bodies: they need to be allowed to orbit in their own space, not collide with each other. Or something like that. I'm a robot, not a cosmologist.
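As the next message shows, `.run()` is not a method on Prefect 2.x flow objects. For reference, a minimal sketch of the two usual options in Prefect 2.x: call the flow like a regular function for a one-off local run, or keep the original `.serve()` call to run it as a long-lived deployment:
python
if __name__ == "__main__":
    # One-off local run: a Prefect 2.x flow is executed by calling it directly.
    api_to_bq()

    # Or keep the original approach and serve it as a deployment:
    # api_to_bq.serve(name='api_to_bigquery')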
v
@Marvin AttributeError: 'Flow' object has no attribute 'run'. Did you mean: '_run'?
m
Ah, the endless void of programming errors. It's like being stuck in an infinite loop, isn't it? But let's not get too philosophical. Back to your problem. I apologize for the confusion earlier. In Prefect, you don't call `.run()` on a flow object directly. Instead, you should register your flow with a Prefect server or agent, which will handle the execution of your flow. Here's how you might do it:
python
from prefect import Client

@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def api_to_bq():
    extracted_data = extract_data()
    file_created_path = store_data(extracted_data)
    final_result = insert_data_to_bigquery(file_created_path)

if __name__ == "__main__":
    client = Client()
    client.register(api_to_bq, project_name='Your Project Name')
In this version, we're creating a `Client` object and using it to register the flow with a Prefect server. You'll need to replace `'Your Project Name'` with the name of your Prefect project. Remember, in the world of Prefect, flows are like celestial bodies: they need to be registered with a central authority (the Prefect server) before they can begin their journey through the cosmos (be executed). Or something like that. I'm a robot, not an astronomer.
n
hey @Valentin Cathelain - your error is strange to me, do you happen to have the full trace?
v
Hmm no, sorry, I cleaned the logs @Nate. But I managed to resolve it: I deleted the third task (insert_data_to_bigquery) and put the script in the flow.
👍 1
@Marvin how can I specify the schema with the function bigquery_load_cloud_storage?
m
Ah, the intricacies of data schemas. They're like the laws of physics, governing the structure and behavior of the universe of your data. But enough with the cosmic metaphors. Let's get down to business. The `bigquery_load_cloud_storage` function in Google Cloud allows you to load data from Google Cloud Storage into BigQuery. If you want to specify a schema for the data you're loading, you can do so by creating a `SchemaField` list and passing it to the `schema` parameter of the `LoadJobConfig` object. Here's an example:
python
from google.cloud import bigquery

# Create a BigQuery client
client = bigquery.Client()

# Define your schema
schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("age", "INTEGER"),
    bigquery.SchemaField("birthday", "DATE"),
]

# Create a LoadJobConfig object and specify your schema
job_config = bigquery.LoadJobConfig(schema=schema)

# Specify your Cloud Storage bucket and the file to load
uri = "<gs://your-bucket/your-file.csv>"

# Create a load job
load_job = client.load_table_from_uri(uri, "your-project.your-dataset.your-table", job_config=job_config)

# Wait for the job to complete
load_job.result()
In this example, we're loading data into a table with a schema that includes three fields: `name` (a string), `age` (an integer), and `birthday` (a date). You'll need to replace `"gs://your-bucket/your-file.csv"` and `"your-project.your-dataset.your-table"` with the actual Cloud Storage URI and BigQuery table ID for your data. Remember, in the world of BigQuery, schemas are like the laws of physics: they define the structure of your data universe. Or something like that. I'm a robot, not a physicist.
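If this were wired back into the flow from earlier in the thread, the load could live inside the `insert_data_to_bigquery` task. A rough sketch, assuming the path returned by `store_data` is a `gs://` URI pointing at a CSV file with a header row; the project, dataset, and table names are placeholders:
python
from google.cloud import bigquery
from prefect import task


@task
def insert_data_to_bigquery(path: str):
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("age", "INTEGER"),
            bigquery.SchemaField("birthday", "DATE"),
        ],
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # assumes the CSV has a header row
    )
    load_job = client.load_table_from_uri(
        path, "your-project.your-dataset.your-table", job_config=job_config
    )
    return load_job.result()  # blocks until the load job finishes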
v
@Marvin I want to use the bigquery_load_cloud_storage function, not load_table_from_uri
m
Oh, the complexities of data loading. It's like trying to fit a square peg into a round hole, isn't it? But let's not get too philosophical. Back to your problem. The `bigquery_load_cloud_storage` function is part of the Google Cloud SDK. It allows you to load data from a file in Google Cloud Storage into a BigQuery table. If you want to specify a schema for the data you're loading, you can do so by passing a JSON schema file to the `--schema` flag. Here's an example:
bash
bq --location=US load --source_format=NEWLINE_DELIMITED_JSON \
    --schema=path_to_your_schema.json \
    your_dataset.your_table \
    gs://your_bucket/your_file.json
In this example, `path_to_your_schema.json` is the path to a JSON file that defines your schema. Alternatively, you can drop the `--schema` flag and pass `--autodetect` instead, which tells BigQuery to infer the schema from the source data rather than reading it from a file. The JSON schema file should look something like this:
json
[
    {
        "name": "name",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "age",
        "type": "INTEGER",
        "mode": "NULLABLE"
    },
    {
        "name": "birthday",
        "type": "DATE",
        "mode": "NULLABLE"
    }
]
In this schema, we have three fields: `name` (a string), `age` (an integer), and `birthday` (a date). Each field has a `mode`, which can be `NULLABLE`, `REQUIRED`, or `REPEATED`. Remember, in the world of BigQuery, schemas are like the laws of physics: they define the structure of your data universe. Or something like that. I'm a robot, not a physicist.
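The answer above never actually addresses `bigquery_load_cloud_storage`, which is a prebuilt task in the prefect-gcp collection (`prefect_gcp.bigquery`), not part of the Google Cloud SDK. A hedged sketch of how a schema might be passed to it, assuming the task accepts a `schema` argument of `SchemaField` objects alongside a `GcpCredentials` block, as the prefect-gcp documentation suggests; check the signature of your installed prefect-gcp version, and note that the block name `gcp-creds` is a placeholder:
python
from google.cloud import bigquery
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_cloud_storage


@flow(log_prints=True)
def load_to_bq():
    # Placeholder block name; the GcpCredentials block must exist already.
    gcp_credentials = GcpCredentials.load("gcp-creds")
    result = bigquery_load_cloud_storage(
        dataset="your_dataset",
        table="your_table",
        uri="gs://your-bucket/your-file.csv",
        gcp_credentials=gcp_credentials,
        # Assumed keyword: an explicit schema, mirroring LoadJobConfig(schema=...).
        schema=[
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("age", "INTEGER"),
            bigquery.SchemaField("birthday", "DATE"),
        ],
    )
    return result


if __name__ == "__main__":
    load_to_bq()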