Jeremy Tee
04/28/2021, 7:55 AM
Unexpected error: TypeError("object of type 'NoneType' has no len()"). However, when I rerun a fresh one, it works! Is my way of caching wrong?
prefect = 0.14.15
Jeremy Tee
04/28/2021, 7:56 AMimport json
import boto3
from prefect import Flow, flatten, task
from prefect.engine import signals
from prefect.engine.results.s3_result import S3Result
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun
from prefect.storage import S3
@task(
    name="get_names",
    target="result/{flow_name}/{flow_run_name}/{task_full_name}",
    checkpoint=True,
    result=S3Result("prefect-flow"),
)
def get_names():
    """Return the fixed list of names that seeds the rest of the flow.

    The task is declared with ``checkpoint=True``, an ``S3Result`` and a
    templated ``target``; the template fields are filled in by Prefect.
    """
    seed_names = ["Peter", "James", "Gee"]
    return seed_names
@task(
    name="get_skills",
    target="result/{flow_name}/{flow_run_name}/{task_full_name}",
    checkpoint=True,
    result=S3Result("prefect-flow"),
)
def get_skills(name):
    """Return the list of skills recorded for ``name``.

    Args:
        name: Key to look up in the hard-coded name -> skills table.

    Returns:
        The list of skills for ``name``, or an empty list when the name is
        not in the table.
    """
    skills_by_name = {
        "Peter": ["IT", "Accounting"],
        "James": ["Mathematics", "English"],
        "Gee": ["Python", "SQL"],
    }
    if name in skills_by_name:
        return skills_by_name[name]
    # Unknown names map to "no skills" rather than an error.
    return []
@task(
    name="invoke_lambda",
    target="result/{flow_name}/{flow_run_name}/{task_full_name}",
    checkpoint=True,
    result=S3Result("prefect-flow"),
)
def invoke_lambda(skill):
    """Invoke the ``prefect-test`` Lambda with ``skill`` and return its payload.

    Args:
        skill: JSON-serialisable value sent as the Lambda payload.

    Returns:
        The decoded JSON payload returned by the Lambda.

    Raises:
        signals.FAIL: If the decoded payload is a JSON object carrying a
            truthy ``errorMessage``. The message is attached to the signal
            so the failure reason is visible on the task run.
    """
    print(f"invoking lambda with skill: {skill}")
    lambda_client = boto3.client("lambda", region_name="ap-southeast-1")
    response = lambda_client.invoke(
        FunctionName="prefect-test", Payload=json.dumps(skill)
    )
    payload = json.loads(response["Payload"].read())

    # A Lambda may legitimately return non-object JSON (list, string, number,
    # null); only a JSON object can carry an "errorMessage" key, so guard the
    # lookup instead of crashing with AttributeError on ``.get``.
    error_message = (
        payload.get("errorMessage") if isinstance(payload, dict) else None
    )
    if error_message:
        # Surface the Lambda's own error text instead of an empty FAIL signal.
        raise signals.FAIL(f"Lambda 'prefect-test' failed: {error_message}")
    return payload
# Assemble the "pikachu" flow: names -> skills per name -> one Lambda call
# per individual skill.
with Flow(
    "pikachu",
    executor=LocalExecutor(),
    run_config=LocalRun(),
    storage=S3(bucket="prefect-flow"),
) as flow:
    names = get_names()
    # One mapped get_skills run per name; each run returns a list of skills.
    skills = get_skills.map(names)
    # flatten() un-nests the per-name lists so invoke_lambda is mapped over
    # individual skills instead of over whole lists.
    response = invoke_lambda.map(flatten(skills))

# Register the flow under the "jeremy" project with the "dev" label.
flow.register(project_name="jeremy", labels=["dev"])
Jeremy Tee
04/28/2021, 7:57 AMJeremy Tee
04/28/2021, 8:01 AMJeremy Tee
04/28/2021, 8:27 AMKevin Kho
Jeremy Tee
04/28/2021, 4:11 PMKevin Kho
Jeremy Tee
04/29/2021, 2:32 AMJeremy Tee
04/29/2021, 2:36 AMKevin Kho
Kevin Kho
Jeremy Tee
04/29/2021, 2:50 AMKevin Kho
def invoke_lambda(skill):
    """Debug stand-in for the Lambda call: wait, log ``skill``, tag it.

    Args:
        skill: Value to log and embed in the returned string.

    Returns:
        The string ``"<skill>-processed"``.
    """
    import time

    # Imported locally (like ``time``) because the snippet uses
    # ``prefect.context`` without a visible module-level ``import prefect``.
    import prefect

    time.sleep(10)
    logger = prefect.context.get("logger")
    # Restored from the chat-export link markup
    # ``<http://logger.info|logger.info>``, which is not valid Python.
    logger.info(skill)
    return f"{skill}-processed"
Kevin Kho
Jeremy Tee
04/29/2021, 2:52 AMKevin Kho
Jeremy Tee
04/29/2021, 3:06 AMJeremy Tee
04/29/2021, 3:35 AMKevin Kho
Kevin Kho
Jeremy Tee
04/29/2021, 5:01 AMKevin Kho