librelay
class LibRelay02(Thread):
    """
    Worker thread that drives a relay board over a serial port.

    Commands are taken from the input queue, sent to the board, and the
    result of every command is pushed on the output queue. At construction
    time one callable per RelayStates member is attached to the instance
    under the lower-cased member name (e.g. ``all_on``).
    """

    # Seconds run() waits on the input queue before re-checking the stop flag.
    _POLL_INTERVAL = 0.5

    def __init__(self, device:str="/dev/ttyACM0", bauds:int=115200) -> None:
        """
        Store the serial settings; the port itself is opened by setup()
        device: path of the serial device
        bauds: baud rate used when the port is opened
        returns: None
        """
        super().__init__()
        self._device = device
        self._bauds = bauds
        self._running = True
        self._filedesc = None
        self._input_queue = None
        self._output_queue = None
        self._populate_relay_states_apis()

    def input_queue(self) -> Optional[Queue]:
        """
        Gets the input queue
        returns: Optional[Queue]
        """
        return self._input_queue

    def set_input_queue(self, q: Queue) -> None:
        """
        Assign a queue for incoming commands
        q: the input queue
        returns: None
        """
        self._input_queue = q

    def output_queue(self) -> Optional[Queue]:
        """
        Gets the output queue
        returns: Optional[Queue]
        """
        return self._output_queue

    def set_output_queue(self, q: Queue) -> None:
        """
        Assign a queue for outgoing messages
        q: the output queue
        returns: None
        """
        self._output_queue = q

    def setup(self) -> bool:
        """
        Open the serial port with the stored device path and baud rate
        (8 data bits, no parity, 1 stop bit)
        returns: True on success, else False
        """
        try:
            self._filedesc = serial.Serial(
                self._device,
                self._bauds,
                parity=serial.PARITY_NONE,
                bytesize=8,
                stopbits=1,
            )
        except Exception as e:
            # Boundary handler: any failure to open the port is reported
            # to the caller as False rather than raised.
            logger.error(str(e))
            return False
        return True

    def __clear_queue(self, q: Optional[Queue]) -> None:
        """
        Utility to drop every pending item of a queue
        q: the queue to empty; None is accepted and ignored
        returns: None
        """
        # Function-scope import keeps this block self-contained.
        from queue import Empty

        if q is None:
            return
        while True:
            try:
                q.get(block=False)
            except Empty:
                # Nothing left to drop.
                break
            q.task_done()

    def quit(self) -> None:
        """
        Ask the run loop to stop, drop pending commands and close the
        serial port. Safe to call when no queue was assigned or when
        setup() was never called / failed.
        returns: None
        """
        self._running = False
        self.__clear_queue(self._input_queue)
        if self._filedesc is not None:
            self._filedesc.close()

    def _query(self, code: int, size: int) -> bytes:
        """
        Write a one-byte request and read the answer
        code: value packed as a single unsigned byte and written to the port
        size: number of bytes to read back
        returns: bytes read from the port
        """
        self._filedesc.write(struct.pack('!B', code))
        # Pause kept from the original code; presumably gives the board
        # time to answer before reading -- TODO confirm.
        time.sleep(0.1)
        return self._filedesc.read(size=size)

    def version(self) -> bytes:
        """
        Ask the system its version through the serial port
        returns: bytes (2 bytes are read)
        """
        return self._query(RelayQueries.VERSION.value, 2)

    def status(self) -> bytes:
        """
        Ask the system for the state of each contactor
        returns: bytes (1 byte is read)
        """
        return self._query(RelayQueries.STATUS.value, 1)

    def _populate_relay_states_apis(self) -> None:
        """
        Attach one callable per RelayStates member to this instance.
        Each callable is named after the lower-cased member name and
        writes the member value to the serial port as a single byte.
        returns: None
        """
        def _make_command(instance, name, value):
            # Factory so that every callable captures its own name/value.
            def command() -> Optional[int]:
                # Returns whatever the port's write() returns (with
                # pyserial: the number of bytes written).
                written = instance._filedesc.write(struct.pack('!B', value))
                time.sleep(0.1)
                return written
            command.__name__ = name
            command.__qualname__ = name
            command.__doc__ = name
            return command

        for member in RelayStates:
            api_name = member.name.lower()
            setattr(self, api_name, _make_command(self, api_name, member.value))

    def _analyse(self, command: dict) -> Optional[bytes]:
        """
        Analyse a command and send it to the relay

        Format is a dictionary

        Example :
        { "action": APIAction.SET_STATE.name, "content": RelayStates.ALL_ON.name }
        { "action": APIAction.QUERY.name, "content": RelayQueries.VERSION.name }

        Only names of RelayStates / RelayQueries members (any letter case)
        are dispatched; anything else is rejected so that a command cannot
        invoke unrelated attributes of the thread (quit, run, start, ...).

        returns: the value returned by the invoked method (bytes for
        queries, the write() result for state changes), or None when the
        command is ignored or fails
        """
        if "action" not in command:
            return None

        action = command["action"]
        if APIAction.SET_STATE.name not in action and APIAction.QUERY.name not in action:
            return None

        content = command.get("content")
        if not isinstance(content, str):
            logger.warning("relay command without a usable 'content': %r", command)
            return None

        method_name = content.lower()
        allowed = {member.name.lower() for member in RelayStates}
        allowed |= {member.name.lower() for member in RelayQueries}
        if method_name not in allowed:
            logger.warning("unknown relay command: %r", content)
            return None

        try:
            return getattr(self, method_name)()
        except Exception as e:
            # Best effort: a failing command must not kill the worker loop.
            logger.error(str(e))
            return None

    def run(self) -> None:
        """
        The threading callback, loops on the input queue
        for incoming commands until quit() is called
        returns: None
        """
        from queue import Empty

        while self._running:
            try:
                # A timed wait lets the loop notice quit() even when no
                # further command ever arrives.
                command = self._input_queue.get(timeout=self._POLL_INTERVAL)
            except Empty:
                continue
            res = self._analyse(command)
            if self._output_queue is not None:
                self._output_queue.put(res)
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
def __init__(self, device:str="/dev/ttyACM0", bauds:int=115200) -> None:
    """
    Initialise the thread and remember the serial settings
    device: path of the serial device
    bauds: baud rate for the serial link
    returns: None
    """
    super().__init__()
    # Serial settings; the port itself is not opened here.
    self._device, self._bauds = device, bauds
    # Loop flag read by run().
    self._running = True
    # Nothing is attached yet: no port, no queues.
    self._filedesc = None
    self._input_queue = None
    self._output_queue = None
    self._populate_relay_states_apis()
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
def input_queue(self) -> Optional[Queue]:
    """
    Accessor for the queue of incoming commands
    returns: the stored input queue (Optional[Queue])
    """
    return self._input_queue
