Ismail Cenik
06/03/2021, 5:34 PMKevin Kho
Kevin Kho
Ismail Cenik
06/03/2021, 5:40 PMKevin Kho
Ismail Cenik
06/03/2021, 5:45 PMIsmail Cenik
06/03/2021, 5:46 PMKevin Kho
Ismail Cenik
06/03/2021, 5:48 PMIsmail Cenik
06/03/2021, 5:58 PMKevin Kho
Ismail Cenik
06/03/2021, 6:10 PMKevin Kho
Sleeping for 60s" }
and then nothing else?
Ismail Cenik
06/03/2021, 6:22 PMIsmail Cenik
06/03/2021, 6:22 PMKevin Kho
Kevin Kho
Kevin Kho
Ismail Cenik
06/03/2021, 6:33 PMIsmail Cenik
06/03/2021, 7:06 PMIsmail Cenik
06/03/2021, 7:06 PMIsmail Cenik
06/03/2021, 7:07 PMIsmail Cenik
06/03/2021, 7:10 PM
while True:
    if isJobDone(app_name, kinesis_client):
        des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
        if des_response['ApplicationDetail']['ApplicationStatus'] == 'READY':
            return
        stop_response = kinesis_client.stop_application(ApplicationName=app_name, Force=False)
        print(f'Stop KDA: {app_name}, response: {stop_response}')
        return
    print(f"Sleeping for {SLEEP_TIME}s")
    time.sleep(int(SLEEP_TIME))
Ismail Cenik
06/03/2021, 7:10 PM
def isJobDone(app_name, kinesis_client):
    des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
    print(f"Check KDA status, job: {app_name}, status: {des_response['ApplicationDetail']['ApplicationStatus']}")
Ismail Cenik
06/03/2021, 7:11 PM
des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
Kevin Kho
Ismail Cenik
06/03/2021, 7:36 PMIsmail Cenik
06/03/2021, 7:37 PMKevin Kho
Kevin Kho
Ismail Cenik
06/03/2021, 7:51 PMKevin Kho
Ismail Cenik
06/03/2021, 7:56 PM
@task(log_stdout=True, max_retries=5, retry_delay=timedelta(seconds=60))
def waitPlanDataLoadingTask(app_name):
    kinesis_client = boto3.client(
        'kinesisanalyticsv2', region_name=region,
        aws_access_key_id=key_id, aws_secret_access_key=access_key)
    while True:
        if isJobDone(app_name, kinesis_client):
            des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
            if des_response['ApplicationDetail']['ApplicationStatus'] == 'READY':
                return
            stop_response = kinesis_client.stop_application(ApplicationName=app_name, Force=False)
            print(f'Stop KDA: {app_name}, response: {stop_response}')
            return
        print(f"Sleeping for {SLEEP_TIME}s")
        time.sleep(int(SLEEP_TIME))
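For readers following the thread: the loop above has no upper bound of its own, so if the application never reaches READY the task keeps sleeping indefinitely. A minimal sketch of the same polling pattern with an overall deadline is shown below. It is not the code from this thread: `FakeKinesisClient`, `wait_until_ready`, and the `max_wait_seconds` parameter are hypothetical names introduced for illustration, the fake client only mimics the response shape of `describe_application`, and the `stop_application` branch is left out for brevity.

```python
import time


class FakeKinesisClient:
    """Hypothetical stand-in for the boto3 client so the sketch runs offline."""

    def __init__(self, statuses):
        self._statuses = list(statuses)

    def describe_application(self, ApplicationName, IncludeAdditionalDetails=True):
        # Hand out the canned statuses in order; repeat the last one forever.
        if len(self._statuses) > 1:
            status = self._statuses.pop(0)
        else:
            status = self._statuses[0]
        return {"ApplicationDetail": {"ApplicationStatus": status}}


def wait_until_ready(client, app_name, sleep_seconds=60, max_wait_seconds=3600,
                     sleep=time.sleep, clock=time.monotonic):
    """Poll until the application reports READY or the deadline passes."""
    deadline = clock() + max_wait_seconds
    while clock() < deadline:
        response = client.describe_application(
            ApplicationName=app_name, IncludeAdditionalDetails=True)
        status = response["ApplicationDetail"]["ApplicationStatus"]
        print(f"Check KDA status, job: {app_name}, status: {status}")
        if status == "READY":
            return status
        print(f"Sleeping for {sleep_seconds}s")
        sleep(sleep_seconds)
    # Raising lets the caller (or a retry policy) decide what happens next.
    raise TimeoutError(f"{app_name} did not reach READY within {max_wait_seconds}s")


if __name__ == "__main__":
    demo_client = FakeKinesisClient(["RUNNING", "RUNNING", "READY"])
    wait_until_ready(demo_client, "demo-app", sleep_seconds=0, max_wait_seconds=5)
```

Note that a deadline like this only bounds the loop as a whole. It does not interrupt a single client call that never returns.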
Ismail Cenik
06/03/2021, 7:57 PM
@task(log_stdout=True, timeout=3600, max_retries=5, retry_delay=timedelta(seconds=60))
Kevin Kho
Kevin Kho
Ismail Cenik
06/03/2021, 8:16 PMKevin Kho
Ismail Cenik
06/03/2021, 8:57 PM
[logging]
# Extra loggers for Prefect log configuration
extra_loggers = "['boto3']"
Then I can define the environment variable in the scripts (register flow/run flow) like this:
export PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
And update the code as:
import logging
import sys
for l in ['boto3']:
    logger = logging.getLogger(l)
    logger.setLevel('INFO')
    log_stream = logging.StreamHandler(sys.stdout)
    log_stream.setFormatter(LOG_FORMAT)
    logger.addHandler(log_stream)
Are they correct?
Ismail Cenik
06/03/2021, 9:05 PMKevin Kho
KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
. The location of the TOML file, though, is in the home folder under the .prefect folder.
Kevin Kho
Kevin Kho
KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": ['boto3']})
and this might be all you need to get the boto3 logger.
Ismail Cenik
06/03/2021, 9:27 PMKevin Kho
Kevin Kho
flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
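A note on the value format: the extra-loggers setting is passed as a string that holds a Python list literal, which is why the quotes are nested. The sketch below is not Prefect's implementation. It only illustrates, under the assumption that such a value is read as a Python literal, how a string like "['boto3']" maps to a list of logger names that each get a stdout handler, mirroring the manual logging snippet earlier in the thread. `attach_extra_loggers` and the `LOG_FORMAT` pattern are hypothetical names chosen for illustration.

```python
import ast
import logging
import os
import sys

# Hypothetical format; the thread does not show how LOG_FORMAT was defined.
LOG_FORMAT = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")


def attach_extra_loggers(env_var="PREFECT__LOGGING__EXTRA_LOGGERS", level="INFO"):
    """Read a list literal from the environment and attach stdout handlers."""
    raw = os.environ.get(env_var, "[]")
    # Assumption: the value is a Python list literal such as "['boto3']".
    names = ast.literal_eval(raw)
    for name in names:
        logger = logging.getLogger(name)
        logger.setLevel(level)
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(LOG_FORMAT)
        logger.addHandler(handler)
    return names


if __name__ == "__main__":
    os.environ["PREFECT__LOGGING__EXTRA_LOGGERS"] = "['boto3']"
    print(attach_extra_loggers())
```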
Kevin Kho
Ismail Cenik
06/03/2021, 9:55 PMIsmail Cenik
06/03/2021, 11:55 PMIsmail Cenik
06/03/2021, 11:55 PMIsmail Cenik
06/04/2021, 12:08 AMKevin Kho
DEBUG
?
Ismail Cenik
06/04/2021, 12:41 AM
flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
). Is there any further update?
Kevin Kho
Ismail Cenik
06/04/2021, 12:50 AMIsmail Cenik
06/04/2021, 12:51 AMKevin Kho
DockerStorage with KubernetesRun, the default image pull policy for Kubernetes is IfNotPresent, meaning that it only pulls the image if it doesn’t already have one with the same name. The image gets updated, but because it has the same name, Kubernetes doesn’t pull it. Can you add flow.run_config = KubernetesRun(image_pull_policy="Always")? This was added in 0.14.18.
Kevin Kho
Kevin Kho
Ismail Cenik
06/04/2021, 1:06 AMIsmail Cenik
06/04/2021, 1:11 AMIsmail Cenik
06/04/2021, 1:11 AMIsmail Cenik
06/04/2021, 5:09 AMIsmail Cenik
06/04/2021, 5:09 AMKevin Kho
@task(log_stdout=True, max_retries=200, retry_delay=timedelta(seconds=60), timeout=600)
def waitPlanDataLoadingTask(app_name):
It’s very likely that one of the describe_application or stop_application calls is hanging, if that Kinesis call only works sometimes and hangs at other times. With this timeout, the task will be retried if it hangs. The one thing to note is that the task will also time out after a certain number of loop iterations even when nothing hangs (roughly 10 with timeout=600 and 60-second sleeps). To work with this, you can set max_retries to something very generous so that the process just keeps running.
Kevin Kho
describe_application or stop_application on Local instead of Kubernetes and see if you get logs showing up. For me, I did flow.run_config = LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"}) and it printed out the boto3 logs.
Ismail Cenik
06/04/2021, 4:00 PMIsmail Cenik
06/04/2021, 8:17 PMIsmail Cenik
06/04/2021, 8:17 PMKevin Kho
Ismail Cenik
06/04/2021, 8:45 PMIsmail Cenik
06/04/2021, 8:45 PMIsmail Cenik
06/04/2021, 8:46 PMIsmail Cenik
06/04/2021, 8:53 PMIsmail Cenik
06/04/2021, 8:54 PMKevin Kho