Payam Vaezi
12/13/2021, 3:09 PM
3GiB memory usage, while running in the cloud with Prefect Server I'm getting above 16GiB of memory usage, and the job gets killed as a result. Any idea what may have caused this discrepancy in memory usage?
Kevin Kho
Payam Vaezi
12/13/2021, 3:32 PM
32GiB of memory limit and ran out of memory again!
Kevin Kho
flow.run() also uses the executor to run the Flow, so this is surprising. Could you show me how you define your executor?
Payam Vaezi
12/13/2021, 3:57 PM
import prefect

# LocalDaskExecutor using the multiprocessing ("processes") scheduler with 8 workers
prefect_executor_kwargs = {"scheduler": "processes", "num_workers": 8, "threads_per_worker": 1}
flow = prefect.Flow(name="test", executor=prefect.executors.LocalDaskExecutor(**prefect_executor_kwargs))
Kevin Kho
Payam Vaezi
12/13/2021, 4:03 PM
Kevin Kho
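import pandas as pd
from prefect import task

# Pass the file location between tasks instead of the DataFrame itself,
# so only a short string is returned (and checkpointed).
@task()
def something(location):
    df = pd.read_csv(location)
    df['new_col'] = ...
    df.to_csv(location, index=False)  # index=False avoids writing an extra index column
    return location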
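To see why returning a location helps, here is a small stdlib-only sketch (not Prefect code). When a task's return value is checkpointed, Prefect 1.x's default result serializer pickles it, so the serialized payload grows with the return value. Returning a path string keeps that payload tiny. The row data and the path below are made up for illustration.

```python
import pickle

# Hypothetical in-memory "table": 100,000 rows of 5 floats each.
rows = [[float(i), i * 0.5, i * 2.0, i / 3.0, i % 7 + 0.1] for i in range(100_000)]

# Hypothetical location the task could return instead of the data itself.
location = "s3://my-bucket/path/to/data.csv"

data_payload = pickle.dumps(rows)      # what gets serialized if the task returns the rows
path_payload = pickle.dumps(location)  # what gets serialized if the task returns the path

print(f"returning rows: {len(data_payload):,} bytes")
print(f"returning path: {len(path_payload):,} bytes")
```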
Or you can also try turning checkpointing off so the object doesn't get persisted:
@task(checkpoint=False)
def something(location):
    df = pd.read_csv(location)
    return df  # still returned in memory, but not serialized and persisted as a checkpoint
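As far as I know, in Prefect 1.x checkpointing is enabled by default for flow runs orchestrated by Prefect Server/Cloud and disabled by default for a local flow.run() (the PREFECT__FLOWS__CHECKPOINTING setting controls this), which is one plausible reason the same flow uses more memory in the backend. The sketch below does not use Prefect; it only illustrates, with the stdlib, that serializing a large return value allocates a second full-size copy while the original is still alive. The object size is arbitrary.

```python
import pickle
import tracemalloc


def peak_while_serializing(obj):
    """Return (payload_size, peak_new_allocations), in bytes, for pickling obj."""
    tracemalloc.start()          # only allocations made after this point are traced
    payload = pickle.dumps(obj)  # the serialized copy coexists with obj in memory
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return len(payload), peak


# Arbitrary ~20 MB object standing in for a large task result.
big_result = [bytes(1024) for _ in range(20_000)]

size, peak = peak_while_serializing(big_result)
print(f"serialized size: {size / 1e6:.1f} MB, extra peak memory: {peak / 1e6:.1f} MB")
```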
The Result interface gives an easy way to save this:
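from prefect.engine.results import S3Result

@task()
def something(location):
    s3_result = S3Result(..., location=...)
    df = s3_result.read(location).value  # read() returns a Result; the data is on .value
    df['new_col'] = ...
    s3_result.write(..., location=...)  # the first argument is the value to write (e.g. df)
    return location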
Payam Vaezi
12/13/2021, 4:08 PM
from prefect import Task

class PrefectTransformer(Task):
    def __init__(self, **kwargs):
        super().__init__(name="test", checkpoint=False)
        ...

    def run(self, **kwargs):
        ...
Kevin Kho
Payam Vaezi
12/13/2021, 4:10 PM
Kevin Kho
Payam Vaezi
12/13/2021, 4:17 PM
Kevin Kho
Payam Vaezi
12/13/2021, 4:19 PM
Kevin Kho
Payam Vaezi
12/13/2021, 4:20 PM
Anna Geller
Payam Vaezi
12/14/2021, 4:07 PM
{
    'env': None,
    'type': 'KubernetesRun',
    'image': '***',
    'labels': ['default'],
    'cpu_limit': '8000m',
    '__version__': '0.15.7',
    'cpu_request': '8000m',
    'job_template': {
        'kind': 'Job',
        'spec': {
            'template': {
                'spec': {
                    'containers': [
                        {
                            'env': [
                                {'name': 'PREFECT__LOGGING__EXTRA_LOGGERS', 'value': "['mlaasflow']"},
                                {'name': 'PREFECT__CLOUD__HEARTBEAT_MODE', 'value': 'thread'}
                            ],
                            'name': 'flow',
                            'image': '***',
                            'resources': {'limits': {'cpu': '200m'}, 'requests': {'cpu': '200m'}},
                            'imagePullPolicy': 'IfNotPresent',
                            'securityContext': {'readOnlyRootFilesystem': False}
                        }
                    ],
                    'restartPolicy': 'Never',
                    'securityContext': {
                        'runAsUser': 1000,
                        'capabilities': {'drop': ['ALL']},
                        'runAsNonRoot': True,
                        'readOnlyRootFilesystem': False
                    },
                    'imagePullSecrets': [{'name': ''}]
                }
            }
        },
        'metadata': {'name': 'prefect-job', 'labels': {}},
        'apiVersion': 'batch/v1'
    },
    'memory_limit': '16384Mi',
    'memory_request': '16384Mi',
    'image_pull_policy': None,
    'job_template_path': None,
    'image_pull_secrets': None,
    'service_account_name': None
}
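A hedged, stdlib-only sketch for comparing the top-level KubernetesRun fields in a config like the one above with the resources declared on the job_template's container. It handles only the quantity forms seen here (millicores such as "8000m", and binary suffixes such as "Mi"); the function names are hypothetical, and `config` is a trimmed copy of the relevant keys. Note that 16384Mi is exactly 16 GiB, which lines up with the "above 16GiB" point at which the job was reported killed. Whether the agent overrides the template's 200m at submission time is worth confirming separately.

```python
# Kubernetes CPU quantities: "8000m" = 8000 millicores = 8 CPUs; "2" = 2 CPUs.
def parse_cpu(quantity):
    if quantity.endswith("m"):
        return int(quantity[:-1]) / 1000
    return float(quantity)


# Binary memory suffixes only; a plain number is interpreted as bytes.
_BINARY = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}


def parse_memory(quantity):
    for suffix, factor in _BINARY.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)


def compare_resources(run_config):
    """Hypothetical helper: run-config limits vs. limits on the first template container."""
    container = run_config["job_template"]["spec"]["template"]["spec"]["containers"][0]
    limits = container.get("resources", {}).get("limits", {})
    return {
        "run_config_cpu_limit": parse_cpu(run_config["cpu_limit"]),
        "template_cpu_limit": parse_cpu(limits["cpu"]) if "cpu" in limits else None,
        "run_config_memory_limit_gib": parse_memory(run_config["memory_limit"]) / 1024**3,
        "template_memory_limit_gib": (
            parse_memory(limits["memory"]) / 1024**3 if "memory" in limits else None
        ),
    }


# Trimmed copy of the relevant keys from the config above.
config = {
    "cpu_limit": "8000m",
    "memory_limit": "16384Mi",
    "job_template": {"spec": {"template": {"spec": {"containers": [
        {"name": "flow", "resources": {"limits": {"cpu": "200m"}, "requests": {"cpu": "200m"}}}
    ]}}}},
}

print(compare_resources(config))
```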
Anna Geller
from prefect.executors import LocalDaskExecutor

flow.executor = LocalDaskExecutor()  # default scheduler is "threads"
#2 Try providing a custom job template to your KubernetesRun. I know you assigned 8000m as the CPU request for your flow's Kubernetes job, but this still looks suspicious:
"resources": {
"limits": {"cpu": "200m"},
"requests": {"cpu": "200m"},
},
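Following suggestion #2, here is a hedged sketch of a minimal custom job template whose flow container explicitly declares the same CPU and memory as the run config. The helper name is hypothetical, the default values are taken from the config above, and the closing comment assumes the Prefect 1.x KubernetesRun signature, which accepts a job_template dict. It is worth checking how your agent merges this template with the run config's cpu/memory fields before relying on it.

```python
import json


def make_job_template(cpu="8000m", memory="16384Mi"):
    """Hypothetical helper: a minimal batch/v1 Job template with explicit container resources."""
    resources = {"cpu": cpu, "memory": memory}
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": "prefect-job", "labels": {}},
        "spec": {
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "flow",
                            # Requests equal limits, mirroring cpu_request/memory_request above.
                            "resources": {"limits": dict(resources), "requests": dict(resources)},
                        }
                    ],
                }
            }
        },
    }


template = make_job_template()
print(json.dumps(template, indent=2))
# Assumption: in Prefect 1.x this dict could be passed as KubernetesRun(job_template=template, ...).
```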
Payam Vaezi
12/14/2021, 5:23 PM
Anna Geller