# ask-community
j
Hi everybody, when I try to rerun my failed mapped task from Prefect Cloud by clicking the restart button, I get this error:
Unexpected error: TypeError("object of type 'NoneType' has no len()")
However, when I start a fresh run, it works! Is my way of caching wrong? prefect = 0.14.15
import json

import boto3
from prefect import Flow, flatten, task
from prefect.engine import signals
from prefect.engine.results.s3_result import S3Result
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun
from prefect.storage import S3


@task(
    name="get_names",
    target="result/{flow_name}/{flow_run_name}/{task_full_name}",
    checkpoint=True,
    result=S3Result("prefect-flow"),
)
def get_names():
    return ["Peter", "James", "Gee"]


@task(
    name="get_skills",
    target="result/{flow_name}/{flow_run_name}/{task_full_name}",
    checkpoint=True,
    result=S3Result("prefect-flow"),
)
def get_skills(name):
    skills_mapping = {
        "Peter": ["IT", "Accounting"],
        "James": ["Mathematics", "English"],
        "Gee": ["Python", "SQL"],
    }
    return skills_mapping.get(name, [])


@task(
    name="invoke_lambda",
    target="result/{flow_name}/{flow_run_name}/{task_full_name}",
    checkpoint=True,
    result=S3Result("prefect-flow"),
)
def invoke_lambda(skill):
    print(f"invoking lambda with skill: {skill}")
    lambda_client = boto3.client("lambda", region_name="ap-southeast-1")
    response = lambda_client.invoke(
        FunctionName="prefect-test", Payload=json.dumps(skill)
    )
    payload = json.loads(response["Payload"].read())
    if payload.get("errorMessage"):
        raise signals.FAIL()
    return payload


with Flow(
    "pikachu",
    executor=LocalExecutor(),
    run_config=LocalRun(),
    storage=S3(
        bucket="prefect-flow",
    ),
) as flow:
    names = get_names()
    skills = get_skills.map(names)
    response = invoke_lambda.map(flatten(skills))


flow.register(
    project_name="jeremy",
    labels=["dev"],
)
It fails when attempting to call the mapped task when I try to restart. It works if I create a new flow run.
This is what is cached in S3 on my first run
If I remove flatten and recreate the scenario, it has no issues
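For reference, here is a rough plain-Python sketch of the data shapes involved, using the same names and skills as the flow above. It does not use Prefect at all; `itertools.chain.from_iterable` is only a stand-in for what `flatten` does to the mapped results (removing one level of nesting), and the variable `flat_skills` is made up for illustration.

```python
from itertools import chain

# Same data as the flow above, reproduced without Prefect.
names = ["Peter", "James", "Gee"]
skills_mapping = {
    "Peter": ["IT", "Accounting"],
    "James": ["Mathematics", "English"],
    "Gee": ["Python", "SQL"],
}

# get_skills.map(names) produces one list per name, i.e. a list of lists.
skills = [skills_mapping.get(name, []) for name in names]

# flatten(skills) removes one level of nesting, so the downstream mapped
# task runs once per skill rather than once per name.
flat_skills = list(chain.from_iterable(skills))

print(skills)
print(flat_skills)
```

So without `flatten` the last task is mapped over three lists, and with `flatten` it is mapped over six individual skills.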
k
Yes I was able to recreate this. Will ask the team.
j
Thank you @Kevin Kho!
k
This is likely a bug. I can write up the issue, or you can if you want.
j
@Kevin Kho can you write up the issue? I am not very confident in writing one up 😅
or is there a guide for me to follow? If yes, then I don't mind writing one up!
k
When you click new issue there will be a template to follow
Oh I think you can replace the lambda section with a bit lighter weight code.
j
Do you have an example?
k
def invoke_lambda(skill):
    import time

    import prefect

    # Sleep long enough that the flow run can be cancelled from the UI midway.
    time.sleep(10)
    logger = prefect.context.get("logger")
    logger.info(skill)
    return f"{skill}-processed"
I recreated with this and used a Local Executor and Cancelled my Flows with the UI halfway and then tried to restart
j
Hmm, but for my use case I want it to invoke a lambda and check that it succeeded without any error
k
I see what you mean. Sounds good. Just use the same code for the issue 🙂
j
alright! I will link the issue here once I am done!
k
This looks good, could you add flatten to the title cuz i think that’s related?
Just looked over it again and thanks for adding a lot of detail
j
Ok i have made the changes to include flatten in my title!
k
Thanks a lot! Really appreciate the effort