-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathevents_task.py
More file actions
82 lines (65 loc) · 2.43 KB
/
events_task.py
File metadata and controls
82 lines (65 loc) · 2.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""sdk internal events task."""
import logging
import threading
import abc
# Module-level logger, named after this module via __name__.
_LOGGER = logging.getLogger(__name__)
class EventsTaskBase(metaclass=abc.ABCMeta):
    """Abstract interface that every events task has to implement."""

    @abc.abstractmethod
    def is_running(self):
        """Tell whether the task is currently running."""

    @abc.abstractmethod
    def start(self):
        """Begin executing the task."""

    @abc.abstractmethod
    def stop(self):
        """Halt the task."""
class EventsTask(EventsTaskBase):
    """
    sdk internal events processing task.

    Takes events from a queue on a daemon worker thread and forwards each one
    to the handler supplied at construction time.
    """

    # Unique marker object that stop() pushes onto the queue so that a worker
    # waiting inside get() receives an item and can re-check the running flag.
    _centinel = object()

    def __init__(self, notify_internal_events, internal_events_queue):
        """
        Class constructor.

        :param notify_internal_events: handler called as
            ``handler(event.internal_event, event.metadata)`` for every event
            taken from the queue
        :type notify_internal_events: function
        :param internal_events_queue: queue holding the internal events; the
            worker calls ``get()`` on it and ``stop()`` calls ``put()``.
            Queued events must expose ``internal_event`` and ``metadata``
            attributes.
        :type internal_events_queue: queue
        """
        self._internal_events_queue = internal_events_queue
        self._handler = notify_internal_events
        self._running = False
        self._worker = None

    def is_running(self):
        """Return whether the worker is running."""
        return self._running

    def _run(self):
        """
        Run worker handler.

        Loops while the task is flagged as running: takes one item from the
        queue, skips the stop sentinel, and passes every other event to the
        handler. Exceptions raised by the handler are logged and do not end
        the loop.
        """
        while self.is_running():
            event = self._internal_events_queue.get()
            # The flag may have been cleared while waiting for an item.
            if not self.is_running():
                break

            # Identity check on purpose: `==` would invoke the queued event's
            # own __eq__, which could wrongly match the sentinel (dropping a
            # real event) or raise inside the worker thread.
            if event is self._centinel:
                continue

            _LOGGER.debug('Processing sdk internal event: %s', event.internal_event)
            try:
                self._handler(event.internal_event, event.metadata)
            except Exception:
                # Best-effort dispatch: a failing handler must not end the loop.
                _LOGGER.error('Exception raised in events manager')
                _LOGGER.debug('Exception information: ', exc_info=True)

    def start(self):
        """
        Start worker.

        Does nothing (besides logging) when the task is already running;
        otherwise flags the task as running and launches a daemon thread
        executing ``_run``.
        """
        if self.is_running():
            _LOGGER.debug('Worker is already running')
            return

        self._running = True
        _LOGGER.debug('Starting Event Task worker')
        self._worker = threading.Thread(target=self._run, name='EventsTaskWorker', daemon=True)
        self._worker.start()

    def stop(self):
        """
        Stop worker.

        Clears the running flag and enqueues the sentinel so the worker gets
        an item to wake up on. Does not join the worker thread.
        """
        _LOGGER.debug('Stopping Event Task worker')
        if not self.is_running():
            _LOGGER.debug('Worker is not running. Ignoring.')
            return

        self._running = False
        self._internal_events_queue.put(self._centinel)