# ask-community
j
Hi team, I have a flow running in the cloud and I got this error: `Task parameter introspection took 223.309 seconds, exceeding PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD of 10.0. Try wrapping large task parameters with prefect.utilities.annotations.quote for increased performance, e.g. my_task(quote(param)). To disable this message set PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD=0.`
08:50:26 AM · upload_to_gcs-0 · prefect.task_runs · INFO
Process for flow run 'incredible-vicugna' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is either caused by manual cancellation or high memory usage causing the operating system to terminate the process.
08:50:28 AM · prefect.flow_runs.runner · INFO
Reported flow run '28fc1c8a-b46f-41d6-98a9-6aa82a95150b' as crashed: Flow run process exited with non-zero status code -9.
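For reference, a minimal sketch of the quote() pattern the warning suggests (task and variable names here are hypothetical): wrapping the large parameter tells Prefect to treat it as opaque and skip the slow introspection.

```python
from prefect import flow, task
from prefect.utilities.annotations import quote


@task
def my_task(records):
    ...


@flow
def my_flow():
    records = [{"id": i} for i in range(1_000_000)]
    # quote() tells Prefect to treat the value as opaque, skipping the
    # parameter introspection that triggered the warning above
    my_task(quote(records))
```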
r
It's always a bad idea to pass a large variable to any function. If you have "big" data to use from one task to another, you should store it in some storage or database and just send the path / reference to your other tasks.
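A minimal sketch of that pattern, with hypothetical task names and a placeholder bucket path: only a small string reference crosses the task boundary.

```python
from prefect import flow, task


@task
def extract_to_gcs() -> str:
    # pull the data and upload it to GCS inside this task, then
    # return only a lightweight reference instead of the records
    return "gs://my-bucket/exports/conversions.json"


@task
def load_to_bigquery(gcs_uri: str) -> None:
    # read/load from the URI here; no large payload is passed between tasks
    ...


@flow
def etl_flow():
    uri = extract_to_gcs()
    load_to_bigquery(uri)
```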
j
Thanks for your help @Robin Niel. I am confused about it. My flow has three tasks: 1. Pull data from the API. 2. Load the data to Google Cloud Storage. 3. Load the data to BigQuery.
r
No problem. I'm guessing the task that saves the data to BigQuery must have the data sent as a parameter?
If that's the case, you should send the bucket / object path to the task and load it from inside the "BigQuery task". But in that case, I think BigQuery supports loading data from GCS directly without having to load it into memory. Would that work for you?
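For reference, a sketch of what loading straight from GCS into BigQuery could look like with a load job (project, dataset, table, and bucket names are placeholders); BigQuery reads the file itself, so the records never pass through the flow's memory.

```python
from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,  # or pass an explicit schema instead
)

# Point BigQuery at the newline-delimited JSON file already sitting in GCS
load_job = client.load_table_from_uri(
    "gs://my-bucket/exports/conversions.json",
    "my-project.my_dataset.conversions",
    job_config=job_config,
)
load_job.result()  # block until the load job finishes
```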
j
My flow crashed at the second step. It can pull data from the API, but it cannot load it to GCS.
r
Ok I see, then you should not use a second task; make the first task also save to GCS.
The issue has to do with serializing the data from one task to another.
The same goes for every distributed framework; it's always bad practice to move big volumes of data around.
j
```python
# Define the Python function to pull data from API
@task
def pull_data_from_api():
    all_records = []
    page = 1
    while True:
        params = {
            "page": page,
            "page_size": 5000
        }

        response = requests.post(url, headers=headers, json=request_body, params=params)
        if response.status_code == 200:
            data = response.json()
            records = data.get('conversions', [])
            if not records:
                break
            all_records.extend(records)
            page += 1
        else:
            print(f"Request failed with status code {response.status_code}")
            break
    return all_records

# Define the Python function to upload data to Google Cloud Storage
@task
def upload_to_gcs(all_records):
    # Define the GCS bucket connection
    credentials_json_string = (f'{gcs_service_account}').replace('\\n', '\n').replace("'", '"')
    credentials_json_object = json.loads(credentials_json_string)
    credentials = service_account.Credentials.from_service_account_info(credentials_json_object)
    storage_client = storage.Client(project=credentials_json_object['project_id'], credentials=credentials)
    if all_records:
        json_data = json.dumps(all_records)
        data_json = '\n'.join(json.dumps(item) for item in all_records)
        bucket = storage_client.get_bucket(f'{GCS_bucket_name}')
        blob = bucket.blob(file_path)
        blob.upload_from_string(data=data_json, content_type='application/json')
        print("Data uploaded to the blob.")
```
This is my code. Do you mean I should combine the two tasks into one task?
r
Yes, the problem is that you're sending "all_records" as a parameter here, so Prefect potentially has to send it over the network. I guess it's serializing it and running some checks on it, so it's bad to move data around as parameters when dealing with a distributed system.
In that particular case, yes, make them one task only to avoid passing the data between tasks.
j
The problem is I have to transform the data to JSON lists before I load it to GCS. Is that fine or not?
json_data = json.dumps(all_records)
data_json = '\n'.join(json.dumps(item) for item in all_records)
r
Yes it’s fine
j
The transformation will cost a lot.
r
I think you are misunderstanding the problem here
It's not the code you're running; it's the fact that you're trying to send data between tasks.
j
I see. Thanks!
r
No problem 🙂
j
Let me try.
j
Thank you @Robin Niel! @Jiaqiang Li, if you’re interested in a few options, see this discussion of dealing with big data in the docs.
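Options along those lines include the task-level result settings below (a sketch, assuming Prefect 2.x; useful when a task's return value is large and nothing downstream needs it):

```python
from prefect import task


@task(
    persist_result=False,          # skip writing the return value to result storage
    cache_result_in_memory=False,  # and don't keep it cached in the flow run process
)
def pull_data_from_api():
    # ... fetch the data and write it somewhere cheap (e.g. GCS) right away ...
    ...
```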
j
Thanks. Jeff
I combined the two tasks into one, and it still crashed with the same error.
I think I need to try the big data solution.
r
Can you send the new code? 🙂
j
sure
```python
@task
def pull_data_from_api_to_gcs_and_bigquery():
    all_records = []
    page = 1
    while True:
        params = {
            "page": page,
            "page_size": 5000
        }

        response = requests.post(url, headers=headers, json=request_body, params=params)
        if response.status_code == 200:
            data = response.json()
            records = data.get('conversions', [])
            if not records:
                break
            all_records.extend(records)
            page += 1
        else:
            print(f"Request failed with status code {response.status_code}")
            break
    # return all_records
    
    # Upload data to Google Cloud Storage
    # Define the GCS bucket connection
    credentials_json_string = (f'{gcs_service_account}').replace('\\n', '\n').replace("'", '"')
    credentials_json_object = json.loads(credentials_json_string)
    credentials = service_account.Credentials.from_service_account_info(credentials_json_object)
    storage_client = storage.Client(project=credentials_json_object['project_id'], credentials=credentials)
    if all_records:
        json_data = json.dumps(all_records)
        data_json = '\n'.join(json.dumps(item) for item in all_records)
        bucket = storage_client.get_bucket(f'{GCS_bucket_name}')
        blob = bucket.blob(file_path)
        blob.upload_from_string(data=data_json, content_type='application/json')
        print("Data uploaded to the blob.")
```
```python
# Define the prefect flow
@flow
def everflow_clicks_conversions_api_to_gcp():
    pull_data_from_api_to_gcs_and_bigquery()
    print("Data uploaded to GCS and Bigquery")

if __name__ == "__main__":
    everflow_clicks_conversions_api_to_gcp()
```
r
That’s weird, you don’t have any parameters anymore, the exception should be different 🤔
j
The error is the same:
Process for flow run 'small-crocodile' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is either caused by manual cancellation or high memory usage causing the operating system to terminate the process.
10:18:42 AM · prefect.flow_runs.runner · INFO
Reported flow run '34db97bf-dd2c-4fce-9555-584775deff25' as crashed: Flow run process exited with non-zero status code -9.
I guess the problem is from the data transformation.
r
It doesn't mention parameter introspection anymore, so I would not say it's the same error. I think you're just running into an out-of-memory kill. Have you tried increasing the memory for the job?
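If raising the job's memory isn't enough, one way to cut peak memory is to stream each page of records into the GCS object as it arrives instead of accumulating all_records first. A sketch with placeholder URL, bucket, and path names, assuming a recent google-cloud-storage release with the file-like Blob.open API:

```python
import json

import requests
from google.cloud import storage
from prefect import task

# Placeholders; in the real flow these would come from the existing
# url/headers/request_body and service-account setup shown above.
URL = "https://api.example.com/conversions"
BUCKET_NAME = "my-bucket"
FILE_PATH = "exports/conversions.json"


@task
def pull_data_from_api_to_gcs():
    storage_client = storage.Client()  # assumes default credentials
    blob = storage_client.bucket(BUCKET_NAME).blob(FILE_PATH)
    page = 1
    # Stream newline-delimited JSON into the object page by page,
    # so the full dataset never has to sit in memory at once.
    with blob.open("w") as f:
        while True:
            response = requests.post(URL, params={"page": page, "page_size": 5000})
            if response.status_code != 200:
                print(f"Request failed with status code {response.status_code}")
                break
            records = response.json().get("conversions", [])
            if not records:
                break
            for record in records:
                f.write(json.dumps(record) + "\n")
            page += 1
```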
j
How can I increase the memory?
I only see the timeout for the work pool. I don't know where I can increase the memory.
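The memory limit isn't a Prefect timeout-style setting; it comes from the infrastructure the work pool provisions, so it lives in the work pool's base job template (typically editable under the pool's Advanced tab in the UI) or in per-deployment overrides. A hedged sketch of the deployment-level override, assuming the pool's template exposes a memory variable (the exact variable name and units depend on the work pool type):

```python
from prefect import flow


@flow
def everflow_clicks_conversions_api_to_gcp():
    ...


if __name__ == "__main__":
    everflow_clicks_conversions_api_to_gcp.deploy(
        name="everflow-clicks-conversions",
        work_pool_name="my-work-pool",        # placeholder pool name
        image="my-registry/my-image:latest",  # placeholder image
        # Override the pool's default; the key and units here are assumptions
        # that depend on the pool type (e.g. "2Gi" for a Cloud Run-style pool).
        job_variables={"memory": "2Gi"},
    )
```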