# ask-community
Hi, I'm trying to utilise the ability to fetch batches of results from a Snowflake query, as described here, in a Prefect flow. I notice that the Prefect SnowflakeQuery task only supports fetch_all, so I have used the Snowflake connector directly in my flow. I'm getting some strange issues trying to map over the results of the query and distribute them across child tasks with a Dask executor. If I try to use fetch_pandas_batches I get an error indicating the result of that function is not pickleable. Fair enough, that is a known limitation, but if I use get_result_batches and call to_pandas in a mapped task I get some weird behavior. The flow fails with the following error:
Unexpected error: KilledWorker('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState '<tcp://10.144.173.27:38363>', name: 4, status: closed, memory: 0, processing: 625>)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
    final_states = executor.wait(
  File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 440, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1977, in gather
    return self.sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 865, in sync
    return sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 327, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 310, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1842, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState '<tcp://10.144.173.27:38363>', name: 4, status: closed, memory: 0, processing: 625>)
and after 10 mins each mapped child task fails with this error
No heartbeat detected from the remote task; marking the run as failed.
Here's what it looks like.
import sys
from typing import Any, Dict, List

import prefect
import snowflake.connector
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

# get_secret_string, make_executor, image and project_pip_url are defined
# elsewhere in our project.

conn_info = {
    "account": "some-company.us-east-1",
    "user": "some-user",
    "role": "some-role",
    "warehouse": "some-warehouse",
    "client_session_keep_alive": True
}


@task()
def fetch_org_batches(conn_info: Dict) -> List[Any]:

    sa_password = get_secret_string(
        "some-key")
    conn_info['password'] = sa_password

    sql = "select * from invoices_with_statement_and_org_info"

    with snowflake.connector.connect(**conn_info) as conn:
        with conn.cursor() as cur:
            cur.execute(sql)

            batches = cur.get_result_batches()

            num_result_batches = len(batches)

            logger = prefect.context.get("logger")

            logger.info(f"{num_result_batches} batches retrieved")

            return batches


@task()
def predict(batch: Any) -> int:
    if batch is not None:
        df = batch.to_pandas()

        logger = prefect.context.get("logger")

        logger.info(df['id'])

        return len(df.index)
    return 0


@task()
def total(x: Any) -> None:
    logger = prefect.context.get("logger")

    logger.info(f"{x} passed to total")
    logger.info(f"{sum(x)} rows in total")

def main(branch_name: str) -> None:
    
    with Flow(f"bills-invoices-inference-{branch_name}",
              executor=DaskExecutor(cluster_class=lambda: make_executor(image, project_pip_url),
                                    adapt_kwargs={"minimum": 1, "maximum": 10})) as flow:

        batches = fetch_org_batches(conn_info)

        mapped = predict.map(batches)
        total(mapped)

    flow.storage = S3(bucket="some-bucket")

    flow.run_config = KubernetesRun(
        env={
            "PREFECT__LOGGING__LEVEL": "INFO",
            # Install the latest version of this code from the relevant branch on github.
            # Allows for faster iteration without having to rebuild the docker image.
            "EXTRA_PIP_PACKAGES": f"dask_kubernetes==2021.10.0 {project_pip_url}"
        },
        image=image,
        labels=[],  # TODO: variable per branch normal vs sandbox (also need label on agent)
        cpu_limit=2,
        cpu_request=2,
        memory_limit="5Gi"
    )

    flow.register(project_name="cups")
    # flow.run()

if __name__ == "__main__":
    branch = sys.argv[1]

    main(branch)
Any ideas what's going on here? Is it possible to support this use case currently?
Hey @Tim Micheletto, ultimately what is happening here is the Dask KilledWorker error. It's kind of hard to tell exactly what is going on. Have you seen the Dask docs about KilledWorkers? There are also some diagnosing steps on that page. From experience, I've seen this caused by version mismatches and by memory issues. The heartbeat not being detected is expected: this is Prefect alerting you that it lost communication with your flow. The flow process updates its state by hitting the Prefect API, so if the process dies, there is nothing left to update the state. Could you please move the traceback and code into the thread when you get a chance so that we don't crowd the main channel?
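If you want to rule out a version mismatch, one quick option (just a sketch, assuming your tasks really are landing on the Dask workers) is to temporarily add a diagnostic task to the flow that compares package versions across the client, scheduler and workers:

import prefect
from distributed import get_client
from prefect import task


@task
def check_dask_versions() -> None:
    # Inside a task running on a Dask worker, get_client() returns the
    # cluster's client. check=True makes get_versions raise if the client,
    # scheduler and workers disagree on key packages (python, dask,
    # distributed, cloudpickle, etc.).
    client = get_client()
    versions = client.get_versions(check=True)
    prefect.context.get("logger").info(versions["scheduler"])

Mismatched distributed or cloudpickle versions between the flow image and the worker image would be consistent with the KilledWorker you're seeing.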
Do you have the same image for the Kubernetes run and the Dask cluster? I think so, but just making sure, since I see image used in both the RunConfig and the Executor.
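Since make_executor isn't shown here, this is roughly what I mean, sketched with dask_kubernetes' KubeCluster (the resource values are just illustrative): build the worker pods from the same image you pass to KubernetesRun, so the flow process and the workers get identical package versions.

from dask_kubernetes import KubeCluster, make_pod_spec


def make_executor(image: str, project_pip_url: str) -> KubeCluster:
    # Use the same image as the KubernetesRun config so the flow process
    # and the Dask workers share identical python / prefect / dask /
    # snowflake-connector versions.
    pod_spec = make_pod_spec(
        image=image,
        memory_limit="5Gi",
        memory_request="5Gi",
        cpu_limit="2",
        cpu_request="2",
        env={"EXTRA_PIP_PACKAGES": f"dask_kubernetes==2021.10.0 {project_pip_url}"},
    )
    return KubeCluster(pod_spec)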