Source code for neoscore.core.propagating_thread
from threading import Thread
# Courtesy of https://stackoverflow.com/a/31614591/5615927
[docs]class PropagatingThread(Thread):
"""A ``Thread`` which propagates uncaught exceptions."""
[docs] def run(self):
self.exc = None
try:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
[docs] def join(self, timeout=None):
super(PropagatingThread, self).join(timeout)
if self.exc:
raise self.exc
return self.ret