g.suijker (04/08/2021):
When running the flow on Prefect Server, with Docker storage and a Docker agent, I get around 1,000 rows/sec inserted into the SQL Server database. On Prefect Cloud, with Docker storage and a Kubernetes agent, I get around 40 rows/sec.
What also struck me was that after we switched to the new payment plan with unlimited concurrency, performance on Cloud for this specific run deteriorated (running times roughly doubled). I don't understand how that could happen, since it is just one flow without mapping or dependent tasks.
Dylan (04/08/2021):
Depending on where/when your Flow Runs are being executed in K8s, the increased concurrency may have put additional resource strain on your cluster. Your Flow's config for the Docker vs. K8s setups should help us debug 😄

g.suijker (04/08/2021, 2:31 PM):
For Prefect Server:
storage = Docker(
    python_dependencies=["pyodbc", "pandas", "PyMySQL"],
    dockerfile='dockerfile',
)
run_config = DockerRun(labels=['kube-office'])

with Flow(f"{flow_name}", storage=storage, run_config=run_config) as flow:
    ...
------------------
For Prefect Cloud:
docker_storage = Docker(
    registry_url="xxx",
    image_name=f"prefect/{flow_name}",
    image_tag="latest",
    python_dependencies=["pyodbc", "pandas", "PyMySQL"],
    dockerfile='dockerfile',
)
run_config = KubernetesRun(labels=['kube-office'])

with Flow(f"{flow_name}", storage=docker_storage, run_config=run_config) as flow:
    ...
Using the same dockerfile for both
Dylan (04/08/2021):
You may want to set cpu_limit, cpu_request, mem_limit, and mem_request to make sure the Kubernetes job that the Flow Run executes in has the proper resources available.
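A minimal sketch of that suggestion, assuming 0.14-era Prefect where the KubernetesRun kwargs are spelled cpu_request/cpu_limit and memory_request/memory_limit; the values here are purely illustrative, not recommendations:

from prefect.run_configs import KubernetesRun

run_config = KubernetesRun(
    labels=["kube-office"],
    cpu_request="500m",       # illustrative values; tune to the workload
    cpu_limit="1",
    memory_request="512Mi",
    memory_limit="1Gi",
)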
g.suijker (04/23/2021, 7:28 AM):
We tracked it down to our execute helper, which opened and closed a new connection on every call:

def sqlServerExecute(query, params=None):
    """Execute query with values provided to sqlServer"""
    dwh_connection_string = connection_string
    # A new connection is opened (and later closed) on every single call
    connection = pyodbc.connect(dwh_connection_string)
    try:
        cursor = connection.cursor()
        cursor.fast_executemany = True
        if params:
            cursor.executemany(query, params)
        else:
            cursor.execute(query)
        connection.commit()
    except (Exception, pyodbc.DatabaseError):
        connection.rollback()
        raise
    finally:
        cursor.close()
        connection.close()
Which we changed to the following, where the connection is made once before the insert loop and closed after it:
def sqlServerExecute(query, connection, params=None):
    """Execute query with values provided to sqlServer"""
    try:
        cursor = connection.cursor()
        cursor.fast_executemany = True
        if params:
            cursor.executemany(query, params)
        else:
            cursor.execute(query)
        connection.commit()
    except (Exception, pyodbc.DatabaseError):
        connection.rollback()
        raise
    finally:
        cursor.close()
This resulted in equal insert performance locally (flow.run()) and on Cloud with Docker storage and a Kubernetes agent.
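For context, here is a minimal sketch of the calling pattern this implies; the query, table, and batches variable are illustrative assumptions, not the original code:

import pyodbc

# Hypothetical insert loop reusing one connection (names are illustrative)
connection = pyodbc.connect(connection_string)
try:
    insert_query = "INSERT INTO dbo.target_table (col_a, col_b) VALUES (?, ?)"
    for batch in batches:  # each batch: a list of (col_a, col_b) tuples
        sqlServerExecute(insert_query, connection, params=batch)
finally:
    connection.close()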