
Daniel Shinoda

01/19/2023, 3:57 AM
Guys, I keep getting an error message saying that I am trying to call a task within another task, but I am clearly calling all my tasks only inside the flow function. Why do I keep receiving this message? The only way I can make my code work is by calling the task as a plain function with ".fn", which I don't want.
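import datetime
import json

from google.ads.googleads.client import GoogleAdsClient
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file


@task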
def query(client, customer_id):

    current_datetime = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    json_file = f'query_results_{customer_id}_{current_datetime}.json'
    with open(json_file, mode='w') as file:
        for row in rows:
            data = {
                (...)
            }
            json.dump(data, file)
            file.write('\n')
        print(file.name,' created with ',len(rows),'rows')
    return json_file


@task
def upload_to_gcs(file_to_send, bucket_name):
    gcp_credentials_block = GcpCredentials.load("ds-projects")
    cloud_storage_upload_blob_from_file(
        file_to_send, bucket_name, "blob", gcp_credentials_block)
    return f'has been uploaded to {bucket_name}'


@flow
def main(customer_id):

    secret_block_refresh_token = Secret.load("g-ads-refresh-token")
    secret_block_dev_token = Secret.load("g-ads-developer-token")
    secret_block_client_secret = Secret.load("g-ads-client-secret")

    credentials = {
        XXXXXXXXX
    }

    googleads_client = GoogleAdsClient.load_from_dict(credentials)

    file_to_send = query(googleads_client, customer_id)
    bucket_name = "g_ads_geo_report_campaign"
    upload_to_gcs.fn(file_to_send,bucket_name)


    
if __name__ == "__main__":
    main("xxxxx")

Anna Geller

01/19/2023, 3:59 AM
cloud_storage_upload_blob_from_file is itself a task, and you are calling it from inside another task (upload_to_gcs).
Try switching upload_to_gcs to a subflow (decorate it with @flow instead of @task) to fix this.
🤗 1

Daniel Shinoda

01/19/2023, 4:07 AM
Wow, so fast, thanks so much! By transforming the task into a subflow the code worked! 🥳
🙌 2