# ask-community
a
Hello. I'm still test driving Prefect (v0.14.15), but my flow gets stuck and I get zombie processes. Any ideas? Here's the code without imports:
Copy code
SQL_DIR = Path('sql')
OUT_DIR = Path('out')  # assumed value; OUT_DIR is used below but not defined in the original snippet

@task
def list_query_names():
    return [f.name for f in SQL_DIR.glob('*.sql')]

@task(log_stdout=True, timeout=15, task_run_name='{name}-{date:%F_%T}', checkpoint=False)
def exec_query(name: str):
    sql = Path(SQL_DIR / name).read_text()
    print('Query name: %s' % name)
    engine = sqla.create_engine(DSN)
    rs = engine.execute(sql)
    return dict(keys=rs.keys(), rows=rs.fetchall())

@task
def save_results(rs, name):
    with (OUT_DIR / name).with_suffix('.txt').open('w') as f:
        csv_writer = csv.writer(f, delimiter="\t")
        csv_writer.writerow(rs['keys'])
        csv_writer.writerows(rs['rows'])

with Flow("Queries") as flow:
    query_names = list_query_names()
    results = exec_query.map(query_names)
    save_results.map(results, query_names)
    
flow.executor = LocalDaskExecutor(num_workers=2, schedule='processes')
flow.run()
the imports are:
Copy code
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
import csv
import sqlalchemy as sqla
from pathlib import Path
DSN is any database connection URL accepted by SQLAlchemy
d
Hey @Andor Tóth! Can you tell me a bit more about the error you see for the zombies?
How are you running this flow with an agent?
Sometimes zombies can result from resource starvation, so I’d like to rule that out first
a
I am executing it locally as a Python script, like
python -i queries.py
, but the result is the same if it is run by an agent
The process list shows the following:
Copy code
prefect  3964559  0.3  1.2 585352 74788 pts/13   Sl+  17:21   0:02  |                   \_ python -i queries.py                                                               
prefect  3964730  0.0  0.1  53468 11980 pts/13   S+   17:21   0:00  |                       \_ /srv/prefect/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(8) 
prefect  3964732  0.2  0.0      0     0 pts/13   Z+   17:21   0:02  |                       \_ [python] <defunct>
andor.t+ 3936805  0.0  0.0  24112  3796 pts/15   Ss   17:14   0:00  \_ -bash
and the script executes indefinitely
until it is interrupted
output of
prefect diagnostics
follows:
Copy code
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-4.18.0-240.10.1.el8_3.x86_64-x86_64-with-centos-8",
    "prefect_backend": "server",
    "prefect_version": "0.14.15",
    "python_version": "3.6.8"
  }
}
the queries are small
and the output is at most a few hundred records
and the number of columns is below 10
d
How about the logs for the Flow Run?
Or an ID (you can find this in the URL of the Flow Run page)
a
Copy code
[2021-04-06 19:13:59+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Queries'
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'list_query_names': Starting task run...
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'list_query_names': Finished task run for task with final state: 'Success'
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'exec_query': Starting task run...
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'exec_query': Finished task run for task with final state: 'Mapped'
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'save_results': Starting task run...
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'save_results': Finished task run for task with final state: 'Mapped'
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Starting task run...
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'exec_query[1]': Starting task run...
[2021-04-06 19:14:01+0200] INFO - prefect.exec_query[0] | Query name: menu_ab.sql
[2021-04-06 19:14:01+0200] INFO - prefect.exec_query[1] | Query name: zuzda_ct_by_url_last_hour_src.sql
[2021-04-06 19:14:04+0200] INFO - prefect.exec_query[1] | Columns: ['day', 'ts', 'source', 'campaign_id', 'row_id', 'url', 'ct']
[2021-04-06 19:14:04+0200] INFO - prefect.exec_query[0] | Columns: ['day', 'menu', 'submenu', 'page_version', 'ct', 'uc']
[2021-04-06 19:14:05+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Finished task run for task with final state: 'Success'
[2021-04-06 19:14:05+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Starting task run...
[2021-04-06 19:14:05+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Finished task run for task with final state: 'Success'
[2021-04-06 19:14:05+0200] INFO - prefect.TaskRunner | Task 'exec_query[2]': Starting task run...
[2021-04-06 19:14:06+0200] INFO - prefect.exec_query[2] | Query name: aktualis_ct_by_url_15m.sql
[2021-04-06 19:14:08+0200] INFO - prefect.exec_query[2] | Columns: ['ct', 'url']
[2021-04-06 19:14:09+0200] INFO - prefect.TaskRunner | Task 'exec_query[2]': Finished task run for task with final state: 'Success'
[2021-04-06 19:14:09+0200] INFO - prefect.TaskRunner | Task 'save_results[2]': Starting task run...
[2021-04-06 19:14:09+0200] INFO - prefect.TaskRunner | Task 'save_results[2]': Finished task run for task with final state: 'Success'
after that, nothing happens, until I hit CTRL+C
I've tried these queries separately, and the last time I got a timeout for the query handle
d
Does that happen with a different executor?
a
LocalExecutor works fine
d
How about the DaskExecutor?
Even if you run in a local setup (so not in a multi-machine/distributed environment) it still uses a different scheduler
My suspicion is that the LocalDaskExecutor may be having trouble with the
timeout
parameter
But I’m not sure
a
yeah, it seems like that
in the meantime, I'm testing different scenarios
but nothing really changes
d
So, I think you might want to consider one of two paths
If that timeout isn’t that critical, I might suggest just removing it
a
it is critical
Copy code
^C[2021-04-06 19:22:36+0200] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...



^CTraceback (most recent call last):
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 542, in start
    yield
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 657, in get_flow_run_state
    s.map_states = executor.wait(mapped_children[t])
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 627, in wait
    futures, scheduler=self.scheduler, pool=self._pool, optimize_graph=False
  File "/srv/prefect/venv/lib/python3.6/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/srv/prefect/venv/lib/python3.6/site-packages/dask/threaded.py", line 84, in get
    **kwargs
  File "/srv/prefect/venv/lib/python3.6/site-packages/dask/local.py", line 476, in get_async
    key, res_info, failed = queue_get(queue)
  File "/srv/prefect/venv/lib/python3.6/site-packages/dask/local.py", line 133, in queue_get
    return q.get()
  File "/usr/lib64/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/usr/lib64/python3.6/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt
i got this stack trace
the bottom is weird, because I thought I was using processes, not threads
d
We’re going to be rolling out a new feature in the soon-ish future called “Flow SLAs” where you can, for example, tell cloud to “For this Flow, set any Flow Run running longer than x time into a Cancelled State”
I believe the scheduler is still threaded even if the workers are processes
a
ok, that sounds great
d
Meantime, I’d appreciate it if you’d open an issue for the timeouts problem you’re running into above
I fully expect it’s because of the LocalDaskExecutor + timeouts interaction since it’s totally fine running with the local executor
You can also try the DaskExecutor on its own (like I said, even in local mode it still uses a different scheduler)
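For example, swapping executors is a one-line change. A minimal sketch based on the flow above (with no address argument, DaskExecutor spins up a temporary local Dask cluster for the run):
Copy code
from prefect.executors import DaskExecutor

# no address given, so a temporary local Dask cluster is created just for this run
flow.executor = DaskExecutor()

if __name__ == "__main__":
    flow.run()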
a
ok, if I can find a simple way to reproduce it, then I will
thanks for the tip
d
👍
And thank you for letting us know!
a
It worked, DaskExecutor finished without a problem
d
Excellent
this is definitely an issue with the LocalDaskExecutor
I’ll open an issue from this thread, I think that’s sufficient
@Marvin open “Timeout Failing with LocalDaskExecutor”
m
z
Hey @Andor Tóth -- I wrote the multiprocess task timeout code 🙂 to enforce a timeout for a task in a process, we run a thread to execute your task then retrieve the data from a queue (as you see in the traceback) with the given timeout. This means the data that's passed must be serializable by cloudpickle -- is it possible that is not the case?
Oh actually, I think I may be wrong -- if you're running your tasks in processes it should just be using a thread (because we can spawn threads) whereas when your task is run in a thread we have to spawn a process to track it 🤦‍♂️ it's a bit confusing because it flip-flops
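As a rough illustration of the queue-based pattern described above (a simplified sketch, not Prefect's actual timeout handler):
Copy code
import queue
import threading

def run_with_timeout(fn, args, timeout):
    # sketch: run `fn` in a worker and wait at most `timeout` seconds
    # for its return value to arrive on a queue
    result_queue = queue.Queue()
    worker = threading.Thread(target=lambda: result_queue.put(fn(*args)), daemon=True)
    worker.start()
    try:
        return result_queue.get(timeout=timeout)  # raises queue.Empty on timeout
    except queue.Empty:
        raise TimeoutError("task did not finish within %ss" % timeout)
Note that the worker itself can't be forcibly stopped this way, so a task that never finishes may leave a stray worker behind; a process-backed variant passes the result through a multiprocessing queue instead, which is why the result has to be serializable by cloudpickle.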
The queue get call appears to be in dask -- I'm not sure if this is a bug on our side as it looks like there's a
waiter
in dask itself that's blocking execution... hm..
@Andor Tóth -- step 2 of testing, I ran this locally and got a ton of errors from Dask about multiprocessing without a
main
check. Does your issue go away if you do
Copy code
if __name__ == "__main__":
    flow.run()
a
let me check this out
actually I'm already doing it
so this does not matter
records of data are passed
which are tuples of primitive data types
which should be serializable by cloudpickle (but i'm gonna test that)
I have made a trial with timeouts (sql:
select sleep(20000)
), but it did not trigger the error
so I suppose it must be data dependent
cloudpickle could serialize the resultset without problem
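For reference, a minimal round-trip check along those lines (a sketch; the DSN value and query file name are placeholders standing in for the real ones):
Copy code
import cloudpickle
import sqlalchemy as sqla
from pathlib import Path

DSN = "..."  # placeholder: any SQLAlchemy connection URL
sql = Path("sql/zuzda_ct_by_url_last_hour_src.sql").read_text()

engine = sqla.create_engine(DSN)
with engine.connect() as conn:
    rs = conn.execute(sql)
    payload = dict(keys=rs.keys(), rows=rs.fetchall())

# round-trip the task's return value through cloudpickle
restored = cloudpickle.loads(cloudpickle.dumps(payload))
assert len(restored["rows"]) == len(payload["rows"])
print("round-trip OK: %d rows" % len(restored["rows"]))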
z
Copy code
import csv
from time import sleep
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor


SLEEP = 20


@task
def list_query_names():
    return ["a", "b", "c"]

@task(log_stdout=True, timeout=15, task_run_name='{name}-{date:%F_%T}', checkpoint=False)
def exec_query(name: str):
    sleep(SLEEP)
    return dict(keys={"d", "e", "f"}, rows=["x", "y", "z"])

@task
def save_results(rs, name):
    with open(name, 'w') as f:
        csv_writer = csv.writer(f, delimiter="\t")
        csv_writer.writerow(rs['keys'])
        csv_writer.writerows(rs['rows'])

with Flow("Queries") as flow:
    query_names = list_query_names()
    results = exec_query.map(query_names)
    save_results.map(results, query_names)
    
flow.executor = LocalDaskExecutor(num_workers=2, schedule='processes')
if __name__ == "__main__":
    flow.run()
errors with a sleep of 20 and runs fine with <15
a
yeah, I'm also swapping pieces out to find out what triggers it
with very simple queries, there are no problems
like:
select "a", sleep(10000)
z
Maybe the engine is leaving a hanging process, can you try putting it in a
with
per https://docs.sqlalchemy.org/en/14/core/connections.html#basic-usage ?
a
sure
it's not that
Copy code
@task(log_stdout=True, timeout=15, task_run_name='{name}-{date:%F_%T}', checkpoint=False)
def exec_query(name: str):
    sql = Path(SQL_DIR / name).read_text()

    print('Query name: %s' % name)
    engine = sqla.create_engine(DSN)
    with engine.connect() as conn:
        rs = conn.execute(sql)
        results = dict(keys=rs.keys(), rows=rs.fetchall())

    print('Columns: %s' % results['keys'])

    return results
I have also tried with Python 3.9
z
Sorry if you've said this already, but when it hangs is it hanging after that
print('Columns ...
line or before?
a
Here's the most recent output:
Copy code
[2021-04-06 21:38:34+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Queries'
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'list_query_names': Starting task run...
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'list_query_names': Finished task run for task with final state: 'Success'
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'exec_query': Starting task run...
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'exec_query': Finished task run for task with final state: 'Mapped'
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'save_results': Starting task run...
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'save_results': Finished task run for task with final state: 'Mapped'
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Starting task run...
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'exec_query[1]': Starting task run...
Query name: aktualis_ct_by_url_15m.sql
Query name: menu_ab.sql
Columns: ['ct', 'url']
Columns: ['day', 'menu', 'submenu', 'page_version', 'ct', 'uc']
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Finished task run for task with final state: 'Success'
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Starting task run...
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Finished task run for task with final state: 'Success'
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'exec_query[1]': Finished task run for task with final state: 'Success'
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'exec_query[2]': Starting task run...
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'save_results[1]': Starting task run...
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'save_results[1]': Finished task run for task with final state: 'Success'
Query name: zuzda_ct_by_url_last_hour_src.sql
Columns: ['day', 'ts', 'source', 'campaign_id', 'row_id', 'url', 'ct']
and no lines after that
the process list
Copy code
# ps faux | grep '[q]ueries' -A3 
prefect  4183313  1.5  0.9 556492 55784 pts/9    Sl+  21:38   0:01  |                   \_ python -i queries.py
prefect  4183320  0.0  0.2  53468 12172 pts/9    S+   21:38   0:00  |                       \_ /srv/prefect/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(7)
prefect  4183331  2.1  0.0      0     0 pts/9    Z+   21:38   0:01  |                       \_ [python] <defunct>
andor.t+  909727  0.0  0.0  24092     8 pts/3    Ss   Mar08   0:00  \_ -bash
and after pressing CTRL+C
Copy code
^C[2021-04-06 21:41:16+0200] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...
^CTraceback (most recent call last):
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 542, in start
    yield
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 657, in get_flow_run_state
    s.map_states = executor.wait(mapped_children[t])
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 627, in wait
    futures, scheduler=self.scheduler, pool=self._pool, optimize_graph=False
  File "/srv/prefect/venv/lib/python3.6/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/srv/prefect/venv/lib/python3.6/site-packages/dask/threaded.py", line 84, in get
    **kwargs
  File "/srv/prefect/venv/lib/python3.6/site-packages/dask/local.py", line 476, in get_async
    key, res_info, failed = queue_get(queue)
  File "/srv/prefect/venv/lib/python3.6/site-packages/dask/local.py", line 133, in queue_get
    return q.get()
  File "/usr/lib64/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/usr/lib64/python3.6/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "queries.py", line 43, in <module>
    flow.run()
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/core/flow.py", line 1266, in run
    **kwargs,
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/core/flow.py", line 1087, in _run
    **kwargs,
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 282, in run
    executor=executor,
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/utilities/executors.py", line 71, in inner
    return runner_method(self, *args, **kwargs)
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 661, in get_flow_run_state
    assert isinstance(final_states, dict)
  File "/usr/lib64/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 552, in start
    self._pool.join()
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 550, in join
    p.join()
  File "/usr/lib64/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib64/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
I've got to go now
tomorrow I am going to find this out
z
You can set
PREFECT__LOGGING__LEVEL="DEBUG"
and we'll get some more logs from the task runner
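One way to set that for a local script run (a sketch; exporting the variable in the shell before launching works too, the key point is that it is set before prefect is imported):
Copy code
import os
os.environ["PREFECT__LOGGING__LEVEL"] = "DEBUG"  # must happen before `import prefect`

from prefect import Flow, task  # prefect reads PREFECT__* env vars at import time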
Should look something like
Copy code
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Calling task.run() method...
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Attaching process based timeout handler...
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Sending execution to a new process...
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Waiting for process to return with 15s timeout...
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Waiting for process to return with 15s timeout...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Executing...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Executing...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Passing result back to main process...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Passing result back to main process...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Execution process closed, collecting result...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Execution process closed, collecting result...
a
looks like this to me:
Copy code
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Passing result back to main process...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Execution process closed, collecting result...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Handling state change from Running to Success
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Finished task run for task with final state: 'Success'
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Execution process closed, collecting result...
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Starting task run...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[0]': Handling state change from Pending to Running
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[0]': Calling task.run() method...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Handling state change from Running to Success
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[0]': Handling state change from Running to Success
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'exec_query[1]': Finished task run for task with final state: 'Success'
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Finished task run for task with final state: 'Success'
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'save_results[1]': Starting task run...
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'exec_query[2]': Starting task run...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[1]': Handling state change from Pending to Running
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Handling state change from Pending to Running
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[1]': Calling task.run() method...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Calling task.run() method...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Attaching process based timeout handler...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[1]': Handling state change from Running to Success
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Sending execution to a new process...
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'save_results[1]': Finished task run for task with final state: 'Success'
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Waiting for process to return with 15s timeout...
[2021-04-06 21:44:29+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Executing...
Query name: zuzda_ct_by_url_last_hour_src.sql
Columns: ['day', 'ts', 'source', 'campaign_id', 'row_id', 'url', 'ct']
[2021-04-06 21:44:32+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Passing result back to main process...
[2021-04-06 21:44:43+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Execution process closed, collecting result...
it has something to do with this query:
zuzda_ct_by_url_last_hour_src.sql
but standalone, LocalExecutor and DaskExecutor could execute it
each query runs in 2-3 seconds
so this should not be a timeout issue
okay, I have replaced the potentially bad query with a simple one:
select * from tmp.zuzda_test
where tmp.zuzda_test contains the resultset of the original query
and nothing has changed
z
Looks like it executes and returns to the main process without hanging... hmm
a
I have also replaced the other 2 queries with simple ones:
select "A", sleep(1000)
with only 10 rows, it's working
with 100 it also works
300 succeeds
but 500 doesn't
it dies above 433 rows
z
That's perplexing. Is it dependent on the data being returned? It's possible this is a weird sqlalchemy/dask/prefect combined bug
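If it helps isolate that, here's a minimal sketch with no database at all, varying only how many rows a mapped task returns (row counts are arbitrary; the executor line mirrors the one used above):
Copy code
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def row_counts():
    return [10, 100, 300, 500]

@task(timeout=15)
def make_rows(n: int):
    # mimic a query result of n small records
    return dict(keys=["a", "b"], rows=[(i, str(i)) for i in range(n)])

@task(log_stdout=True)
def count_rows(rs):
    print(len(rs["rows"]))

with Flow("payload-size-test") as flow:
    results = make_rows.map(row_counts())
    count_rows.map(results)

flow.executor = LocalDaskExecutor(num_workers=2, schedule='processes')  # same executor config as in the thread

if __name__ == "__main__":
    flow.run()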
a
tomorrow i'm going to try it with PostgreSQL and SQLite
maybe it depends on the driver
once again, thanks for your support
z
Something like this is going to be super hard to track down. Happy to help, but my pragmatic suggestion is to just use the
DaskExecutor
instead of the
LocalDaskExecutor
-- Dask itself typically recommends using the distributed executor even for local work.