Rob Fowler

10/15/2020, 7:55 AM
Another thread: let's say I can't get the above fixed. I was thinking I could just use a concurrent.futures executor. The submit-and-wait model seems pretty normal, and I can submit a function for execution simply with a concurrent.futures.ThreadPoolExecutor, but `wait()` needs a bit more than the documentation describes: it is not simply passed the future produced by `submit()` (which I could wait on), and it is expected to return something more complex. Can anyone point me to a shortcut for this?
I think I just need to build a result object of some sort.
Turns out this is probably how it started, but it needs to handle some more complex duck-typed input:
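from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Optional

# Executor is the Prefect executor base class (its import was not part of the original paste).
class CFExecutor(Executor):
    def __init__(self):
        self.pool = ThreadPoolExecutor(3)

    def submit(self, fn: Callable, *args: Any, extra_context: Optional[dict] = None, **kwargs: Any) -> Any:
        # extra_context is accepted but not used here
        return self.pool.submit(fn, *args, **kwargs)

    def wait(self, futures: Any) -> Any:
        print(f"type {type(futures)}")
        # Only works when `futures` is a single future, not a collection
        return next(as_completed((futures,))).result()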
The docs say `wait()` "accepts a collection of futures and returns their results", but it seems to be passed:
upstream_states = executor.wait(
    {e: state for e, state in upstream_states.items()}
)

final_states = executor.wait(
    {
        t: task_states.get(t, Pending("Task not evaluated by FlowRunner."))
        for t in final_tasks
    }
)
Obviously `LocalExecutor` works, since it just passes this back, so I guess I need to accept something called 'Edges' and tasks.
It seems these are 'future-like' objects.
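One possible way to handle the dict-shaped input described above is to walk whatever `wait()` receives and swap each future for its result. This is only a minimal sketch under assumptions: it uses nothing but the standard library, the names `resolve_futures` and `ThreadPoolSketch` are hypothetical, the class deliberately does not subclass the Prefect `Executor`, and it assumes the values are plain `concurrent.futures.Future` objects nested in dicts, lists, tuples, or sets.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional


def resolve_futures(obj: Any) -> Any:
    """Return a copy of obj with every Future replaced by its result."""
    if isinstance(obj, Future):
        # Blocks until the future finishes; re-raises its exception, if any.
        return obj.result()
    if isinstance(obj, dict):
        # Keys (e.g. edges or tasks) are kept as-is; only values are resolved.
        return {key: resolve_futures(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return type(obj)(resolve_futures(item) for item in obj)
    # Anything else (an already-computed state, a constant) passes through.
    return obj


class ThreadPoolSketch:
    """Hypothetical stand-in for an executor with submit() and wait()."""

    def __init__(self, max_workers: int = 3) -> None:
        self.pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(
        self,
        fn: Callable,
        *args: Any,
        extra_context: Optional[dict] = None,  # accepted but unused here
        **kwargs: Any,
    ) -> Future:
        return self.pool.submit(fn, *args, **kwargs)

    def wait(self, futures: Any) -> Any:
        return resolve_futures(futures)
```

With this shape, `wait({"a": future})` gives back a dict with the same keys and resolved values, and a non-future value is returned unchanged, which mirrors the pass-through behaviour mentioned above for `LocalExecutor`.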


10/15/2020, 9:28 PM
Hi! I’ve read through this and am not quite sure what your intention is.
Are you trying to build a custom executor that uses a thread pool?
I see — the documentation does appear to be lacking here. I’ll investigate this as well but it is unlikely I will have a solution for you today.