Payam Vaezi
12/13/2021, 3:09 PM
3GiB memory usage, while running in cloud with Prefect Server I’m getting above 16GiB memory usage, where the job gets killed as a result. Any idea what may have caused this discrepancy in memory usage?
Kevin Kho
12/13/2021, 3:16 PM
Payam Vaezi
12/13/2021, 3:32 PM
32GiB of memory limit and ran out of memory again!
Kevin Kho
12/13/2021, 3:47 PM
flow.run() also uses the executor to run the Flow, so this is surprising. Could you show me how you define your executor?
Payam Vaezi
12/13/2021, 3:57 PM
import prefect
prefect_executor_kwargs = {"scheduler": "processes", "num_workers": 8, "threads_per_worker": 1}
flow = prefect.Flow(name="test", executor=prefect.executors.LocalDaskExecutor(**prefect_executor_kwargs))
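With the processes scheduler and 8 workers shown above, memory is spread across the flow-run process and its worker processes, so a single number from one environment can be hard to compare with another. A minimal sketch for logging peak memory the same way in both places, using only the standard library. The helper name `peak_rss_mib` is made up for illustration, and this is an assumption about how one might diagnose the discrepancy, not something suggested in the thread. Note that the children figure only covers child processes that have already exited and been waited for, and it reports the largest child rather than a sum.

```python
import resource
import sys


def peak_rss_mib(who=resource.RUSAGE_SELF):
    """Return the peak resident set size in MiB for this process or its children."""
    raw = resource.getrusage(who).ru_maxrss
    # ru_maxrss is reported in KiB on Linux and in bytes on macOS.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return raw / divisor


if __name__ == "__main__":
    # Call this at the end of a task or after the flow run and compare
    # the values logged locally with those logged in the cluster.
    print(f"self:     {peak_rss_mib():.1f} MiB")
    print(f"children: {peak_rss_mib(resource.RUSAGE_CHILDREN):.1f} MiB")
```

The `resource` module is Unix-only, which should be fine for a Kubernetes job but will not work on Windows.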
Kevin Kho
12/13/2021, 4:00 PM
Payam Vaezi
12/13/2021, 4:03 PM
Kevin Kho
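12/13/2021, 4:05 PM
import pandas as pd
from prefect import task

@task()
def something(location):
    # Read, modify, and write back in place; return only the path, not the DataFrame
    df = pd.read_csv(location)
    df['new_col'] = ...
    df.to_csv(location)
    return location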
or you can also try turning checkpointing off so the object doesn’t get persisted
@task(checkpoint=False)
def something(location):
    df = pd.read_csv(location)
    return df
The Result interface gives an easy way to save this
@task()
def something(location):
    s3_result = S3Result(..., location=...)
    # read() returns a Result object; the loaded data is on its .value attribute
    df = s3_result.read(location).value
    df['new_col'] = ...
    s3_result.write(..., location=...)
    return location
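The idea behind the snippets above is that each task hands a location to the next one instead of the data itself, so nothing large has to be held or persisted between tasks. A minimal sketch of that pattern with Prefect and pandas stripped out, so it runs with the standard library alone. The function names, the `value` column, and the doubling step are all hypothetical stand-ins for `df['new_col'] = ...`.

```python
import csv
import tempfile
from pathlib import Path


def add_column(location: str) -> str:
    """Read a CSV, add a column, write it back, and return only the path."""
    path = Path(location)
    with path.open(newline="") as f:
        rows = list(csv.DictReader(f))
    for row in rows:
        # Hypothetical derived column, standing in for df['new_col'] = ...
        row["new_col"] = str(int(row["value"]) * 2)
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return location


def count_rows(location: str) -> int:
    """A downstream step that receives the path, not the data."""
    with Path(location).open(newline="") as f:
        return sum(1 for _ in csv.DictReader(f))


def demo() -> tuple:
    """Run both steps against a temporary file and report what was written."""
    with tempfile.TemporaryDirectory() as tmp:
        location = str(Path(tmp) / "data.csv")
        Path(location).write_text("value\n1\n2\n3\n")
        returned = add_column(location)
        n = count_rows(returned)
        header = Path(returned).read_text().splitlines()[0]
        return n, header
```

Only the short path string crosses the boundary between `add_column` and `count_rows`; the rows themselves are released as soon as each function returns.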
Payam Vaezi
12/13/2021, 4:08 PM
class PrefectTransformer(Task):
    def __init__(self, **kwargs):
        super().__init__(name="test", checkpoint=False)
        ...

    def run(self, **kwargs):
        ...
Kevin Kho
12/13/2021, 4:10 PM
Payam Vaezi
12/13/2021, 4:10 PM
Kevin Kho
12/13/2021, 4:12 PM
Payam Vaezi
12/13/2021, 4:17 PM
Kevin Kho
12/13/2021, 4:17 PM
Payam Vaezi
12/13/2021, 4:19 PM
Kevin Kho
12/13/2021, 4:20 PM
Payam Vaezi
12/13/2021, 4:20 PM
Anna Geller
12/13/2021, 10:33 PM
Payam Vaezi
12/14/2021, 4:07 PM
{'env': None, 'type': 'KubernetesRun', 'image': '***', 'labels': ['default'], 'cpu_limit': '8000m', '__version__': '0.15.7', 'cpu_request': '8000m', 'job_template': {'kind': 'Job', 'spec': {'template': {'spec': {'containers': [{'env': [{'name': 'PREFECT__LOGGING__EXTRA_LOGGERS', 'value': "['mlaasflow']"}, {'name': 'PREFECT__CLOUD__HEARTBEAT_MODE', 'value': 'thread'}], 'name': 'flow', 'image': '***', 'resources': {'limits': {'cpu': '200m'}, 'requests': {'cpu': '200m'}}, 'imagePullPolicy': 'IfNotPresent', 'securityContext': {'readOnlyRootFilesystem': False}}], 'restartPolicy': 'Never', 'securityContext': {'runAsUser': 1000, 'capabilities': {'drop': ['ALL']}, 'runAsNonRoot': True, 'readOnlyRootFilesystem': False}, 'imagePullSecrets': [{'name': ''}]}}}, 'metadata': {'name': 'prefect-job', 'labels': {}}, 'apiVersion': 'batch/v1'}, 'memory_limit': '16384Mi', 'memory_request': '16384Mi', 'image_pull_policy': None, 'job_template_path': None, 'image_pull_secrets': None, 'service_account_name': None}
Anna Geller
12/14/2021, 4:43 PM
flow.executor = LocalDaskExecutor()
#2 Try providing a custom job template to your KubernetesRun - I know you assigned 8000m as CPU request for your flow’s Kubernetes job, but this still looks suspicious:
"resources": {
    "limits": {"cpu": "200m"},
    "requests": {"cpu": "200m"},
},
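The mismatch pointed out above (8000m at the top level of the run config, 200m inside the job template's container) can be spotted mechanically. A minimal sketch, assuming the run config is available as a plain dict shaped like the one posted earlier in the thread. The function name `find_cpu_overrides` is hypothetical, the dict below is trimmed to the relevant keys, and the comparison is a plain string match, so equivalent spellings such as "8" and "8000m" would be reported as different. It only lists the differences and makes no claim about which value Kubernetes ends up applying.

```python
def find_cpu_overrides(run_config: dict) -> list:
    """Return container-level CPU settings that differ from the top-level ones."""
    top = {
        "limits": run_config.get("cpu_limit"),
        "requests": run_config.get("cpu_request"),
    }
    template = run_config.get("job_template") or {}
    containers = (
        template.get("spec", {})
        .get("template", {})
        .get("spec", {})
        .get("containers", [])
    )
    mismatches = []
    for container in containers:
        resources = container.get("resources", {})
        for kind, expected in top.items():
            actual = resources.get(kind, {}).get("cpu")
            if actual is not None and expected is not None and actual != expected:
                mismatches.append((container.get("name"), kind, actual, expected))
    return mismatches


# Trimmed copy of the run config from the thread, keeping only the CPU fields.
run_config = {
    "cpu_limit": "8000m",
    "cpu_request": "8000m",
    "job_template": {
        "spec": {"template": {"spec": {"containers": [
            {
                "name": "flow",
                "resources": {
                    "limits": {"cpu": "200m"},
                    "requests": {"cpu": "200m"},
                },
            },
        ]}}}
    },
}

mismatches = find_cpu_overrides(run_config)
for name, kind, actual, expected in mismatches:
    print(f"container {name!r}: {kind} cpu is {actual}, top-level value is {expected}")
```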
Payam Vaezi
12/14/2021, 5:23 PM
Anna Geller
12/16/2021, 10:11 AM