Jeremy Tee
04/02/2021, 7:47 AM
I'm using Prefect Cloud, and my flow invokes AWS Lambda and returns the response. However, after registering my flow, whenever I try to run it, it throws `Unexpected error: TypeError("cannot serialize '_io.BufferedReader' object")`.
Is there a workaround for this?
@task(name="invoke_lambda")
def invoke_lambda(function_name, table_path, etl_target_date):
    """Invoke an AWS Lambda function and return its response with the payload read.

    The function receives a JSON object with ``table_path`` and
    ``etl_target_date`` keys.

    boto3 returns the function's output in ``response["Payload"]`` as a
    streaming body wrapping an open ``_io.BufferedReader``. When the flow runs
    through Prefect Cloud the task's return value has to be pickled, and that
    stream cannot be, which produces
    ``TypeError: cannot pickle '_io.BufferedReader' object``. The stream is
    therefore drained here and replaced by its bytes.

    Args:
        function_name: name of the Lambda function to invoke.
        table_path: forwarded as ``table_path`` in the JSON payload.
        etl_target_date: forwarded as ``etl_target_date`` in the JSON payload.

    Returns:
        A copy of the boto3 ``invoke`` response dict in which ``"Payload"``
        holds the raw bytes returned by the function instead of a stream.
    """
    lambda_client = boto3.client("lambda")
    response = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({"table_path": table_path, "etl_target_date": etl_target_date}),
    )
    # Read the stream so the returned value is plain, picklable data.
    return {**response, "Payload": response["Payload"].read()}
# One-task flow around the Lambda call: LocalExecutor, LocalRun run config,
# and S3 storage in the "random-bucket" bucket.
with Flow(
    name="test-flow",
    executor=LocalExecutor(),
    run_config=LocalRun(),
    storage=S3(bucket="random-bucket"),
) as flow:
    x = invoke_lambda(
        function_name="test",
        table_path="a/b/c",
        etl_target_date="2021/04/02",
    )

# Register under project "xxx" with the "dev" label.
flow.register(project_name="xxx", labels=["dev"])
Chris White
Zach Angell
There is a `LambdaInvoke` task built in: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/aws/lambda_function.py#L219
Zach Angell
import json

from prefect.tasks.aws import LambdaInvoke

# Prefect's built-in Lambda task configured with a fixed function and payload.
# The payload is the request body for Lambda's Invoke call, which takes bytes or
# a string rather than a dict, so the event is JSON-encoded here.
invoke_lambda = LambdaInvoke(
    function_name='my_function',
    payload=json.dumps({'table_path': 'my_table_path', 'etl_target_date': 'my_etl_target_date'}),
)
Jeremy Tee
04/02/2021, 2:02 PM
Kevin Kho
import prefect
import json
import boto3
from prefect import task, Flow
@task(name="invoke_lambda")
def invoke_lambda(function_name):
    """Invoke ``function_name`` on AWS Lambda with an empty JSON object.

    Returns the raw bytes read from the response's ``Payload`` stream rather
    than the response itself, because the stream object cannot be pickled
    when the flow runs through Prefect Cloud.
    """
    lambda_api = boto3.client("lambda")
    invocation = lambda_api.invoke(FunctionName=function_name, Payload=json.dumps({}))
    body = invocation["Payload"]
    return body.read()
# Single-task flow calling the "TestingLambdaInvoke" function, registered
# under "example-project".
with Flow(name="lambda-test") as flow:
    x = invoke_lambda(function_name="TestingLambdaInvoke")

flow.register(project_name="example-project")
Wai Kiat Tan
04/02/2021, 3:30 PM
import json
import boto3
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.storage import S3
@task
def extract():
    """Return the constant value that the flow passes to ``transform``."""
    seed = 8
    return seed
@task
def transform(x):
    """Send ``x`` to the ``my-function`` Lambda and return its response.

    The function receives ``{"number": x}`` as its JSON event.

    boto3 returns the function's output in ``response["Payload"]`` as a
    streaming body wrapping an open ``_io.BufferedReader``. It cannot be
    pickled, which is why the flow worked with ``flow.run()`` locally but
    failed on Prefect Cloud with
    ``TypeError: cannot pickle '_io.BufferedReader' object``. The stream is
    therefore drained here and replaced by its bytes.

    Args:
        x: value sent to the Lambda as the ``number`` field.

    Returns:
        A copy of the boto3 ``invoke`` response dict in which ``"Payload"``
        holds the raw bytes returned by the function instead of a stream.
    """
    session = boto3.Session()
    lambda_client = session.client("lambda")
    response = lambda_client.invoke(
        FunctionName="my-function", Payload=json.dumps({"number": x})
    )
    # Read the stream so the returned value is plain, picklable data.
    return {**response, "Payload": response["Payload"].read()}
# Two-step flow: extract() produces a number that transform() sends to Lambda.
# Runs on a LocalDaskExecutor; the flow is stored in the "my-bucket" S3 bucket.
with Flow(
    "compute-flow",
    executor=LocalDaskExecutor(),
    # Scheduling is currently disabled; to re-enable:
    # schedule=Schedule(clocks=[CronClock("*/5 * * * *")]),
    storage=S3(bucket="my-bucket"),
) as flow:
    e = extract()
    t = transform(x=e)
Wai Kiat Tan
04/02/2021, 3:32 PM
# Point the Prefect CLI at Prefect Cloud and log in with the PREFECT_TOKEN secret.
- name: 'Login Prefect Cloud'
  run: |
    prefect backend cloud
    prefect auth login -t ${{ secrets.PREFECT_TOKEN }}
# Register the flows under compute/ into project "data-engineering" with label "lux".
- name: 'Register Prefect flow'
  run: |
    prefect register --project data-engineering --label lux --path compute/
Wai Kiat Tan
04/02/2021, 3:34 PM
TypeError: cannot pickle '_io.BufferedReader' object
Wai Kiat Tan
04/02/2021, 3:51 PM
With `flow.run()` locally, it works just fine. Is this related to how I store my flow code in the S3 bucket?
Kevin Kho
Kevin Kho
Kevin Kho
@task(name="invoke_lambda")
def invoke_lambda(function_name):
    """Call the Lambda ``function_name`` with ``{}`` and return its output bytes.

    Only the bytes read from the response's ``Payload`` stream are returned;
    the stream object itself cannot be pickled, which breaks runs on
    Prefect Cloud.
    """
    client = boto3.client("lambda")
    return client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({}),
    )["Payload"].read()
Kevin Kho
Wai Kiat Tan
04/02/2021, 4:01 PM
Wai Kiat Tan
04/02/2021, 4:02 PM
Wai Kiat Tan
04/02/2021, 4:12 PM
Kevin Kho
Wai Kiat Tan
04/02/2021, 4:13 PM
Wai Kiat Tan
04/02/2021, 4:13 PM
Wai Kiat Tan
04/02/2021, 4:13 PM
Wai Kiat Tan
04/02/2021, 4:14 PM
Kevin Kho
Wai Kiat Tan
04/02/2021, 4:15 PM
from prefect.utilities.debug import is_serializable
# Check whether `my_flow` can be serialized (result is True / False).
# NOTE(review): this inspects the flow object itself; a task *result* such as a
# boto3 Payload stream only exists at run time, so it may not be caught here — confirm.
is_serializable(my_flow) # returns True / False
^ Can I use this to check whether the entire flow is serializable?
Kevin Kho
Wai Kiat Tan
04/02/2021, 4:22 PM
Kevin Kho
Jeremy Tee
04/03/2021, 5:30 AM