81 lines
2.4 KiB
Python
81 lines
2.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from concurrent.futures import Executor, Future, ThreadPoolExecutor
|
|
from types import TracebackType
|
|
from typing import Any, Callable, Dict, Optional, Tuple, Type
|
|
|
|
from pyee.base import EventEmitter
|
|
|
|
__all__ = ["ExecutorEventEmitter"]
|
|
|
|
|
|
class ExecutorEventEmitter(EventEmitter):
|
|
"""An event emitter class which runs handlers in a `concurrent.futures`
|
|
executor.
|
|
|
|
By default, this class creates a default `ThreadPoolExecutor`, but
|
|
a custom executor may also be passed in explicitly to, for instance,
|
|
use a `ProcessPoolExecutor` instead.
|
|
|
|
This class runs all emitted events on the configured executor. Errors
|
|
captured by the resulting Future are automatically emitted on the
|
|
`error` event. This is unlike the EventEmitter, which have no error
|
|
handling.
|
|
|
|
The underlying executor may be shut down by calling the `shutdown`
|
|
method. Alternately you can treat the event emitter as a context manager:
|
|
|
|
```py
|
|
with ExecutorEventEmitter() as ee:
|
|
# Underlying executor open
|
|
|
|
@ee.on('data')
|
|
def handler(data):
|
|
print(data)
|
|
|
|
ee.emit('event')
|
|
|
|
# Underlying executor closed
|
|
```
|
|
|
|
Since the function call is scheduled on an executor, emit is always
|
|
non-blocking.
|
|
|
|
No effort is made to ensure thread safety, beyond using an executor.
|
|
"""
|
|
|
|
def __init__(self, executor: Optional[Executor] = None):
|
|
super(ExecutorEventEmitter, self).__init__()
|
|
if executor:
|
|
self._executor: Executor = executor
|
|
else:
|
|
self._executor = ThreadPoolExecutor()
|
|
|
|
def _emit_run(
|
|
self,
|
|
f: Callable,
|
|
args: Tuple[Any, ...],
|
|
kwargs: Dict[str, Any],
|
|
):
|
|
future: Future = self._executor.submit(f, *args, **kwargs)
|
|
|
|
@future.add_done_callback
|
|
def _callback(f: Future) -> None:
|
|
exc: Optional[BaseException] = f.exception()
|
|
if isinstance(exc, Exception):
|
|
self.emit("error", exc)
|
|
elif exc is not None:
|
|
raise exc
|
|
|
|
def shutdown(self, wait: bool = True) -> None:
|
|
"""Call `shutdown` on the internal executor."""
|
|
|
|
self._executor.shutdown(wait=wait)
|
|
|
|
def __enter__(self) -> "ExecutorEventEmitter":
|
|
return self
|
|
|
|
def __exit__(
|
|
self, type: Type[Exception], value: Exception, traceback: TracebackType
|
|
) -> Optional[bool]:
|
|
self.shutdown()
|