# ask-community
w
Hi, I have a flow in Prefect Cloud which invokes an AWS lambda to perform file aggregation over different S3 bucket paths. The agent is a local agent in a Docker container hosted in AWS ECS. Problems: when the lambda runs more than 6-7 minutes, the agent is not able to detect the response returned from the lambda, and the task keeps running forever. I followed the recommendation from https://docs.prefect.io/orchestration/agents/local.html#requirements and use the official Prefect docker image. Is there any way I can solve/improve on this? Thanks!
Note that, while the task keeps running, it actually invokes the lambda again with the same input. Hope this information provides some insight.
k
Hi @Wai Kiat Tan, do you have a timeout specified?
w
hi, @Kevin Kho yea, we do have this line
Copy code
lambda_client = create_client(connect_timeout=900, read_timeout=900)
k
Is that create_client boto3?
w
yea, that's the one from boto3
Copy code
client = session.client("lambda", config=config, region_name="ap-southeast-1")
Copy code
from botocore.config import Config

config = Config(
    connect_timeout=connect_timeout,
    read_timeout=read_timeout,
)
^ that's the original code that i have
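(For reference, a minimal sketch of how a create_client helper like the one above could be assembled from those two snippets; the session construction here is an assumption, everything else mirrors the code shared:)
Copy code
import boto3
from botocore.config import Config


def create_client(connect_timeout=900, read_timeout=900):
    # Both timeouts are in seconds; 900 matches Lambda's 15-minute hard limit.
    config = Config(
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
    )
    # Assumes a default session; the original code builds `session` elsewhere.
    session = boto3.session.Session()
    return session.client("lambda", config=config, region_name="ap-southeast-1")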
k
It seems like this timeout is what's causing the restarts… combined with Prefect retrying it with the same input. Though the timeout is higher than 6-7 minutes, so I don't know. What is the Lambda doing?
w
but we already maxed the timeout at 900 seconds, right? our @task decorator does not have retries specified, is there a default value for it? the lambda reads JSON objects from S3 and aggregates those JSON files into a large JSON file (newline separated)
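(Roughly what that lambda-side aggregation amounts to; a hedged sketch only, not the actual lambda code — the function and key names are placeholders:)
Copy code
import json

import boto3


def aggregate_json(source_bucket, prefix, target_bucket, target_key):
    s3 = boto3.client("s3")
    lines = []
    # List every object under the prefix and collect one JSON document per line.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=source_bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=source_bucket, Key=obj["Key"])["Body"].read()
            lines.append(json.dumps(json.loads(body)))
    # Write the newline-separated result as one large object.
    s3.put_object(Bucket=target_bucket, Key=target_key, Body="\n".join(lines).encode())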
k
There is no default timeout to the Prefect task. Maybe you can turn on boto3 logs and see what's going on there? Sample addition to do that:
LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
boto3 gives nothing on the info level.
w
There is no default timeout to the Prefect task. <- do you mean there is no default retry value? can i set those env variables in the official prefect dockerfile instead?
also, i would like to share. if i run
Copy code
prefect agent local start -t $PREFECT_SERVICE_ACCOUNT
locally on my machine, the flow executes perfectly fine
1. running the agent locally on my machine works 2. running the agent within a docker container does not
k
Yeah so just to be clear, timeout and max_retries are separate values. There is no default for them. You can set an environment variable like PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4 as seen here. Yes, you can set them in the Dockerfile, but most people just supply them in the task like @task(max_retries=3, retry_delay=…, timeout=900). Oh thanks for sharing that. Are you sure your container has access to outside traffic?
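(A minimal sketch of supplying those values on the task itself, assuming Prefect 0.14.x; the retry delay below is illustrative:)
Copy code
from datetime import timedelta

from prefect import task


# max_retries/retry_delay control Prefect-level retries; timeout (in seconds)
# fails the task run if task.run() exceeds it. None of these have defaults.
@task(max_retries=3, retry_delay=timedelta(minutes=1), timeout=900)
def invoke_blob_loader(table_path, flow_date, source_bucket, target_bucket):
    ...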
w
i'm certain that the container has access to outside traffic. if the number of JSON files is small, i'm able to process everything and the flow completes just fine
k
I think turning on boto3 logs and putting the logs in debug mode will give us more insight into how those retries are happening and what is causing the timeout
w
okay, to clarify: 1. the task which triggers the mentioned lambda does not have retries set 2. the timeout is set on the lambda client, not within the task decorator
alright, i'll get back to you after turning on the boto3 logging
d
Could the task restart because of the liveness check? If the task doesn't send back a ping, Towel will flag it as dead and retry, no?
@Wai Kiat Tan are you using dask?
w
hi @davzucky i was thinking about the liveness, but i could not find a way to adjust it. i even turned off the heartbeat in the flow setting. LOL, got that idea from a github issue
d
Can you try disabling the heartbeat on your flow from the UI?
w
no, i'm not using dask, just pure prefect and boto3 in the flow
d
OK. I had a problem with a long blocking task on mongo. Disabling the heartbeat fixed our problem. However, this is at the workflow level.
It would be nice to be able to flag, at the task level, that this is a long-running and blocking task.
You are using docker. Are you specifying any resource quotas?
w
so within your task, you query mongo to extract some data, but it runs so long that the flow can't keep track of it?
i did not specify any resource quota
but the heavy lifting is done by the lambda instead of the agent
in docker stats, everything looks okay
k
Can we try the extra logs then?
d
We are using the aggregate feature of mongo. This runs for 10-15 mins and we were getting heartbeat problems. We extracted this task into its own flow and disabled the heartbeat. The root cause looks to be the fact that the mongo driver is locking the process.
Can you share the code of your call? Are you calling the lambda async or sync? I know there are two modes
w
Copy code
@task(
    name="",
    target="",
    checkpoint=True,
    result=S3Result(f""),
)
def invoke_blob_loader(table_path, flow_date, source_bucket, target_bucket):
    body = {
        "table_path": table_path,
        "flow_date": flow_date,
        "source_bucket": source_bucket,
        "target_bucket": target_bucket,
    }
    dto = construct_dto(body)
    logger.info(json.dumps(dto, default=str))

    response = invoke(lambda_client, f"de-json2bq-{environment}-blob-loader", dto)
    validated_response = validate_lambda_response(response)
    logger.info(json.dumps(validated_response))
    return validated_response["payload"]
the invoke function wraps on top of boto3 client
it is sync, because i expect the lambda to return me some info, so i can pass it to the downstream task
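(For reference, a minimal sketch of what a synchronous invoke wrapper over the boto3 client typically looks like; the actual wrapper wasn't shared, so treat the shape below as an assumption:)
Copy code
import json


def invoke(lambda_client, function_name, payload):
    # InvocationType="RequestResponse" is the synchronous mode: the call blocks
    # until the lambda returns or the client's read_timeout is hit.
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload, default=str),
    )
    return json.loads(response["Payload"].read())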
@Kevin Kho, yea, i will run that soon
i keep seeing this after the task invokes the lambda, but i did not see any boto3-related logs though
Copy code
ENV PREFECT__LOGGING__LEVEL="DEBUG"
ENV PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
added this into my dockerfile
@Kevin Kho just to share the dockerfile
Copy code
FROM prefecthq/prefect:0.14.21-python3.8

ARG PREFECT_SERVICE_ACCOUNT
ARG ENVIRONMENT

ENV ENVIRONMENT ${ENVIRONMENT}
ENV PREFECT__LOGGING__LEVEL="DEBUG"
ENV PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"

RUN prefect backend cloud
CMD prefect agent local start -t ***********
should i be expecting to see some aws logs here? just curious
k
When I tried this on the RunConfig it worked and I got boto3 logs yep. I don’t know about the Dockerfile env syntax
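(A minimal sketch of attaching those env vars through the run config instead of the Dockerfile, assuming Prefect 0.14.x's LocalRun and that flow is the Flow object from the with block:)
Copy code
from prefect.run_configs import LocalRun

# flow is the Flow object built in the `with Flow(...) as flow:` block.
flow.run_config = LocalRun(
    env={
        "PREFECT__LOGGING__LEVEL": "DEBUG",
        "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']",
    }
)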
The syntax looks right from what I can tell
But maybe the Prefect logs will be enough to see why the task is being retried so I would run and see the logs still
Talked to the team members, and since it works with a small number of JSON files but not with a larger one, this could be a sign of docker resource limits being hit. I know you mentioned you checked the stats, but there might be something there hanging
w
@Kevin Kho let me reproduce this with the prefect logs again. i think the number of files is not the concern though, because the prefect flow passes the s3 paths from one task to another. the prefect task invokes the lambda with an s3 path, and the invoked lambda lists the files in that s3 path and reads them
@Kevin Kho that's the prefect log
Copy code
[2021-06-08 01:53:33+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Starting task run...
[2021-06-08 01:53:33+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Handling state change from Pending to Mapped
[2021-06-08 01:53:34+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Finished task run for task with final state: 'Mapped'
[2021-06-08 01:53:34+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Starting task run...
[2021-06-08 01:53:34+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Handling state change from Pending to Running
[2021-06-08 01:53:35+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Calling task.run() method...
[2021-06-08 01:53:35+0000] INFO - prefect | {"flow_id": "8b806ae3-0e7f-471e-882a-3316f6878b3d", "flow_run_id": "fdb9c515-4e79-46d4-8f28-7d6679beb024", "flow_run_name": "refreshing-chameleon", "flow_run_version": 3, "task_id": "f699e90d-11aa-457b-91e5-3b8535e4109a", "task_run_id": "7acfb38a-cb60-473e-8780-55a7d3083140", "event_type": "internal_data_json_event", "event_timestamp": "2021-06-08T01:53:22.084436+00:00", "body": {OUR INTERNAL INPUT}}
[2021-06-08 01:53:38+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:53:54+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:54:09+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:54:25+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:54:41+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:54:56+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:55:12+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 01:55:27+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 01:55:43+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:55:58+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:56:14+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:56:29+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:56:45+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:57:00+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:57:16+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 01:57:31+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:57:47+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:58:02+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:58:18+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Window changed'
[DEBUG tini (1)] Passing signal: 'Window changed'
[2021-06-08 01:58:34+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:58:49+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:59:05+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:59:20+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 01:59:36+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:59:51+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:00:07+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:00:22+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:00:38+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:00:53+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:01:09+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 02:01:24+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 02:01:40+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:01:55+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:02:11+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:02:26+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 02:02:42+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:02:57+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:03:12+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:03:28+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 02:03:43+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:03:58+0000] INFO - prefect | {"request_id": "192daff4-600b-4d16-9715-25c6f8ac9956", "payload": {"status_code": 200, OUR_INTERNAL_OUTPUT}}
[2021-06-08 02:03:58+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Handling state change from Running to Success
[2021-06-08 02:03:59+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Finished task run for task with final state: 'Success'
if you look at the following attachments, you can see that the same lambda is triggered twice
the first invocation completed at approximately 095908
the second invocation was triggered at 095852
while the task should still have been waiting for the response from the first invocation
k
Thanks for the detailed logs! Nothing seems to have retried on the Prefect side. If Prefect had retried the task, you would see double logs for logger.info(json.dumps(dto, default=str)) and you would see a Retry state. Are you using a LocalExecutor? I'll ask the team about that tini "window changed" signal to get more info, but I doubt it's that. Could you share the flow code? Just the with Flow() as flow and onwards. You can share it privately; if there's sensitive info, of course remove it.
I'm wondering if there was a flow.run() left in the code when it was registered, causing 2 runs. This can happen if you have flow.run() and then register with the CLI
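(A minimal sketch of the usual way to keep a local flow.run() around without it firing at registration; the flow name is a placeholder:)
Copy code
from prefect import Flow

with Flow("json-aggregation") as flow:  # placeholder name
    ...

if __name__ == "__main__":
    # Runs only when the script is executed directly,
    # not when the file is imported during `prefect register`.
    flow.run()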
w
@Kevin Kho yea, i'm using the LocalExecutor. Sure, i will share the flow code with you separately. thanks!
we do not have flow.run() left in the code =(