Hmm, now I’m getting `ERROR - prefect.FlowRunner |...
# ask-community
w
Hmm, now I’m getting
ERROR - prefect.FlowRunner | Unexpected error: TypeError('code() takes at most 15 arguments (16 given)')
Is this potentially because I’m running Python 3.9?
k
This is very likely a Python version mismatch between registration and runtime yep
w
Flow URL: <http://localhost:8080/default/flow/1b956c24-0b66-497e-8cc8-51dc72be26c0>
 └── ID: 0f8714ad-ce14-4577-bb32-5a52380cc798
 └── Project: demo
 └── Labels: []
[2021-08-23 20:49:59-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'increment a random sample'
[2021-08-23 20:49:59-0400] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `dask_kubernetes.core.KubeCluster`...
Creating scheduler pod on cluster. This may take some time.
[2021-08-23 20:50:10-0400] INFO - prefect.DaskExecutor | The Dask dashboard is available at <http://dask-wilsonbilkovich-af889c88-c.prefect:8787/status>
INFO:prefect.DaskExecutor:The Dask dashboard is available at <http://dask-wilsonbilkovich-af889c88-c.prefect:8787/status>
[2021-08-23 20:50:28-0400] ERROR - prefect.FlowRunner | Unexpected error: TypeError('code() takes at most 15 arguments (16 given)')
Traceback (most recent call last):
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
    final_states = executor.wait(
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/prefect/executors/dask.py", line 424, in wait
    return self.client.gather(futures)
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/distributed/client.py", line 1948, in gather
    return self.sync(
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/distributed/client.py", line 845, in sync
    return sync(
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/distributed/utils.py", line 325, in sync
    raise exc.with_traceback(tb)
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/distributed/utils.py", line 308, in f
    result[0] = yield future
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/distributed/client.py", line 1813, in _gather
    raise exception.with_traceback(traceback)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
TypeError: code() takes at most 15 arguments (16 given)
ERROR:prefect.FlowRunner:Unexpected error: TypeError('code() takes at most 15 arguments (16 given)')
k
Maybe you need to specify the Python version for your images? If you’re using Dask, you would need consistent versions for registration, agent, scheduler and workers
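For context on the error itself: the traceback shows the client running under `python3.9`, while the frame that fails to unpickle lives under `/usr/local/lib/python3.7`. Python 3.8 added one argument (the positional-only argument count) to the `code()` constructor, so a function pickled on 3.8+ carries 16 arguments and a 3.7 interpreter accepts at most 15. A minimal sketch of a guard you could run in each environment, assuming you decide on one expected `major.minor` yourself (the helper names here are hypothetical, not part of Prefect or Dask):

```python
import sys


def same_minor_version(a, b):
    """Return True when two version tuples share the same major.minor."""
    return tuple(a[:2]) == tuple(b[:2])


def check_python_version(expected, actual=None):
    """Raise if the interpreter's major.minor differs from `expected`.

    `actual` defaults to the running interpreter; passing it explicitly
    makes the check easy to exercise without switching interpreters.
    """
    actual = tuple(sys.version_info[:2]) if actual is None else tuple(actual[:2])
    if not same_minor_version(expected, actual):
        raise RuntimeError(
            "Python %d.%d expected, but running %d.%d; pickled functions "
            "may not load across these versions" % (tuple(expected[:2]) + actual)
        )
    return actual
```

Calling `check_python_version((3, 9))` at the top of the flow script and in the worker image's startup would surface the mismatch before any task is shipped.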
w
That makes sense, it started happening when I switched from `daskdev/dask:latest` to `prefecthq/prefect:latest`
I guess I’ll use the dask image for now, it seems to make it past this step. Any idea how I can debug what I now see further?
[2021-08-24 12:04:09-0400] INFO - prefect.S3 | Uploading script ./demo_flow.py to prefect/flows/demo/random-sample-wlb in addepar-research-data
Flow URL: <http://localhost:8080/default/flow/1b956c24-0b66-497e-8cc8-51dc72be26c0>
 └── ID: 23dddc05-7835-4cbe-9dbe-4365a5444db5
 └── Project: demo
 └── Labels: []
[2021-08-24 12:04:12-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'increment a random sample'
[2021-08-24 12:04:12-0400] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `dask_kubernetes.core.KubeCluster`...
Creating scheduler pod on cluster. This may take some time.
[2021-08-24 12:04:23-0400] INFO - prefect.DaskExecutor | The Dask dashboard is available at <http://dask-wilsonbilkovich-eb6dad6c-2.prefect:8787/status>
INFO:prefect.DaskExecutor:The Dask dashboard is available at <http://dask-wilsonbilkovich-eb6dad6c-2.prefect:8787/status>
/Users/wilson.bilkovich/.pyenv/versions/3.9.6/envs/addemart/lib/python3.9/site-packages/distributed/client.py:1105: VersionMismatchWarning: Mismatched versions found

+-------------+-----------+-----------+---------+
| Package     | client    | scheduler | workers |
+-------------+-----------+-----------+---------+
| blosc       | None      | 1.10.2    | None    |
| dask        | 2021.08.0 | 2021.08.1 | None    |
| distributed | 2021.08.0 | 2021.08.1 | None    |
| lz4         | None      | 3.1.3     | None    |
+-------------+-----------+-----------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2021-08-24 12:04:34-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded
ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x122f875b0>
ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x122f87490>
ERROR:asyncio:Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x12300f160>, 27.302937278)]']
connector: <aiohttp.connector.TCPConnector object at 0x122f879d0>
ERROR:asyncio:Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x122f40be0>, 27.39289196)]']
connector: <aiohttp.connector.TCPConnector object at 0x122f87790>
ERROR:asyncio:Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x122faa460>
transport: <_SelectorSocketTransport closing fd=17>
Traceback (most recent call last):
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/lib/python3.9/asyncio/selector_events.py", line 918, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/lib/python3.9/asyncio/sslproto.py", line 684, in _process_write_backlog
    self._transport.write(chunk)
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/lib/python3.9/asyncio/selector_events.py", line 924, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/lib/python3.9/asyncio/selector_events.py", line 719, in _fatal_error
    self._force_close(exc)
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/lib/python3.9/asyncio/selector_events.py", line 731, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/lib/python3.9/asyncio/base_events.py", line 746, in call_soon
    self._check_closed()
  File "/Users/wilson.bilkovich/.pyenv/versions/3.9.6/lib/python3.9/asyncio/base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Some sort of TLS error but not a very descriptive one
Also it worries me that the Worker versions all say None. I feel like I’m configuring the worker image?
k
Yeah, it’s kinda concerning that all the workers say None. `KubernetesRun` is the client image (or whatever is in your agent). `DaskExecutor` should give the worker pods. Are you still using KubeCluster?
w
I am, yeah.
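from dask_kubernetes import KubeCluster
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

# `f` (the Flow) and `pod_spec` (the worker pod template) are defined elsewhere.
# The run config sets the image for the flow-run job, i.e. the Dask client.
f.run_config = KubernetesRun(
    image="daskdev/dask:latest", env={"EXTRA_PIP_PACKAGES": "prefect"}
)

# The executor creates a temporary Dask cluster on Kubernetes for each run.
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        namespace="prefect",
        pod_template=pod_spec,
        n_workers=1,
    ),
)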
k
Oh wait, I think I know what this is
Are you using anything `boto3` related? `aiobotocore==1.4.0` was released last Friday and broke a lot of stuff. The workaround is to pin to `aiobotocore==1.3.3`
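One way the pin could be applied with the setup in this thread is through the same `EXTRA_PIP_PACKAGES` variable already passed to the `daskdev/dask` image. This is a sketch under the assumption that the image installs that variable's contents with `pip` as a space-separated list; the helper name is hypothetical.

```python
def extra_pip_packages(*requirements):
    """Join pip requirement strings into one space-separated value."""
    return " ".join(requirements)


# Environment for the run config: install prefect and hold aiobotocore
# at the last version reported to work in this thread.
env = {"EXTRA_PIP_PACKAGES": extra_pip_packages("prefect", "aiobotocore==1.3.3")}
```

The resulting `env` dict would replace the one passed to `KubernetesRun(...)`, and the same value would need to reach the worker pods for the pin to hold there too.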
w
I guess I am, in the sense that I’m using S3 storage, which wraps boto I believe
So, the output has `client`, `scheduler`, and `worker` versions.. client is configured by RunConfig, scheduler is configured by Executor? If that’s true, what configures `worker`?
k
With KubeCluster specifically, the scheduler and workers can be configured independently by providing the `scheduler_pod_template`, but it defaults to the `pod_template` anyway, so if you set `pod_template` and nothing for the scheduler, it should apply to both, from my understanding. I think something may have just gone wrong with worker initialization in this case?
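To make the worker image explicit rather than relying on a default, the pod template itself can name it. The sketch below builds a plain Kubernetes pod spec as a dict; it assumes your `dask_kubernetes` version accepts a dict for `pod_template`, and the function name, container name, and `dask-worker` arguments are illustrative choices rather than anything confirmed in this thread.

```python
def worker_pod_template(image, extra_pip_packages=""):
    """Build a minimal pod spec dict for a Dask worker (illustrative only)."""
    return {
        "kind": "Pod",
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                {
                    "name": "dask-worker",
                    # Pin this to the same image (and Python version) as the client.
                    "image": image,
                    "args": ["dask-worker", "--nthreads", "1", "--death-timeout", "60"],
                    "env": [
                        {"name": "EXTRA_PIP_PACKAGES", "value": extra_pip_packages}
                    ],
                }
            ],
        },
    }


pod_spec = worker_pod_template("daskdev/dask:latest", "prefect")
```

Passing the same image string to both `KubernetesRun(image=...)` and this template keeps the client, scheduler, and workers on one image, which is the consistency the version table is checking for.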
w
Hmm. Capturing the error output, assuming there is some, seems to be a bit tricky.
I wonder if I can tell Dask not to clean up worker pods
Everything says ‘success’ when I run it, but I never see the logger output I’m expecting
k
Work that happens on the Dask workers writes its logs to the workers, but Dask doesn’t have a native way to ship logs from the workers to the client. You need some kind of third-party service to persist logs elsewhere. The native Python logger loses its configuration once it is pickled, shipped to the workers, and unpickled. So even if the `CloudHandler` is attached on the client side, it doesn’t make its way to the workers.
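A rough illustration of the point above, using only the standard library: since handlers attached on the client do not travel with the pickled task, one workaround is to attach a handler inside the function that actually runs on the worker. This is a generic sketch, not Prefect's own logging mechanism; `get_task_logger` and `increment` are hypothetical names, and where the handler writes (stderr here by default) is up to you.

```python
import logging


def get_task_logger(name, stream=None):
    """Return a logger with a handler attached in the current process.

    Because this runs inside the task, the handler is created on the
    worker instead of being expected to survive pickling.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid stacking duplicate handlers on reuse
        handler = logging.StreamHandler(stream)  # stream=None means stderr
        handler.setFormatter(
            logging.Formatter("%(levelname)s - %(name)s | %(message)s")
        )
        logger.addHandler(handler)
    return logger


def increment(x, stream=None):
    """Example task body that configures its logger where it executes."""
    logger = get_task_logger("demo.increment", stream)
    logger.info("incrementing %s", x)
    return x + 1
```

The output still lands on the worker (for example in the pod's logs), so it only helps if the pods live long enough to be inspected or their output is collected by a separate log service.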
Also, I’m not sure there is a way to leave the pods on for `KubeCluster` per this