    Wai Kiat Tan

    1 year ago
    Hi, I have a flow in Prefect Cloud which invokes an AWS Lambda to perform file aggregation across different S3 bucket paths. The agent is a local agent in a Docker container hosted in AWS ECS. The problem: when the Lambda runs for more than 6-7 minutes, the agent is not able to detect the response returned from the Lambda, and the task keeps running forever. I followed the recommendation from https://docs.prefect.io/orchestration/agents/local.html#requirements and use the official Prefect docker image. Is there any way I can solve/improve this? Thanks!
    Note that, while the task keeps running, it actually invokes the Lambda again with the same input. Hope this information provides some insight.
    Kevin Kho

    1 year ago
    Hi @Wai Kiat Tan, do you have a timeout specified?
    Wai Kiat Tan

    1 year ago
    hi, @Kevin Kho yea, we do have this line
    lambda_client = create_client(connect_timeout=900, read_timeout=900)
    Kevin Kho

    1 year ago
    Is that create_client boto3?
    Wai Kiat Tan

    1 year ago
    yea, that's the one from boto3
    from botocore.config import Config

    config = Config(
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
    )

    client = session.client("lambda", config=config, region_name="ap-southeast-1")
    ^ that's the original code that i have
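    A self-contained sketch of how those pieces might fit together, assuming a create_client helper along the lines of the one referenced earlier (the helper itself is not shown in this thread):
    import boto3
    from botocore.config import Config

    def create_client(connect_timeout, read_timeout):
        """Hypothetical helper: build a Lambda client with long socket timeouts."""
        config = Config(
            connect_timeout=connect_timeout,
            read_timeout=read_timeout,
        )
        session = boto3.Session()
        return session.client("lambda", config=config, region_name="ap-southeast-1")

    lambda_client = create_client(connect_timeout=900, read_timeout=900)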
    Kevin Kho

    1 year ago
    It seems like this timeout is what's causing the restarts… combined with Prefect retrying it with the same input. Though the timeout is higher than 6-7 minutes, so I don't know. What is the Lambda doing?
    Wai Kiat Tan

    1 year ago
    but we already maxed the timeout at 900 seconds, right? our @task decorator does not have retries specified, is there a default value for it? the lambda reads JSON objects from S3 and aggregates those JSON files into a large JSON file (newline separated)
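    Purely as an illustration of that aggregation step, a hypothetical Lambda handler along these lines; the event keys mirror the task payload shared later in the thread, but the handler itself and the output key are assumptions:
    import json
    import boto3

    s3 = boto3.client("s3")

    def handler(event, context):
        """Hypothetical aggregator: read JSON objects under a prefix and
        write them back as a single newline-delimited JSON file."""
        source_bucket = event["source_bucket"]
        target_bucket = event["target_bucket"]
        prefix = event["table_path"]

        lines = []
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=source_bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                body = s3.get_object(Bucket=source_bucket, Key=obj["Key"])["Body"].read()
                lines.append(json.dumps(json.loads(body)))  # one JSON document per output line

        s3.put_object(
            Bucket=target_bucket,
            Key=f"{prefix}/aggregated.json",  # assumed output key
            Body="\n".join(lines).encode("utf-8"),
        )
        return {"status_code": 200, "objects_aggregated": len(lines)}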
    Kevin Kho

    1 year ago
    There is no default timeout for the Prefect task. Maybe you can turn on boto3 logs and see what's going on there? Sample addition to do that:
    LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
    boto3 gives nothing on the info level.
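    A sketch of how that run config could be attached to a flow, assuming Prefect 0.14's LocalRun; the flow and project names below are illustrative, not taken from this thread:
    from prefect import Flow, task
    from prefect.run_configs import LocalRun

    @task
    def say_hello():
        print("hello")

    with Flow("lambda-aggregation") as flow:  # illustrative flow name
        say_hello()

    # Raise Prefect logging to DEBUG and forward boto3's logger into the flow logs.
    flow.run_config = LocalRun(
        env={
            "PREFECT__LOGGING__LEVEL": "DEBUG",
            "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']",
        }
    )
    flow.register(project_name="my-project")  # illustrative project name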
    Wai Kiat Tan

    1 year ago
    "There is no default timeout for the Prefect task." <- do you mean there is no default retry value? can i set those env vars in the official prefect docker image instead?
    also, i would like to share: if i run
    prefect agent local start -t $PREFECT_SERVICE_ACCOUNT
    locally on my machine, the flow executes perfectly fine
    1. running the agent locally on my machine works
    2. running the agent locally within a docker container does not
    Kevin Kho

    1 year ago
    Yeah so just to be clear, timeout and max_retries are separate values. There is no default for them. You can set an environment variable like
    PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4
    as seen here. Yes, you can set them in the Dockerfile, but most people just supply them in the task like
    @task(max_retries=3, retry_delay=…, timeout=900)
    Oh, thanks for sharing that. Are you sure your container has access to outside traffic?
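    A minimal, self-contained sketch of those task-level settings, assuming Prefect 0.14; the task body below is an assumption:
    from datetime import timedelta
    from prefect import task

    @task(
        max_retries=3,                      # retry up to 3 times on failure
        retry_delay=timedelta(seconds=30),  # wait 30s between attempts
        timeout=900,                        # fail the task run after 900s
    )
    def invoke_lambda(payload):
        # ... call the boto3 Lambda client here ...
        return payload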
    Wai Kiat Tan

    1 year ago
    im certain that the container has access to outside traffic. if the number of JSON files is small, im able to process everything and the flow completes just fine
    Kevin Kho

    1 year ago
    I think turning on boto3 logs and setting the logging level to debug will give us more insight into how those retries are happening and what is causing the timeout
    Wai Kiat Tan

    1 year ago
    okay, to clarify: 1. the task which triggers the mentioned lambda does not have retries set 2. the timeout is set on the lambda client, not within a task decorator
    alright, i'll get back to you after turning on the boto3 logs
    davzucky

    1 year ago
    Could the task restart because of the liveness check? If the task doesn't send back a ping, Prefect will flag it as dead and retry, no?
    @Wai Kiat Tan are you using dask?
    Wai Kiat Tan

    1 year ago
    hi @davzucky i was thinking about the liveness, but i could not find a way to adjust it. i even turned off the heartbeat in the flow setting. LOL, got that idea from a github issue
    davzucky

    1 year ago
    Can you try disabling the heartbeat on your flow from the UI?
    Wai Kiat Tan

    1 year ago
    no, im not using dask, just pure prefect, boto3 in flow
    davzucky

    1 year ago
    O.k. I had a problem with a long blocking task on mongo. Disabling the heartbeat fixed our problem. However, this is at the workflow level.
    It would be nice to be able to flag, at the task level, that a task is long-running and blocking.
    You are using docker. Are you specifying any resource quotas?
    Wai Kiat Tan

    1 year ago
    so within your task, you query mongo to extract some data, but it takes too long for the flow to detect it?
    i did not specify any resource quotas
    but the heavy lifting is done by the lambda instead of the agent
    i ran
    docker stats
    and everything looks okay
    Kevin Kho

    1 year ago
    Can we try the extra logs then?
    davzucky

    1 year ago
    We are using the aggregate feature of mongo. It runs for 10-15 mins and we were getting heartbeat problems. We extracted this task into its own flow and disabled the heartbeat. The root cause looks to be the fact that the mongo driver is locking the process.
    Can you share the code of your call? Are you calling the lambda async or sync? I know there are two modes
    Wai Kiat Tan

    1 year ago
    @task(
        name="",
        target="",
        checkpoint=True,
        result=S3Result(f""),
    )
    def invoke_blob_loader(table_path, flow_date, source_bucket, target_bucket):
        body = {
            "table_path": table_path,
            "flow_date": flow_date,
            "source_bucket": source_bucket,
            "target_bucket": target_bucket,
        }
        dto = construct_dto(body)
        logger.info(json.dumps(dto, default=str))
    
        response = invoke(lambda_client, f"de-json2bq-{environment}-blob-loader", dto)
        validated_response = validate_lambda_response(response)
        logger.info(json.dumps(validated_response))
        return validated_response["payload"]
    the invoke function wraps on top of boto3 client
    it is sync, because i expect the lambda to return me some info, so i can pass it to the downstream task
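    A rough sketch of what a synchronous invoke wrapper like that usually looks like with boto3; the helper name and the error handling below are assumptions, not the actual code from this thread:
    import json

    def invoke(lambda_client, function_name, payload):
        """Hypothetical synchronous wrapper around Lambda's Invoke API."""
        response = lambda_client.invoke(
            FunctionName=function_name,
            InvocationType="RequestResponse",  # synchronous: block until the Lambda returns
            Payload=json.dumps(payload, default=str).encode("utf-8"),
        )
        # Surface Lambda-side errors instead of passing them downstream silently.
        if response.get("FunctionError"):
            raise RuntimeError(f"Lambda {function_name} failed: {response['FunctionError']}")
        return json.loads(response["Payload"].read())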
    @Kevin Kho, yea, i will run that soon
    i keep seeing this after the task invokes the lambda, but i did not see any boto3-related log though
    ENV PREFECT__LOGGING__LEVEL="DEBUG"
    ENV PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
    added this into my dockerfile
    @Kevin Kho just to share the dockerfile
    FROM prefecthq/prefect:0.14.21-python3.8
    
    ARG PREFECT_SERVICE_ACCOUNT
    ARG ENVIRONMENT
    
    ENV ENVIRONMENT ${ENVIRONMENT}
    ENV PREFECT__LOGGING__LEVEL="DEBUG"
    ENV PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
    
    RUN prefect backend cloud
    CMD prefect agent local start -t ***********
    am i expecting to see some aws logs here? just curious
    Kevin Kho

    1 year ago
    When I tried this on the RunConfig it worked and I got boto3 logs yep. I don’t know about the Dockerfile env syntax
    The syntax looks right from what I can tell
    But maybe the Prefect logs will be enough to see why the task is being retried so I would run and see the logs still
    Talked to the team members, and since it works with a small number of JSON files but not with a larger number, this could be a sign of docker resource limits being hit. I know you mentioned you checked the stats but there might be something hanging there
    Wai Kiat Tan

    1 year ago
    @Kevin Kho let me reproduce this with the prefect logs again. i think the number of files is not the concern though, because the prefect flow only passes s3 paths from one task to another. the prefect task invokes the lambda with an s3 path, and the invoked lambda lists the files under that s3 path and reads them
    @Kevin Kho that's the prefect log
    [2021-06-08 01:53:33+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Starting task run...
    [2021-06-08 01:53:33+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Handling state change from Pending to Mapped
    [2021-06-08 01:53:34+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Finished task run for task with final state: 'Mapped'
    [2021-06-08 01:53:34+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Starting task run...
    [2021-06-08 01:53:34+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Handling state change from Pending to Running
    [2021-06-08 01:53:35+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Calling task.run() method...
    [2021-06-08 01:53:35+0000] INFO - prefect | {"flow_id": "8b806ae3-0e7f-471e-882a-3316f6878b3d", "flow_run_id": "fdb9c515-4e79-46d4-8f28-7d6679beb024", "flow_run_name": "refreshing-chameleon", "flow_run_version": 3, "task_id": "f699e90d-11aa-457b-91e5-3b8535e4109a", "task_run_id": "7acfb38a-cb60-473e-8780-55a7d3083140", "event_type": "internal_data_json_event", "event_timestamp": "2021-06-08T01:53:22.084436+00:00", "body": {OUR INTERNAL INPUT}}
    [2021-06-08 01:53:38+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:53:54+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:54:09+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:54:25+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:54:41+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:54:56+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:55:12+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [2021-06-08 01:55:27+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [2021-06-08 01:55:43+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:55:58+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:56:14+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:56:29+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:56:45+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:57:00+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:57:16+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [2021-06-08 01:57:31+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:57:47+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:58:02+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:58:18+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Window changed'
    [DEBUG tini (1)] Passing signal: 'Window changed'
    [2021-06-08 01:58:34+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:58:49+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:59:05+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:59:20+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [2021-06-08 01:59:36+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 01:59:51+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:00:07+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:00:22+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:00:38+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:00:53+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:01:09+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [2021-06-08 02:01:24+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [2021-06-08 02:01:40+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:01:55+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:02:11+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:02:26+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [2021-06-08 02:02:42+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:02:57+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:03:12+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:03:28+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
    [2021-06-08 02:03:43+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
    [2021-06-08 02:03:58+0000] INFO - prefect | {"request_id": "192daff4-600b-4d16-9715-25c6f8ac9956", "payload": {"status_code": 200, OUR_INTERNAL_OUTPUT}}
    [2021-06-08 02:03:58+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Handling state change from Running to Success
    [2021-06-08 02:03:59+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Finished task run for task with final state: 'Success'
    if you look into the following attachments, you can notice that the same lambda is triggered twice
    the first time is completed approximately at 09:59:08
    the second time is triggered at 09:58:52
    while the task should still be waiting for the response returned from the first invocation
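    One possibility worth checking here, stated as an assumption rather than a confirmed cause: botocore retries a request when it hits a read timeout, which would re-send the Invoke call with the same payload. A sketch of pinning that down in the client config:
    import boto3
    from botocore.config import Config

    # Assumption: disable botocore's automatic retries so a read timeout
    # surfaces as an error instead of silently re-sending the Invoke request.
    config = Config(
        connect_timeout=900,
        read_timeout=900,
        retries={"max_attempts": 0},
    )
    session = boto3.Session()
    client = session.client("lambda", config=config, region_name="ap-southeast-1")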
    Kevin Kho

    1 year ago
    Thanks for the detailed logs! Nothing seems to have retried on the Prefect side. If Prefect had retried the task, you would see double logs for
    logger.info(json.dumps(dto, default=str))
    and you would see a Retry state. Are you using a LocalExecutor? I'll ask the team about that tini "window changed" signal to get more info, but I doubt it's that. Could you share the flow code? Just the
    with Flow() as flow
    and onwards. You can share it privately if there's sensitive info, though of course remove it.
    I'm wondering if there was a
    flow.run()
    left in the code when it was registered, causing 2 runs. This can happen if you have
    flow.run()
    and then register with the CLI
    Wai Kiat Tan

    1 year ago
    @Kevin Kho yea, im using the LocalExecutor. Sure, i will share the flow code with you separately. thanks!
    we do not have a
    flow.run()
    left in the code =(