hi everybody, I am currently using `prefect cloud`...
# ask-community
j
Hi everybody, I am currently using `prefect cloud`, and my flow invokes `aws lambda` and returns the response. However, after registering my flow, whenever I try to run it, it throws:
`Unexpected error: TypeError("cannot serialize '_io.BufferedReader' object")`
Is there a workaround for this?
```python
import json

import boto3
from prefect import Flow, task
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun
from prefect.storage import S3


@task(name="invoke_lambda")
def invoke_lambda(function_name, table_path, etl_target_date):
    lambda_client = boto3.client("lambda")
    # Synchronous invocation; boto3 returns a dict describing the call
    response = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({"table_path": table_path, "etl_target_date": etl_target_date}),
    )
    return response


with Flow(
    "test-flow",
    executor=LocalExecutor(),
    run_config=LocalRun(),
    storage=S3(
        bucket="random-bucket",
    ),
) as flow:
    x = invoke_lambda("test", "a/b/c", "2021/04/02")


flow.register(project_name="xxx", labels=["dev"])
```
c
Hi Jeremy - this suggests that boto3’s response object is not serializable (very common in boto3); you need to figure out how to convert the response to a basic Python object - perhaps there is a to_json() method or something like that?
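A minimal sketch of that conversion, assuming the response has the shape boto3's Lambda `invoke` returns (a dict whose `Payload` entry is a readable stream) and that the Lambda function returns JSON. The helper name `to_plain_response` is hypothetical, and an in-memory `io.BufferedReader` stands in for botocore's `StreamingBody` so the example runs without AWS:

```python
import io
import json
import pickle


def to_plain_response(response):
    """Return a copy of a Lambda invoke response that holds only plain Python data."""
    # Copy the top-level entries that are already simple values (ints, strings, dicts).
    plain = {key: value for key, value in response.items() if key != "Payload"}
    # The payload is a stream: read it once and decode the JSON the function returned.
    raw = response["Payload"].read()
    plain["Payload"] = json.loads(raw) if raw else None
    return plain


# Hypothetical stand-in for a boto3 response; the stream object is what breaks pickling.
fake_response = {
    "StatusCode": 200,
    "ExecutedVersion": "$LATEST",
    "Payload": io.BufferedReader(io.BytesIO(b'{"rows": 3}')),
}

result = to_plain_response(fake_response)
print(result)  # → {'StatusCode': 200, 'ExecutedVersion': '$LATEST', 'Payload': {'rows': 3}}
pickle.dumps(result)  # succeeds: only ints, strings and dicts remain
```

Returning `result` instead of the raw response keeps the task output serializable while preserving the status code alongside the decoded body.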
z
Something like the following may work:
```python
import json
from prefect.tasks.aws import LambdaInvoke

# payload is handed to Lambda as-is, so pass a JSON string rather than a dict
invoke_lambda = LambdaInvoke(function_name='my_function', payload=json.dumps({'table_path': 'my_table_path', 'etl_target_date': 'my_etl_target_date'}))
```
j
I have also tried Prefect's `LambdaInvoke` task, but it has the same issue.
k
Can you try something like this? It worked for me: just read the response payload. I got the same error as you when I didn't read it.
```python
import json

import boto3
from prefect import Flow, task


@task(name="invoke_lambda")
def invoke_lambda(function_name):
    lambda_client = boto3.client("lambda")
    response = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({}),
    )
    # read the streamed payload into bytes so the task result can be pickled
    return response['Payload'].read()


with Flow("lambda-test") as flow:
    x = invoke_lambda("TestingLambdaInvoke")

flow.register(project_name="example-project")
```
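One caveat with reading the payload: it is a stream, so `read()` consumes it. A second call returns empty bytes, and the value you get is raw `bytes`, which you may want to decode with `json.loads` before using individual fields. A small illustration, again using an in-memory `io.BufferedReader` as a stand-in for botocore's `StreamingBody` (an assumption; the real object only comes from an actual `invoke` call):

```python
import io
import json
import pickle

# Stand-in for response["Payload"]; a real StreamingBody is also read like a file.
payload = io.BufferedReader(io.BytesIO(b'{"status": "ok", "rows": 42}'))

first = payload.read()   # the whole body, as bytes
second = payload.read()  # the stream is exhausted, so nothing is left

print(first)   # b'{"status": "ok", "rows": 42}'
print(second)  # b''

# bytes pickle fine, so returning `first` from a task avoids the error...
pickle.dumps(first)
# ...and json.loads turns it into a plain dict when downstream tasks need fields.
data = json.loads(first)
print(data["rows"])  # 42
```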
w
Hi, I ran into something similar. My code is as follows:
```python
import json

import boto3
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.storage import S3


@task
def extract():
    return 8


@task
def transform(x):
    session = boto3.Session()
    lambda_client = session.client("lambda")

    response = lambda_client.invoke(
        FunctionName="my-function", Payload=json.dumps({"number": x})
    )

    return response


with Flow(
    name="compute-flow",
    executor=LocalDaskExecutor(),
    # schedule=Schedule(clocks=[CronClock("*/5 * * * *")]),
    storage=S3(
        bucket="my-bucket",
    ),
) as flow:
    e = extract()
    t = transform(e)
```
The flow is registered via a GitHub Action as follows:
```yaml
- name: 'Login Prefect Cloud'
  run: |
    prefect backend cloud
    prefect auth login -t ${{ secrets.PREFECT_TOKEN }}
- name: 'Register Prefect flow'
  run: |
    prefect register --project data-engineering --label lux --path compute/
```
The error I received is similar:
```
TypeError: cannot pickle '_io.BufferedReader' object
```
When I ran `flow.run()` locally, it worked just fine. Is this related to how I store my flow code in an S3 bucket?
k
I believe this is more related to task results needing to be serializable so they can be sent to Dask workers: https://docs.prefect.io/core/concepts/common-pitfalls.html#serialization
So the output of the task needs to be picklable with cloudpickle. The boto3 response object in this case is not (its `Payload` entry is a stream), so you need to convert it to JSON or plain Python data for it to be serializable.
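The round trip a task result goes through when it crosses into another process can be approximated with the standard `pickle` module. This is a simplification under stated assumptions: Prefect uses cloudpickle rather than plain `pickle`, `ship_to_worker` below is a hypothetical stand-in (not a Prefect function), and an in-memory `io.BufferedReader` plays the role of the boto3 payload stream:

```python
import io
import pickle


def ship_to_worker(value):
    """Hypothetical stand-in for handing a task result to another process."""
    # A process boundary forces the value through serialize + deserialize.
    return pickle.loads(pickle.dumps(value))


# Stand-in for boto3's response: a dict whose "Payload" wraps an open stream.
raw_response = {"StatusCode": 200, "Payload": io.BufferedReader(io.BytesIO(b"{}"))}

failed = False
try:
    ship_to_worker(raw_response)
except TypeError as exc:
    failed = True
    print("failed:", exc)  # e.g. cannot pickle '_io.BufferedReader' object

# Reading the stream first leaves only bytes, which survive the round trip.
shipped = ship_to_worker(raw_response["Payload"].read())
print(shipped)  # b'{}'
```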
Something like this should help:
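```python
import json

import boto3
from prefect import task


@task(name="invoke_lambda")
def invoke_lambda(function_name):
    lambda_client = boto3.client("lambda")
    response = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({}),
    )
    # read the streamed payload into bytes, which cloudpickle can serialize
    return response['Payload'].read()
```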
Nice use of GitHub Actions, by the way!
w
Thanks Kevin, let me give it a try right away.
Does the executor matter in this context? I can swap to a local executor, though.
Kevin, thanks! It works.
k
Thanks for letting me know! I was putting together a test myself.
w
Now I understand Chris's statement,
because `response["Payload"]` is a `botocore.response.StreamingBody`
^ and that is not serializable.
Correct me if my understanding is wrong.
k
Yes, that's exactly right.
w
A follow-up question:
```python
from prefect.utilities.debug import is_serializable


is_serializable(my_flow)  # returns True / False; my_flow is your Flow object
```
^ Can I use this to check whether the entire flow is serializable?
k
Yes
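For a quick check outside Prefect, or to find which entry of a response dict is the culprit, a rough stdlib equivalent is sketched below. It is an approximation: `is_picklable` and `unpicklable_keys` are hypothetical helpers that use plain `pickle`, whereas Prefect's `is_serializable` is built on cloudpickle (which also handles things like lambdas), so results can differ. The `io.BufferedReader` again stands in for botocore's `StreamingBody`:

```python
import io
import pickle


def is_picklable(obj):
    """Return True if obj survives a pickle round trip with the standard library."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        return False
    return True


def unpicklable_keys(mapping):
    """List the top-level keys whose values cannot be pickled."""
    return [key for key, value in mapping.items() if not is_picklable(value)]


response = {
    "StatusCode": 200,
    "ResponseMetadata": {"HTTPStatusCode": 200},
    "Payload": io.BufferedReader(io.BytesIO(b"{}")),  # stand-in for StreamingBody
}

print(is_picklable(response))      # False
print(unpicklable_keys(response))  # ['Payload']
```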
w
Alright! Thanks for pointing me in the right direction.
k
No problem. Happy to help 🙂
j
@Kevin Kho thank you 😄. Will give it a try!