
Ismail Cenik

06/03/2021, 5:34 PM
Hello, our flows had been running successfully for a month, with more than 200 successful runs. A new error started yesterday: of the last 6 runs, half did not complete and got stuck in one of our tasks. There is no log information. According to the task logic, the Kinesis Data Analytics status must be checked every 60 seconds, but after a while the task just waits without doing anything. There is still an active flow run stuck in that state, and I have not killed it so you can take a look. Could you please check it?

Kevin Kho

06/03/2021, 5:36 PM
Hey @Ismail Cenik, are you on server or cloud?
How often does this flow run? Are you using Dask or Kubernetes?

Ismail Cenik

06/03/2021, 5:40 PM
Hello, we ran it several times during development, but the number of runs decreased because of some QA activities, so I only noticed the issue yesterday. Our Prefect Agent is installed on our EKS cluster, and I am using Prefect Cloud to manage flow runs.

Kevin Kho

06/03/2021, 5:42 PM
Did this just happen once? You haven’t tried restarting the flow, right?

Ismail Cenik

06/03/2021, 5:45 PM
It happened 3 times in the last 24 hours. I have killed the other two, and one of them is still active.
BTW, we have had a couple of successful runs as well.

Kevin Kho

06/03/2021, 5:47 PM
This seems like something to do with the EKS cluster. We won’t have any more information on the Prefect side than you see. Maybe you can check the Kubernetes logs?

Ismail Cenik

06/03/2021, 5:48 PM
Prefect just stops checking the Kinesis Data Analytics status and stays in a sleep-like state.
There is nothing meaningful in the Kubernetes logs.

Kevin Kho

06/03/2021, 6:04 PM
Can you give me an example flow ID?

Ismail Cenik

06/03/2021, 6:10 PM
a4699a77-2b95-44c6-9eae-2d134ec45d6a

Kevin Kho

06/03/2021, 6:18 PM
So your log ends with
Sleeping for 60s" }
and then nothing else?

Ismail Cenik

06/03/2021, 6:22 PM
Yes, it should check the status every 60 seconds, but it just stays in that state for several hours,
and there are no more logs.

Kevin Kho

06/03/2021, 6:25 PM
Would you know if there is a timeout on requests to Kinesis Data Analytics?
It seems like either the request goes out but hangs, or the Kubernetes cluster is resource-starved.
Were there other large processes that started yesterday on your cluster?

Ismail Cenik

06/03/2021, 6:33 PM
Let me check.
The team said that no new large processes were run.
For Kinesis Data Analytics, there is no timeout.
This is the related code block:
while True:
    if isJobDone(app_name, kinesis_client):
        des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
        if des_response['ApplicationDetail']['ApplicationStatus'] == 'READY':
            return
        stop_response = kinesis_client.stop_application(ApplicationName=app_name, Force=False)
        print(f'Stop KDA: {app_name}, response: {stop_response}')
        return
    print(f"Sleeping for {SLEEP_TIME}s")
    time.sleep(int(SLEEP_TIME))
def isJobDone(app_name, kinesis_client):
    des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
    print(f"Check KDA status, job: {app_name}, status: {des_response['ApplicationDetail']['ApplicationStatus']}")
It seems to be stuck on
des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
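(As an aside, one common way to keep a boto3 call like describe_application from hanging indefinitely is to give the client explicit timeouts via a botocore Config. A minimal, hedged sketch; the region and application name are placeholders, and this is not what the flow above was doing:)
import boto3
from botocore.config import Config

# Hypothetical client setup: fail fast instead of hanging on a stalled connection.
boto_cfg = Config(
    connect_timeout=10,               # seconds allowed to establish the connection
    read_timeout=60,                  # seconds allowed to wait for a response
    retries={'max_attempts': 3})      # botocore-level retries on top of the timeouts

kinesis_client = boto3.client(
    'kinesisanalyticsv2',
    region_name='us-east-1',          # placeholder region
    config=boto_cfg)

des_response = kinesis_client.describe_application(
    ApplicationName='my-kda-app',     # placeholder application name
    IncludeAdditionalDetails=True)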

Kevin Kho

06/03/2021, 7:33 PM
Does the kinesis_client have a timeout feature?

Ismail Cenik

06/03/2021, 7:36 PM
No
This problem started yesterday. BTW it might be a different error, right?

Kevin Kho

06/03/2021, 7:42 PM
I have seen people run long processes (think infinite while loops) on Prefect and they just run. That causes other issues, but I bring it up because a long-running task shouldn’t have problems on its own. Based on the code and logs, it does seem to get stuck there.
I know you check every minute, but how often do you expect results from the kinesis_client here?

Ismail Cenik

06/03/2021, 7:51 PM
The average Kinesis running time is around 4-5 minutes, and we have 10 Kinesis applications in the flow.

Kevin Kho

06/03/2021, 7:55 PM
I am wondering if we should change this to use Prefect retries and timeouts instead, so that we can time out if Kinesis takes more than the expected time and hangs.

Ismail Cenik

06/03/2021, 7:56 PM
This is the current code
@task(log_stdout=True, max_retries=5, retry_delay=timedelta(seconds=60))
def waitPlanDataLoadingTask(app_name):
    kinesis_client = boto3.client(
        'kinesisanalyticsv2', region_name=region,
        aws_access_key_id=key_id, aws_secret_access_key=access_key)
    while True:
        if isJobDone(app_name, kinesis_client):
            des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
            if des_response['ApplicationDetail']['ApplicationStatus'] == 'READY':
                return
            stop_response = kinesis_client.stop_application(ApplicationName=app_name, Force=False)
            print(f'Stop KDA: {app_name}, response: {stop_response}')
            return
        print(f"Sleeping for {SLEEP_TIME}s")
        time.sleep(int(SLEEP_TIME))
We might have some large tasks, depending on the data, so can I update the decorator like this?
@task(log_stdout=True, timeout=3600, max_retries=5, retry_delay=timedelta(seconds=60))

Kevin Kho

06/03/2021, 8:03 PM
I’ll look a bit into Kinesis and see if we have to go down this path
Have you tried adding boto3 logs to the flow? Maybe we’ll get more information that way?

Ismail Cenik

06/03/2021, 8:16 PM
No, could you give an example?

Kevin Kho

06/03/2021, 8:18 PM
This is how to add boto3 logs

Ismail Cenik

06/03/2021, 8:57 PM
I will request the TOML change from a different team ... Please double-check it.
[logging]
# Extra loggers for Prefect log configuration
extra_loggers = "['boto3']"
Then I can define the environment variable in the scripts (register flow / run flow) like this:
export PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
And update the code as
import logging
import sys
for l in ['boto3']:
    logger = logging.getLogger(l)
    logger.setLevel('INFO')
    log_stream = logging.StreamHandler(sys.stdout)
    log_stream.setFormatter(LOG_FORMAT)
    logger.addHandler(log_stream)
Are they correct?
BTW, where is the TOML file located in our case? We just have a Prefect agent installed on our EKS.

Kevin Kho

06/03/2021, 9:16 PM
You don’t need to update the code. The TOML or the environment variable will already work. I think the environment variable in the RunConfig will work, so you don’t need to go to the other team. You can do something like
KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
The location of the TOML file, though, is in the home folder under the .prefect folder.
Let me confirm this syntax with a team member
Try
KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": ['boto3']})
and this might be all you need to get the boto3 logger.

Ismail Cenik

06/03/2021, 9:27 PM
So I will not change anything in the TOML, right? In my local Prefect, the file is under ".prefect", but for the cloud case, I do not know where it is.

Kevin Kho

06/03/2021, 9:28 PM
Yes, I don’t think you have to. Testing now.
Ok, so I tested this. boto3 logs seem to be at debug level, and the syntax I gave earlier may have been wrong. This is what you want to attach to the Flow:
flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
and then you’ll see boto3 logs in the Flow on Cloud and hopefully that will give us more clues.
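(For reference, a minimal sketch of attaching that run config to a flow and re-registering it; the flow body and the project name "my-project" are placeholders:)
from prefect import Flow, task
from prefect.run_configs import KubernetesRun

@task(log_stdout=True)
def say_hi():
    print("hi")

with Flow("kda-flow") as flow:            # placeholder flow name
    say_hi()

# DEBUG level plus extra_loggers forwards boto3's records into the flow logs on Cloud.
flow.run_config = KubernetesRun(
    env={"PREFECT__LOGGING__LEVEL": "DEBUG",
         "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})

flow.register(project_name="my-project")  # re-register so Cloud picks up the change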

Ismail Cenik

06/03/2021, 9:55 PM
I have started the flow and will let you know when we see the same issue.
I have tried it with that flow: 97539121-0cc3-4665-9b21-453785347130.
The structure is the same; the only difference is its environment.
After a successful run, the same problem happened again. Please check it.

Kevin Kho

06/04/2021, 12:32 AM
I am not seeing any boto3 logs or debug-level logs. Did you set the logging level to DEBUG?

Ismail Cenik

06/04/2021, 12:41 AM
I just added what you provided (
flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
). Is there anything else to update?

Kevin Kho

06/04/2021, 12:45 AM
I assume you re-registered? What storage are you using?

Ismail Cenik

06/04/2021, 12:50 AM
Yes, I always re-register flows when updating. Let me send the storage info privately.
DockerStorage

Kevin Kho

06/04/2021, 12:58 AM
So I think what happened is that when you use DockerStorage with KubernetesRun, the default image pull policy for Kubernetes is IfNotPresent, meaning it only pulls the image if it doesn’t already have one with the same name. The image gets updated, but because it has the same name, Kubernetes doesn’t pull it. Can you add
flow.run_config = KubernetesRun(image_pull_policy="Always")
? This was added in 0.14.18.
What version of Prefect are you on? If you’re below 0.14.18, it needs to be set on the Kubernetes side.
If you use a job template, you can also do this before 0.14.18.
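(A minimal sketch of the combination being described, reusing a flow object as above; the registry URL, image name, and tag are placeholders for the Docker storage settings:)
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker

# Placeholder Docker storage; a rebuilt image keeps the same name and tag.
flow.storage = Docker(
    registry_url="123456789012.dkr.ecr.us-east-1.amazonaws.com",
    image_name="kda-flow",
    image_tag="latest")

# With a reused tag, the default IfNotPresent policy keeps the stale image cached on the
# node; Always forces Kubernetes to pull the freshly pushed image for every flow run.
flow.run_config = KubernetesRun(
    image_pull_policy="Always",
    env={"PREFECT__LOGGING__LEVEL": "DEBUG",
         "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})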

Ismail Cenik

06/04/2021, 1:06 AM
I am using a job template.
I think it is 0.14.21.
Let me add image_pull_policy="Always".
Please check it. The logs are just showing the same thing as before.
There are no boto3 logs.

Kevin Kho

06/04/2021, 5:57 AM
That’s weird. I tried setting up the boto3 logs and it worked locally (I read stuff from S3 to test). At this point, the simplest fix would be including an appropriate timeout like:
@task(log_stdout=True, max_retries=200, retry_delay=timedelta(seconds=60), timeout=600)
def waitPlanDataLoadingTask(app_name):
It’s very likely that it’s either describe_application or stop_application that’s hanging, if that Kinesis call only works sometimes and hangs at other times. This timeout will cause the task to be retried if it hangs. The only thing to note is that the task will also time out after a certain number of loop iterations, so you can set max_retries to something very generous so that the process just keeps running.
You can try that Kinesis client describe_application or stop_application locally instead of on Kubernetes and see if the logs show up. For me, I did
flow.run_config = LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
and it printed out the boto3 logs.
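(A minimal, hedged sketch of the timeout-plus-retries pattern Kevin describes, with a hypothetical is_job_done helper standing in for the one defined earlier in the thread; the region and polling interval are placeholders:)
import time
from datetime import timedelta

import boto3
from prefect import task

SLEEP_TIME = 60        # placeholder polling interval
REGION = "us-east-1"   # placeholder region

def is_job_done(app_name, kinesis_client):
    # Hypothetical helper: report the status and treat anything other than RUNNING as done.
    status = kinesis_client.describe_application(
        ApplicationName=app_name,
        IncludeAdditionalDetails=True)['ApplicationDetail']['ApplicationStatus']
    print(f"Check KDA status, job: {app_name}, status: {status}")
    return status != 'RUNNING'

@task(log_stdout=True, max_retries=200, retry_delay=timedelta(seconds=60), timeout=600)
def wait_plan_data_loading_task(app_name):
    # timeout=600 lets Prefect fail the task if a Kinesis call hangs for ~10 minutes;
    # the generous max_retries then restarts the polling loop on the next retry.
    kinesis_client = boto3.client('kinesisanalyticsv2', region_name=REGION)
    while True:
        if is_job_done(app_name, kinesis_client):
            return
        print(f"Sleeping for {SLEEP_TIME}s")
        time.sleep(SLEEP_TIME)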

Ismail Cenik

06/04/2021, 4:00 PM
Hello, we updated our Prefect version to 0.14.20 and are now observing.
Kevin, we realized that there was a communication error between our Prefect Agent and Prefect Cloud.
Our previous version was 0.14.14, and it is now 0.14.20.

Kevin Kho

06/04/2021, 8:22 PM
Was that causing issues?

Ismail Cenik

06/04/2021, 8:45 PM
Not sure, since we are getting the same error.
I saw 4 successful runs, then got the same error again.
We talked to George, and he suggested making that upgrade.
This is our flow info:
The Prefect Core version is 0.14.21, and our current version on the Prefect Agent side is 0.14.20. Might that be a problem?

Kevin Kho

06/04/2021, 9:01 PM
I don’t expect this to be your specific problem, but a lot of weird errors do happen when the flow version is higher than the agent version. The agent version should be the higher of the two.
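(A quick way to compare the two versions is to print prefect.__version__ both in the image the flow runs from and in the agent’s environment; a tiny sketch:)
import prefect

# Run this in the flow's execution image and on the agent host; the agent's version
# should be greater than or equal to the flow's core version.
print(prefect.__version__)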