Tim Micheletto
12/06/2021, 5:01 AM
SnowflakeQuery task only supports fetch_all, so I have used the snowflake connector directly in my flow. I'm getting some strange issues trying to map over the results of the query and distribute them across child tasks with a Dask executor. If I try to use fetch_pandas_batches, I get an error indicating the result of that function is not pickleable. Fair enough, that is a known limitation, but if I use get_result_batches and call to_pandas in a mapped task I get some weird behavior. The flow fails with the following error:
Unexpected error: KilledWorker('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState '<tcp://10.144.173.27:38363>', name: 4, status: closed, memory: 0, processing: 625>)
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
final_states = executor.wait(
File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 440, in wait
return self.client.gather(futures)
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1977, in gather
return self.sync(
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 865, in sync
return sync(
File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 327, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 310, in f
result[0] = yield future
File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1842, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState '<tcp://10.144.173.27:38363>', name: 4, status: closed, memory: 0, processing: 625>)
and after 10 minutes each mapped child task fails with this error:
No heartbeat detected from the remote task; marking the run as failed.
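For reference, a minimal sketch of the two cursor APIs in question, assuming snowflake-connector-python >= 2.4 with the pandas extra (conn_info and sql are as in the flow below): fetch_pandas_batches returns a generator bound to the live cursor, which is why it can't be pickled out to workers, whereas get_result_batches returns self-contained ResultBatch objects that are designed to be serialized and materialized later with to_pandas.

import snowflake.connector

with snowflake.connector.connect(**conn_info) as conn:
    with conn.cursor() as cur:
        cur.execute(sql)
        # Not distributable: this generator is tied to the live cursor and
        # cannot be pickled to a Dask worker.
        # frames = cur.fetch_pandas_batches()
        # Distributable: each ResultBatch is serializable and only downloads
        # its chunk of the result when to_pandas() is called.
        batches = cur.get_result_batches()
df = batches[0].to_pandas()  # can run in a different process or worker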
Here's what it looks like.
import sys
from typing import Any, Dict, List

import prefect
import snowflake.connector
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

conn_info = {
    "account": "some-company.us-east-1",
    "user": "some-user",
    "role": "some-role",
    "warehouse": "some-warehouse",
    "client_session_keep_alive": True,
}

@task()
def fetch_org_batches(conn_info: Dict) -> List[Any]:
    # get_secret_string is a project helper defined elsewhere.
    sa_password = get_secret_string("some-key")
    conn_info['password'] = sa_password
    sql = "select * from invoices_with_statement_and_org_info"
    with snowflake.connector.connect(**conn_info) as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            batches = cur.get_result_batches()
            num_result_batches = len(batches)
            logger = prefect.context.get("logger")
            logger.info(f"{num_result_batches} batches retrieved")
            return batches
@task()
def predict(batch: Any) -> int:
    if batch is not None:
        df = batch.to_pandas()
        logger = prefect.context.get("logger")
        logger.info(df['id'])
        return len(df.index)
    return 0
@task()
def total(x: Any) -> None:
    logger = prefect.context.get("logger")
    logger.info(f"{x} passed to total")
    logger.info(f"{sum(x)} rows in total")
def main(branch_name: str) -> None:
    # make_executor, image and project_pip_url are defined elsewhere in this project.
    with Flow(
        f"bills-invoices-inference-{branch_name}",
        executor=DaskExecutor(
            cluster_class=lambda: make_executor(image, project_pip_url),
            adapt_kwargs={"minimum": 1, "maximum": 10},
        ),
    ) as flow:
        batches = fetch_org_batches(conn_info)
        mapped = predict.map(batches)
        total(mapped)
    flow.storage = S3(bucket="some-bucket")
    flow.run_config = KubernetesRun(
        env={
            "PREFECT__LOGGING__LEVEL": "INFO",
            # Install the latest version of this code from the relevant branch on GitHub.
            # Allows for faster iteration without having to rebuild the docker image.
            "EXTRA_PIP_PACKAGES": f"dask_kubernetes==2021.10.0 {project_pip_url}",
        },
        image=image,
        labels=[],  # TODO: variable per branch normal vs sandbox (also need label on agent)
        cpu_limit=2,
        cpu_request=2,
        memory_limit="5Gi",
    )
    flow.register(project_name="cups")
    # flow.run()

if __name__ == "__main__":
    branch = sys.argv[1]
    main(branch)
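One fallback I can think of, assuming the full result fits in the fetch task's memory, would be a variant like this hypothetical fetch_org_frames that returns plain DataFrames (which pickle cleanly) instead of ResultBatch objects, though that gives up some of the benefit of distributing the download:

@task()
def fetch_org_frames(conn_info: Dict) -> List[Any]:
    # Hypothetical variant of fetch_org_batches above: materialize each batch
    # eagerly so the mapped tasks receive picklable DataFrames.
    conn_info['password'] = get_secret_string("some-key")
    sql = "select * from invoices_with_statement_and_org_info"
    with snowflake.connector.connect(**conn_info) as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            return [b.to_pandas() for b in cur.get_result_batches()]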
Any ideas what's going on here? Is it possible to support this use case currently?
Kevin Kho
image used in both RunConfig and Executor
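For context, a sketch of how the same image ends up in both places, assuming make_executor wraps dask_kubernetes's KubeCluster in the pattern from the Prefect docs (the body of make_executor here is an assumption, not the poster's actual helper):

from dask_kubernetes import KubeCluster, make_pod_spec

def make_executor(image: str, project_pip_url: str) -> KubeCluster:
    # Workers run the same image as the KubernetesRun job, with the same
    # EXTRA_PIP_PACKAGES, so code importable in the flow run is importable
    # on the Dask workers too.
    return KubeCluster(
        make_pod_spec(
            image=image,  # same image passed to KubernetesRun(image=image)
            env={"EXTRA_PIP_PACKAGES": f"dask_kubernetes==2021.10.0 {project_pip_url}"},
        )
    )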