Gets the input queue returns: Optional[Queue]
def set_input_queue(self, q:Queue) -> None:
    """
    Register the queue from which commands will be read
    q: the input queue
    returns: None
    """
    self._input_queue = q
Assign a queue for incoming commands q: the input queue returns: None
def output_queue(self) -> Optional[Queue]:
    """
    Accessor for the queue of outgoing messages
    returns: the stored output queue (Optional[Queue])
    """
    return self._output_queue
Gets the output queue returns: Optional[Queue]
def set_output_queue(self, q:Queue) -> None:
    """
    Register the queue on which results will be published
    q: the output queue
    returns: None
    """
    self._output_queue = q
Assign a queue for outgoing messages q: the output queue returns: None
def setup(self) -> bool:
    """
    Open the serial port with the stored device path and baud rate
    (8 data bits, no parity, 1 stop bit)
    returns: True on success, else False
    """
    try:
        port = serial.Serial(
            self._device,
            self._bauds,
            parity=serial.PARITY_NONE,
            bytesize=8,
            stopbits=1,
        )
    except Exception as e:
        # Any failure to open the port is reported as False.
        print(str(e))
        return False
    self._filedesc = port
    return True
Sets up the class: opens the serial port with the stored configuration. Returns: True on success, else False.
def quit(self) -> None:
    """
    Ask the run loop to stop, drop pending commands and close the
    serial port. Safe to call when no input queue was assigned or
    when the port was never opened.
    returns: None
    """
    self._running = False
    # Both attributes start as None and are only set by
    # set_input_queue() / setup(); skip whatever was never attached.
    if self._input_queue is not None:
        self.__clear_queue(self._input_queue)
    if self._filedesc is not None:
        self._filedesc.close()
Stops the run loop, empties the input queue and closes the serial port's file descriptor. Returns: None.
def version(self) -> bytes:
    """
    Query the board for its version over the serial port
    returns: bytes (2 bytes are read)
    """
    request = struct.pack('!B', RelayQueries.VERSION.value)
    port = self._filedesc
    port.write(request)
    # Fixed pause between the request and the read.
    time.sleep(0.1)
    return port.read(size=2)
Ask the system its version through the serial port returns: bytes
def status(self) -> bytes:
    """
    Query the board for the state of each contactor
    returns: bytes (1 byte is read)
    """
    request = struct.pack('!B', RelayQueries.STATUS.value)
    port = self._filedesc
    port.write(request)
    # Fixed pause between the request and the read.
    time.sleep(0.1)
    return port.read(size=1)
Asks the system for the state of each contactor. Returns: bytes.
def run(self) -> None:
    """
    The threading callback, loops on the input queue
    for incoming commands until the running flag is cleared
    returns: None
    """
    from queue import Empty

    poll_interval = 0.5  # seconds between two checks of the stop flag
    while self._running:
        try:
            # A timed wait lets the loop notice that the flag was cleared
            # even when no further command ever arrives; an untimed get()
            # would block forever.
            command = self._input_queue.get(timeout=poll_interval)
        except Empty:
            continue
        res = self._analyse(command)
        # The output queue is optional: without one the result is dropped.
        if self._output_queue is not None:
            self._output_queue.put(res)
The threading callback, loops on the input queue for incoming commands returns: None
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id
class APIAction(Enum):
    """
    Kinds of action a queued command can request
    UNKNOWN: placeholder value (0)
    SET_STATE: set the state of the relay
    QUERY: get info from the relay
    """
    UNKNOWN = 0
    SET_STATE = 1
    QUERY = 2
List of actions to specify when dealing with the queue. UNKNOWN: placeholder value; SET_STATE: set the state of the relay; QUERY: get info from the relay.
Inherited Members
- enum.Enum
- name
- value
class RelayStates(Enum):
    """
    Codes sent to the relay to change its state
    """
    # Both at once
    ALL_ON = 100
    ALL_OFF = 110
    # First one
    ONE_ON = 101
    ONE_OFF = 111
    # Second one
    TWO_ON = 102
    TWO_OFF = 112
Values to send to the relay to actually set the state
Inherited Members
- enum.Enum
- name
- value
class RelayQueries(Enum):
    """
    Codes sent to the relay to read information back
    """
    VERSION = 90
    STATUS = 91
Values to send to the relay to get info
Inherited Members
- enum.Enum
- name
- value