Tim Micheletto
12/06/2021, 5:01 AM
The `SnowflakeQuery` task only supports `fetch_all`, so I have used the Snowflake connector directly in my flow. I'm getting some strange issues trying to map over the results of the query and distribute them across child tasks with a Dask executor. If I try to use `fetch_pandas_batches`, I get an error indicating that the result of that function is not picklable. Fair enough, that is a known limitation. But if I use `get_result_batches` and call `to_pandas` in a mapped task, I get some weird behavior. The flow fails with the following error:
```
Unexpected error: KilledWorker('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState '<tcp://10.144.173.27:38363>', name: 4, status: closed, memory: 0, processing: 625>)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
    final_states = executor.wait(
  File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 440, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1977, in gather
    return self.sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 865, in sync
    return sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 327, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 310, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1842, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState '<tcp://10.144.173.27:38363>', name: 4, status: closed, memory: 0, processing: 625>)
```
Then, after 10 minutes, each mapped child task fails with this error:
```
No heartbeat detected from the remote task; marking the run as failed.
```
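The pickling limitation mentioned above can be checked before handing objects to the Dask executor. The sketch below uses a hypothetical helper, `is_picklable`, which is not part of Prefect or the Snowflake connector, built on the standard-library `pickle` module. Dask may fall back to cloudpickle, which accepts more objects (such as lambdas), but generator objects fail under both. The sketch assumes that an iterator-returning method like `fetch_pandas_batches` behaves like a generator here, and `row_batches` is an invented stand-in for it.

```python
import pickle
from typing import Any, List, Optional, Tuple


def is_picklable(obj: Any) -> Tuple[bool, Optional[str]]:
    """Return (True, None) if obj survives pickle.dumps, else (False, error text)."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # TypeError, PicklingError, AttributeError, ...
        return False, f"{type(exc).__name__}: {exc}"
    return True, None


def row_batches(rows: List[int], size: int):
    """Hypothetical generator standing in for an iterator of result batches."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


if __name__ == "__main__":
    rows = list(range(10))
    # A concrete list of chunks can be shipped to workers.
    print(is_picklable([rows[:5], rows[5:]]))
    # A generator cannot be pickled, so it cannot be passed to a mapped task.
    print(is_picklable(row_batches(rows, 5)))
```

Materializing the chunks into a list (as `get_result_batches` does with `ResultBatch` objects) is what makes the mapped inputs shippable. A passing check says nothing about memory use on the workers, though.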
Here's what it looks like.
```python
import sys
from typing import Any, Dict, List

import prefect
import snowflake.connector
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

# get_secret_string, make_executor, image and project_pip_url are defined elsewhere in this project.

conn_info = {
    "account": "some-company.us-east-1",
    "user": "some-user",
    "role": "some-role",
    "warehouse": "some-warehouse",
    "client_session_keep_alive": True
}


@task()
def fetch_org_batches(conn_info: Dict) -> List[Any]:
    # Look up the service-account password and add it to the connection parameters.
    sa_password = get_secret_string("some-key")
    conn_info['password'] = sa_password
    sql = "select * from invoices_with_statement_and_org_info"
    with snowflake.connector.connect(**conn_info) as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            # One ResultBatch per chunk of the result set; these become the mapped inputs.
            batches = cur.get_result_batches()
            num_result_batches = len(batches)
            logger = prefect.context.get("logger")
            logger.info(f"{num_result_batches} batches retrieved")
            return batches


@task()
def predict(batch: Any) -> int:
    # Runs once per batch on a Dask worker and returns that batch's row count.
    if batch is not None:
        df = batch.to_pandas()
        logger = prefect.context.get("logger")
        logger.info(df['id'])
        return len(df.index)
    return 0


@task()
def total(x: Any) -> None:
    # Receives the list of per-batch row counts from the mapped predict task.
    logger = prefect.context.get("logger")
    logger.info(f"{x} passed to total")
    logger.info(f"{sum(x)} rows in total")


def main(branch_name: str) -> None:
    with Flow(f"bills-invoices-inference-{branch_name}",
              executor=DaskExecutor(cluster_class=lambda: make_executor(image, project_pip_url),
                                    adapt_kwargs={"minimum": 1, "maximum": 10})) as flow:
        batches = fetch_org_batches(conn_info)
        mapped = predict.map(batches)
        total(mapped)

    flow.storage = S3(bucket="some-bucket")
    flow.run_config = KubernetesRun(
        env={
            "PREFECT__LOGGING__LEVEL": "INFO",
            # Install the latest version of this code from the relevant branch on github.
            # Allows for faster iteration without having to rebuild the docker image.
            "EXTRA_PIP_PACKAGES": f"dask_kubernetes==2021.10.0 {project_pip_url}"
        },
        image=image,
        labels=[],  # TODO: variable per branch normal vs sandbox (also need label on agent)
        cpu_limit=2,
        cpu_request=2,
        memory_limit="5Gi"
    )
    flow.register(project_name="cups")
    # flow.run()


if __name__ == "__main__":
    branch = sys.argv[1]
    main(branch)
```
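To separate the Snowflake/Dask behaviour from the flow logic itself, the map-then-reduce shape of the flow can be emulated in plain Python. In this sketch, `FakeBatch` is a hypothetical stand-in for a connector `ResultBatch` (its `to_rows` method is invented for illustration, playing the role of `to_pandas`). Each batch is round-tripped through `pickle` to mimic being shipped to a separate worker, while `predict` and `total` mirror the row-count logic of the tasks above.

```python
import pickle
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class FakeBatch:
    """Hypothetical stand-in for a Snowflake ResultBatch; holds its rows in memory."""
    rows: List[dict]

    def to_rows(self) -> List[dict]:
        # Plays the role of ResultBatch.to_pandas() in the real flow.
        return list(self.rows)


def ship_to_worker(obj: Any) -> Any:
    """Mimic serializing a task argument and deserializing it on another worker."""
    return pickle.loads(pickle.dumps(obj))


def predict(batch: Optional[FakeBatch]) -> int:
    # Same contract as the predict task: row count, or 0 when there is no batch.
    if batch is not None:
        return len(batch.to_rows())
    return 0


def total(counts: List[int]) -> int:
    # Same reduction as the total task.
    return sum(counts)


def run_locally(batches: List[Optional[FakeBatch]]) -> int:
    # Equivalent of `mapped = predict.map(batches)` followed by `total(mapped)`.
    counts = [predict(ship_to_worker(b)) for b in batches]
    return total(counts)


if __name__ == "__main__":
    batches: List[Optional[FakeBatch]] = [
        FakeBatch([{"id": i} for i in range(n)]) for n in (3, 0, 5)
    ]
    batches.append(None)
    print(run_locally(batches))  # 8
```

If the local emulation behaves but the Dask run dies with `KilledWorker`, the problem is more likely in the worker environment (for example memory or installed packages) than in the map/reduce logic.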
Any ideas what's going on here? Is it possible to support this use case currently?
Kevin Kho
`image` used in both RunConfig and Executor