Ismail Cenik
06/03/2021, 5:34 PM
Kevin Kho
06/03/2021, 5:36 PM
Ismail Cenik
06/03/2021, 5:40 PM
Kevin Kho
06/03/2021, 5:42 PM
Ismail Cenik
06/03/2021, 5:45 PM
Kevin Kho
06/03/2021, 5:47 PM
Ismail Cenik
06/03/2021, 5:48 PM
Kevin Kho
06/03/2021, 6:04 PM
Ismail Cenik
06/03/2021, 6:10 PM
Kevin Kho
06/03/2021, 6:18 PM
Sleeping for 60s" }
and then nothing else?
Ismail Cenik
06/03/2021, 6:22 PM
Kevin Kho
06/03/2021, 6:25 PM
Ismail Cenik
06/03/2021, 6:33 PM
while True:
    if isJobDone(app_name, kinesis_client):
        des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
        if des_response['ApplicationDetail']['ApplicationStatus'] == 'READY':
            return
        stop_response = kinesis_client.stop_application(ApplicationName=app_name, Force=False)
        print(f'Stop KDA: {app_name}, response: {stop_response}')
        return
    print(f"Sleeping for {SLEEP_TIME}s")
    time.sleep(int(SLEEP_TIME))

def isJobDone(app_name, kinesis_client):
    des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
    print(f"Check KDA status, job: {app_name}, status: {des_response['ApplicationDetail']['ApplicationStatus']}")
    des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
    # (rest of isJobDone not shown in the thread)
Kevin Kho
06/03/2021, 7:33 PM
Ismail Cenik
06/03/2021, 7:36 PM
Kevin Kho
06/03/2021, 7:42 PM
Ismail Cenik
06/03/2021, 7:51 PM
Kevin Kho
06/03/2021, 7:55 PM
Ismail Cenik
06/03/2021, 7:56 PM
from datetime import timedelta
import time

import boto3
from prefect import task

# region, key_id, access_key, SLEEP_TIME and isJobDone are defined elsewhere in the flow script.


@task(log_stdout=True, max_retries=5, retry_delay=timedelta(seconds=60))
def waitPlanDataLoadingTask(app_name):
    kinesis_client = boto3.client(
        'kinesisanalyticsv2', region_name=region,
        aws_access_key_id=key_id, aws_secret_access_key=access_key)
    while True:
        if isJobDone(app_name, kinesis_client):
            des_response = kinesis_client.describe_application(ApplicationName=app_name, IncludeAdditionalDetails=True)
            if des_response['ApplicationDetail']['ApplicationStatus'] == 'READY':
                # Already in READY (not running): nothing to stop.
                return
            stop_response = kinesis_client.stop_application(ApplicationName=app_name, Force=False)
            print(f'Stop KDA: {app_name}, response: {stop_response}')
            return
        print(f"Sleeping for {SLEEP_TIME}s")
        time.sleep(int(SLEEP_TIME))

# Alternative decorator with a one-hour timeout (timeout is in seconds):
@task(log_stdout=True, timeout=3600, max_retries=5, retry_delay=timedelta(seconds=60))
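Since the job-done criterion inside isJobDone isn't shown, here is only a hedged sketch of a related pattern: polling describe_application until ApplicationStatus reaches a target value, with a wall-clock deadline so a stuck application raises instead of looping forever. FakeKinesisClient and wait_for_status are hypothetical names; the fake client just replays statuses so the loop can run without AWS, and sleep/clock are injectable so it can be tested without real waiting.

```python
import time
from typing import Callable, Iterable


class FakeKinesisClient:
    """Hypothetical stand-in for a boto3 'kinesisanalyticsv2' client.

    It replays a fixed sequence of statuses so the loop can be exercised
    without AWS credentials.
    """

    def __init__(self, statuses: Iterable[str]):
        self._statuses = list(statuses)
        self.calls = 0

    def describe_application(self, ApplicationName, IncludeAdditionalDetails=False):
        # Hand out the next status; once the list runs out, keep repeating the last one.
        status = self._statuses[min(self.calls, len(self._statuses) - 1)]
        self.calls += 1
        return {"ApplicationDetail": {"ApplicationName": ApplicationName,
                                      "ApplicationStatus": status}}


def wait_for_status(client, app_name: str, target: str, sleep_s: float,
                    deadline_s: float,
                    sleep: Callable[[float], None] = time.sleep,
                    clock: Callable[[], float] = time.monotonic) -> str:
    """Poll describe_application until ApplicationStatus equals `target`.

    Raises TimeoutError once `deadline_s` seconds have passed, so a stuck
    application surfaces as a task failure that max_retries can pick up,
    rather than an endless loop.
    """
    start = clock()
    while True:
        resp = client.describe_application(ApplicationName=app_name,
                                           IncludeAdditionalDetails=True)
        status = resp["ApplicationDetail"]["ApplicationStatus"]
        print(f"Check KDA status, job: {app_name}, status: {status}")
        if status == target:
            return status
        if clock() - start >= deadline_s:
            raise TimeoutError(f"{app_name} still {status} after {deadline_s}s")
        print(f"Sleeping for {sleep_s}s")
        sleep(sleep_s)
```

Inside a real task one would pass the boto3 client and keep the default sleep and clock. Note that a deadline inside the loop cannot interrupt a single describe_application call that hangs; the task-level timeout in the second decorator covers that case.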
Kevin Kho
06/03/2021, 8:03 PM
Ismail Cenik
06/03/2021, 8:16 PM
Kevin Kho
06/03/2021, 8:18 PM
Ismail Cenik
06/03/2021, 8:57 PM
[logging]
# Extra loggers for Prefect log configuration
extra_loggers = "['boto3']"
Then I can define the environment variable in the scripts (register flow/run flow) like this:
export PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
And update the code as follows:
import logging
import sys

for l in ['boto3']:
    logger = logging.getLogger(l)
    logger.setLevel('INFO')
    log_stream = logging.StreamHandler(sys.stdout)
    log_stream.setFormatter(LOG_FORMAT)
    logger.addHandler(log_stream)
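A minimal sketch of the handler setup in the question, under two assumptions: LOG_FORMAT is a plain format string (its real value isn't shown, so the one below is hypothetical), and output goes to a StringIO so the result can be checked. Handler.setFormatter expects a logging.Formatter object, so the string is wrapped first; records from child loggers of "boto3" propagate up to the handler attached to "boto3".

```python
import io
import logging

# Hypothetical format string; the LOG_FORMAT used in the question is not shown.
LOG_FORMAT = "%(levelname)s - %(name)s | %(message)s"


def attach_stream_handler(logger_names, stream, level=logging.INFO, fmt=LOG_FORMAT):
    """Send records from each named logger (and its children) to `stream`."""
    for name in logger_names:
        logger = logging.getLogger(name)
        logger.setLevel(level)
        handler = logging.StreamHandler(stream)
        # setFormatter needs a Formatter object, not a bare format string.
        handler.setFormatter(logging.Formatter(fmt))
        logger.addHandler(handler)


buf = io.StringIO()
attach_stream_handler(["boto3"], buf)
# "boto3.demo" is a hypothetical child logger; it propagates to the "boto3" handler.
logging.getLogger("boto3.demo").info("loaded resource model")
# Filtered out: DEBUG is below the INFO level set on "boto3".
logging.getLogger("boto3").debug("not shown")
print(buf.getvalue(), end="")
```

In the task itself one would pass sys.stdout as the stream, as the original loop does.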
Are they correct?
Kevin Kho
06/03/2021, 9:16 PM
KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
The location of the TOML file, though, is in the home folder under the .prefect folder (~/.prefect/config.toml).
KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
and this might be all you need to get the boto3 logger.
Ismail Cenik
06/03/2021, 9:27 PM
Kevin Kho
06/03/2021, 9:28 PM
flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
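Environment variables, including Kubernetes env entries, only carry strings, which is why the value is written as the string "['boto3']" rather than a Python list. The hypothetical env_overrides helper below sketches how a PREFECT__SECTION__KEY variable can map onto a [section] key like the one in config.toml. Using ast.literal_eval to turn the string into a list is an assumption for illustration, not a description of Prefect's own loader.

```python
import ast


def env_overrides(environ, prefix="PREFECT__"):
    """Fold PREFIX__SECTION__KEY variables into a nested dict.

    Hypothetical helper that imitates the double-underscore naming used in
    this thread; it is not Prefect's own configuration loader.
    """
    config = {}
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue
        # "PREFECT__LOGGING__EXTRA_LOGGERS" -> ["logging", "extra_loggers"]
        path = [part.lower() for part in name[len(prefix):].split("__")]
        try:
            # Environment values are always strings; "['boto3']" parses to a list.
            value = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            # Bare words such as DEBUG are not Python literals, so keep the string.
            value = raw
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return config


env = {
    "PREFECT__LOGGING__LEVEL": "DEBUG",
    "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']",
    "HOME": "/root",
}
print(env_overrides(env))  # {'logging': {'level': 'DEBUG', 'extra_loggers': ['boto3']}}
```

Passing os.environ instead of the literal dict would read the real process environment.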
Ismail Cenik
06/03/2021, 9:55 PM
Kevin Kho
06/04/2021, 12:32 AM
DEBUG?
Ismail Cenik
06/04/2021, 12:41 AM
flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
Is there anything else I should update?
Kevin Kho
06/04/2021, 12:45 AM
Ismail Cenik
06/04/2021, 12:50 AM
Kevin Kho
06/04/2021, 12:58 AM
With DockerStorage and KubernetesRun, the default image pull policy for Kubernetes is IfNotPresent, meaning that it only pulls the image if the node doesn’t already have one with the same name. The image gets updated, but because it has the same name, Kubernetes doesn’t pull it. Can you add flow.run_config = KubernetesRun(image_pull_policy="Always")? This was added in 0.14.18.
Ismail Cenik
06/04/2021, 1:06 AM
Kevin Kho
06/04/2021, 5:57 AM
@task(log_stdout=True, max_retries=200, retry_delay=timedelta(seconds=60), timeout=600)
def waitPlanDataLoadingTask(app_name):
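The decorator above caps each attempt at 600 s and allows up to 200 retries, 60 s apart. A back-of-the-envelope sketch, assuming SLEEP_TIME is 60 s (the earlier log line shows "Sleeping for 60s") and that in the worst case every attempt runs into the timeout. The helper names are hypothetical, and Prefect's scheduling overhead is not modeled.

```python
def worst_case_seconds(timeout_s: int, max_retries: int, retry_delay_s: int) -> int:
    """Upper bound on wall-clock time if every attempt runs into the timeout.

    One initial run plus `max_retries` retries, each capped at `timeout_s`,
    with `retry_delay_s` between consecutive attempts.
    """
    attempts = max_retries + 1
    return attempts * timeout_s + max_retries * retry_delay_s


def sleeps_per_attempt(timeout_s: int, sleep_s: int) -> int:
    """Rough number of poll-and-sleep cycles that fit in one attempt (call latency ignored)."""
    return timeout_s // sleep_s


total = worst_case_seconds(timeout_s=600, max_retries=200, retry_delay_s=60)
print(total, round(total / 3600, 1))  # 132600 36.8
print(sleeps_per_attempt(timeout_s=600, sleep_s=60))  # 10
```

So one attempt covers only about ten polling cycles, while the whole retry budget stretches to roughly a day and a half.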
It’s very likely that one of the describe_application or stop_application calls is hanging, i.e. the Kinesis call works sometimes and hangs at other times. This timeout will retry the task if it hangs. The only thing to note is that, even if nothing hangs, the task will time out after a certain number of loop iterations (600 s of 60 s sleeps is roughly ten). To work around this, you can also set max_retries to something very generous so that this process just keeps running.
describe_application or stop_application on Local instead of Kubernetes and see if you get logs showing up. For me, I did flow.run_config = LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG", "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"}) and it printed out the boto3 logs.
Ismail Cenik
06/04/2021, 4:00 PM
Kevin Kho
06/04/2021, 8:22 PM
Ismail Cenik
06/04/2021, 8:45 PM
Kevin Kho
06/04/2021, 9:01 PM