Wai Kiat Tan
06/04/2021, 6:30 AMWai Kiat Tan
06/04/2021, 6:32 AMKevin Kho
Wai Kiat Tan
06/04/2021, 1:51 PMlambda_client = create_client(connect_timeout=900, read_timeout=900)
Kevin Kho
Wai Kiat Tan
06/04/2021, 1:54 PMclient = session.client("lambda", config=config, region_name="ap-southeast-1")
Wai Kiat Tan
06/04/2021, 1:55 PMfrom botocore.config import Config
config = Config(
connect_timeout=connect_timeout,
read_timeout=read_timeout,
)
^ that's the original code that i haveKevin Kho
Wai Kiat Tan
06/04/2021, 1:59 PMKevin Kho
LocalRun(_env_={"PREFECT__LOGGING__LEVEL": "DEBUG","PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3']"})
. boto3 gives nothing on the info levelWai Kiat Tan
06/04/2021, 2:06 PMenv
in the prefect official docker file instead?Wai Kiat Tan
06/04/2021, 2:08 PMprefect agent local start -t $PREFECT_SERVICE_ACCOUNT
locally in my machine, the flow is able to execute perfectly fineWai Kiat Tan
06/04/2021, 2:09 PMKevin Kho
PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4
as seen here . Yes you can set them in the Dockerfile but most people just supply it in the task like @task(max_retries = 3, retry_delay = …, timeout=900)
. Oh thanks for sharing that. Are you sure your container has access to outside traffic?Wai Kiat Tan
06/04/2021, 2:14 PMKevin Kho
Wai Kiat Tan
06/04/2021, 2:17 PMWai Kiat Tan
06/04/2021, 2:18 PMdavzucky
06/04/2021, 2:18 PMdavzucky
06/04/2021, 2:19 PMWai Kiat Tan
06/04/2021, 2:20 PMdavzucky
06/04/2021, 2:20 PMWai Kiat Tan
06/04/2021, 2:20 PMdavzucky
06/04/2021, 2:21 PMdavzucky
06/04/2021, 2:22 PMdavzucky
06/04/2021, 2:23 PMWai Kiat Tan
06/04/2021, 2:24 PMWai Kiat Tan
06/04/2021, 2:25 PMWai Kiat Tan
06/04/2021, 2:26 PMWai Kiat Tan
06/04/2021, 2:27 PMdocker stats
, everything looks okayKevin Kho
davzucky
06/04/2021, 2:52 PMdavzucky
06/04/2021, 2:54 PMWai Kiat Tan
06/04/2021, 3:04 PM@task(
name="",
target="",
checkpoint=True,
result=S3Result(f""),
)
def invoke_blob_loader(table_path, flow_date, source_bucket, target_bucket):
body = {
"table_path": table_path,
"flow_date": flow_date,
"source_bucket": source_bucket,
"target_bucket": target_bucket,
}
dto = construct_dto(body)
<http://logger.info|logger.info>(json.dumps(dto, default=str))
response = invoke(lambda_client, f"de-json2bq-{environment}-blob-loader", dto)
validated_response = validate_lambda_response(response)
<http://logger.info|logger.info>(json.dumps(validated_response))
return validated_response["payload"]
Wai Kiat Tan
06/04/2021, 3:04 PMWai Kiat Tan
06/04/2021, 3:05 PMWai Kiat Tan
06/04/2021, 3:05 PMWai Kiat Tan
06/04/2021, 4:26 PMENV PREFECT__LOGGING__LEVEL="DEBUG"
ENV PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
added this into my dockerfileWai Kiat Tan
06/04/2021, 4:30 PMFROM prefecthq/prefect:0.14.21-python3.8
ARG PREFECT_SERVICE_ACCOUNT
ARG ENVIRONMENT
ENV ENVIRONMENT ${ENVIRONMENT}
ENV PREFECT__LOGGING__LEVEL="DEBUG"
ENV PREFECT__LOGGING__EXTRA_LOGGERS="['boto3']"
RUN prefect backend cloud
CMD prefect agent local start -t ***********
am i expecting to see some aws logs here? just curiousKevin Kho
Kevin Kho
Kevin Kho
Kevin Kho
Wai Kiat Tan
06/05/2021, 4:33 AMWai Kiat Tan
06/08/2021, 3:28 AM[2021-06-08 01:53:33+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Starting task run...
[2021-06-08 01:53:33+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Handling state change from Pending to Mapped
[2021-06-08 01:53:34+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader': Finished task run for task with final state: 'Mapped'
[2021-06-08 01:53:34+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Starting task run...
[2021-06-08 01:53:34+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Handling state change from Pending to Running
[2021-06-08 01:53:35+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Calling task.run() method...
[2021-06-08 01:53:35+0000] INFO - prefect | {"flow_id": "8b806ae3-0e7f-471e-882a-3316f6878b3d", "flow_run_id": "fdb9c515-4e79-46d4-8f28-7d6679beb024", "flow_run_name": "refreshing-chameleon", "flow_run_version": 3, "task_id": "f699e90d-11aa-457b-91e5-3b8535e4109a", "task_run_id": "7acfb38a-cb60-473e-8780-55a7d3083140", "event_type": "internal_data_json_event", "event_timestamp": "2021-06-08T01:53:22.084436+00:00", "body": {OUR INTERNAL INPUT}}
[2021-06-08 01:53:38+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:53:54+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:54:09+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:54:25+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:54:41+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:54:56+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:55:12+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 01:55:27+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 01:55:43+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:55:58+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:56:14+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:56:29+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:56:45+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:57:00+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:57:16+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 01:57:31+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:57:47+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:58:02+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:58:18+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Window changed'
[DEBUG tini (1)] Passing signal: 'Window changed'
[2021-06-08 01:58:34+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:58:49+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:59:05+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:59:20+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 01:59:36+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 01:59:51+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:00:07+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:00:22+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:00:38+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:00:53+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:01:09+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 02:01:24+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 02:01:40+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:01:55+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:02:11+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:02:26+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 02:02:42+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:02:57+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:03:12+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:03:28+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[DEBUG tini (1)] Passing signal: 'Urgent I/O condition'
[2021-06-08 02:03:43+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-06-08 02:03:58+0000] INFO - prefect | {"request_id": "192daff4-600b-4d16-9715-25c6f8ac9956", "payload": {"status_code": 200, OUR_INTERNAL_OUTPUT}}
[2021-06-08 02:03:58+0000] DEBUG - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Handling state change from Running to Success
[2021-06-08 02:03:59+0000] INFO - prefect.CloudTaskRunner | Task 'invoke_blob_loader[0]': Finished task run for task with final state: 'Success'
Wai Kiat Tan
06/08/2021, 3:37 AMWai Kiat Tan
06/08/2021, 3:38 AMWai Kiat Tan
06/08/2021, 3:39 AMWai Kiat Tan
06/08/2021, 3:39 AMKevin Kho
<http://logger.info|logger.info>(json.dumps(dto, default=str))
and you would see a Retry state. Are you using a LocalExecutor
? I'll ask the team about that tini
"window changed" to get more info, but I doubt it's that. Could you share the flow code? Just the with Flow() as flow
and onwards. You can share it privately if there's sensitive info though of course remove it.Kevin Kho
flow.run()
left in the code when it was registered causing 2 runs. This can happen if you have flow.run()
and then register with the CLIWai Kiat Tan
06/08/2021, 5:05 AMLocalExecutor
. Sure, i will share the flow code with you separately. thank!Wai Kiat Tan
06/08/2021, 5:06 AMflow.run()
left in the code =(