# prefect-server
i
Hello, our flows have been running successfully for 1 month with more than 200 successful runs. A new error started yesterday. Of the last 6 runs, half did not complete and got stuck in one of our tasks. There is no log information. According to the task logic, the Kinesis Data Analytics status must be checked every 60 seconds, but after a while the task just waits without doing anything. There is still an active flow stuck in that condition, and I have not killed it so I can show you. Could you please check it?
k
Hey @Ismail Cenik, are you on server or cloud?
How often does this flow run? Are you using Dask or Kubernetes?
i
Hello, we ran it several times during development, but the number of runs decreased because of some QA activities. I noticed the issue yesterday. Our Prefect Agent is installed on our EKS, and I am using Prefect Cloud to manage flow runs.
k
Did this just happen once? You haven’t tried restarting the flow right?
i
It happened 3 times in the last 24 hours. I have killed the other two and one of them is still active.
BTW we have a couple of successful runs as well
k
This seems like something to do with the EKS cluster. We won’t have any more information on the Prefect side than you see. Maybe you can check the Kubernetes logs?
i
Prefect just stops checking the Kinesis Data Analytics status and stays in a sleep-like mode
There is nothing meaningful in the Kubernetes logs
k
Can you give me an example flow ID?
i
a4699a77-2b95-44c6-9eae-2d134ec45d6a
k
So your log ends with
Sleeping for 60s
and then nothing else?
i
Yes, it should check the status every 60 seconds, but it just stays in that state for several hours
and no more logs
k
Would you know if there is a timeout to requests on Kinesis Data Analytics?
It seems like either the request goes out but hangs, or the Kubernetes cluster is resource starved.
Were there other large processes that started yesterday on your cluster?
i
Let me check
The team said that no new large processes were run.
For the Kinesis Data Analytics, there is no timeout
This is the related code block
Copy code
    while True:
        if isJobDone(app_name, kinesis_client):
            des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
            if des_response['ApplicationDetail']['ApplicationStatus'] == 'READY':
                return
            stop_response = kinesis_client.stop_application(ApplicationName=app_name, Force=False)
            print(f'Stop KDA: {app_name}, response: {stop_response}')
            return
        print(f"Sleeping for {SLEEP_TIME}s")
        time.sleep(int(SLEEP_TIME))
Copy code
def isJobDone(app_name, kinesis_client):
    des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
    print(f"Check KDA status, job: {app_name}, status: {des_response['ApplicationDetail']['ApplicationStatus']}")
It seems to be stuck on
Copy code
des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
k
Does the kinesis_client have a timeout feature?
i
No
This problem started yesterday. BTW it might be a different error, right?
k
I have seen people running long processes (think infinite while loops) on Prefect and it just runs. This causes other issues but I bring this up because a long running task shouldn’t have issues. Based on the code and logs, it does seem to get stuck there.
I know you check every minute, but how often do you expect results from the kinesis_client here?
i
The average Kinesis running time is around 4-5 minutes and we have 10 Kinesis applications in the flow
k
I am wondering if we should change this to use Prefect retries and timeouts instead so that we can timeout if Kinesis takes more than the expected time and hangs
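It might also be worth putting a timeout on the boto3 client itself, in case the HTTP call is what hangs. Something like this might work (untested on my side, the values are just examples):
Copy code
from botocore.config import Config

import boto3

# Illustrative timeouts; adjust for your environment. This makes the underlying
# HTTP call fail instead of hanging indefinitely, so the surrounding task logic
# (or a Prefect retry) gets a chance to react.
boto_cfg = Config(connect_timeout=30, read_timeout=60, retries={"max_attempts": 3})
kinesis_client = boto3.client(
    'kinesisanalyticsv2', region_name=region,
    aws_access_key_id=key_id, aws_secret_access_key=access_key,
    config=boto_cfg)
That is only worth trying if the Prefect-level timeout turns out not to be enough.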
i
This is the current code
Copy code
from datetime import timedelta
import time

import boto3
from prefect import task

# region, key_id, access_key, and SLEEP_TIME are module-level config values defined elsewhere
@task(log_stdout=True, max_retries=5, retry_delay=timedelta(seconds=60))
def waitPlanDataLoadingTask(app_name):
    kinesis_client = boto3.client(
        'kinesisanalyticsv2', region_name=region,
        aws_access_key_id=key_id, aws_secret_access_key=access_key)
    while True:
        if isJobDone(app_name, kinesis_client):
            des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
            if des_response['ApplicationDetail']['ApplicationStatus'] == 'READY':
                return
            stop_response = kinesis_client.stop_application(ApplicationName=app_name, Force=False)
            print(f'Stop KDA: {app_name}, response: {stop_response}')
            return
        print(f"Sleeping for {SLEEP_TIME}s")
        time.sleep(int(SLEEP_TIME))
We might have some large tasks, depending on the data, so can I update the decorator like this?
Copy code
@task(log_stdout=True, timeout=3600, max_retries=5, retry_delay=timedelta(seconds=60))
k
I’ll look a bit into Kinesis and see if we have to go down this path
👍 1
Have you tried adding boto3 logs to the flow? Maybe we’ll get more information that way?
i
No, could you give an example?
k
This is how to add boto3 logs
i
I will request the TOML change from a different team ... Please double-check it.
Copy code
[logging]
# Extra loggers for Prefect log configuration
extra_loggers = "['boto3']"
Then I can define the environment variable in the scripts (register flow / run flow) like this
Copy code
export PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
And update the code as
Copy code
import logging
import sys

# LOG_FORMAT is assumed to be defined elsewhere in the script; shown here so the snippet runs
LOG_FORMAT = logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")
for name in ['boto3']:
    logger = logging.getLogger(name)
    logger.setLevel('INFO')
    log_stream = logging.StreamHandler(sys.stdout)
    log_stream.setFormatter(LOG_FORMAT)
    logger.addHandler(log_stream)
Are they correct?
BTW, where is the TOML file located in our case? We just have a Prefect agent installed on our EKS.
k
You don’t need to update the code. The TOML or environment variable will already work. I think the environment variable in the RunConfig will work, so you don’t need to go to the different team. You can do something like
KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
The location of the TOML file, though, is in the home folder under the .prefect folder.
Let me confirm this syntax with a team member
Try
KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": ['boto3']})
and this might be all you need to get the boto3 logger.
👍 1
i
So I will not change anything in the TOML, right? In my local Prefect, the file is under ".prefect", but for the cloud case, I do not know where it is.
k
Yes I don’t think you have to. Testing now
Ok so I tested this. boto3 logs seem to be at debug level and the syntax I gave earlier may have been wrong. This is what you want to attach to the Flow:
flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
👍 1
and then you’ll see boto3 logs in the Flow on Cloud and hopefully that will give us more clues.
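For reference, attaching it and re-registering would look roughly like this (flow and project names here are just placeholders):
Copy code
from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow("kda-flow") as flow:  # placeholder name; in practice this is your existing flow
    ...  # your existing tasks

flow.run_config = KubernetesRun(
    env={"PREFECT__LOGGING__LEVEL": "DEBUG",
         "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})

# Re-register so Cloud picks up the new run config (project name is a placeholder)
flow.register(project_name="my-project")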
i
I have started the flow and will let you know when we see the same issue
I have tried it with flow 97539121-0cc3-4665-9b21-453785347130
The structure is the same and the only difference is its environment
After a successful run, the same problem happened again, please check it
k
I am not seeing any boto3 logs or debug-level logs. Did you set the logging level to DEBUG?
i
I just added what you provided (flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})). Is there any more update?
k
I assume you re-registered? What storage are you using?
i
Yes, I always re-register flows when updating; let me send the storage info privately
DockerStorage
k
So I think what happened is that when you use DockerStorage with KubernetesRun, the default image pull policy for Kubernetes is IfNotPresent, meaning it only pulls the image if it doesn’t already have one with the same name. The image gets updated, but because it has the same name, Kubernetes doesn’t pull it. Can you add
flow.run_config = KubernetesRun(image_pull_policy="Always")
? The image_pull_policy argument was added in 0.14.18.
What version of Prefect are you on? If you’re below 0.14.18, it needs to be updated on the Kubernetes side
If you use a job template, you can also do this before 0.14.18
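With a custom job template, the relevant bit would look something like this, passed as a Python dict (heavily simplified; your real template has more fields, and the container name may differ):
Copy code
from prefect.run_configs import KubernetesRun

# Minimal, illustrative job template fragment. Setting imagePullPolicy here
# should work even on Prefect versions before 0.14.18.
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "flow",  # Prefect's flow container is typically named "flow"
                        "imagePullPolicy": "Always",
                    }
                ]
            }
        }
    },
}

flow.run_config = KubernetesRun(job_template=job_template)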
i
I am using a job template
I think it is 0.14.21
Let me add "image_pull_policy="Always""
Please check it. The logs are still just saying the same thing
There are no boto3 logs
k
That’s weird. I tried setting up the boto3 logs and it worked locally (I read stuff from S3 to test). At this point, the simplest fix would be including an appropriate timeout like:
Copy code
@task(log_stdout=True, max_retries=200, retry_delay=timedelta(seconds=60), timeout=600)
def waitPlanDataLoadingTask(app_name):
It’s very likely that one of the describe_application or stop_application calls is hanging, if that Kinesis call only works sometimes and hangs on others. This timeout will retry the task if it hangs. The only thing to note is that the task will time out after a certain number of loop iterations, so you can also set max_retries to something very generous so that this process will just keep running.
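If you’d rather not rely only on the Prefect-level timeout, a variant is to bound the polling loop itself so the task fails cleanly and lets the retry kick in. A rough, untested sketch (MAX_WAIT is made up):
Copy code
import time

MAX_WAIT = 600  # seconds; illustrative, roughly matching the task-level timeout

def wait_until_done(app_name, kinesis_client):
    # Poll until the KDA job is done or the deadline passes, then fail loudly
    # so Prefect's retry logic can re-run the task instead of hanging forever.
    deadline = time.monotonic() + MAX_WAIT
    while time.monotonic() < deadline:
        if isJobDone(app_name, kinesis_client):
            return
        print(f"Sleeping for {SLEEP_TIME}s")
        time.sleep(int(SLEEP_TIME))
    raise RuntimeError(f"KDA app {app_name} not done after {MAX_WAIT}s")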
You can try that Kinesis client’s describe_application or stop_application with LocalRun instead of Kubernetes and see if the logs show up. For me, I did
flow.run_config = LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
and it printed out the boto3 logs.
i
Hello, we updated our Prefect version to 0.14.20 and are now observing
Kevin, we realized that there is a communication error between our Prefect Agent and Prefect Cloud
Our previous version was 0.14.14 and it is now 0.14.20.
k
Was that causing issues?
i
Not sure, since we are getting the same error
I saw 4 successful runs and then got the same error
We talked to George and he suggested making that upgrade.
This is our flow info
The Prefect Core version is 0.14.21 and our current version on the Prefect Agent side is 0.14.20. Might that be a problem?
k
I don’t expect this to be your specific problem, but a lot of weird errors do happen when the flow version is higher than the agent version. The agent version should be higher.
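You can double-check what each side is running with something like:
Copy code
import prefect

print(prefect.__version__)
For the agent, run that inside the agent's environment or image.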