librelay

1from .librelay import LibRelay02, APIAction, RelayStates, RelayQueries
2
3__all__ = [
4	"LibRelay02",
5	"APIAction",
6	"RelayStates",
7	"RelayQueries",
8]
class LibRelay02(threading.Thread):
 47class LibRelay02(Thread):
 48    def __init__(self, device:str="/dev/ttyACM0", bauds:int=115200) -> None:
 49        super().__init__()
 50        self._device = device
 51        self._bauds = bauds
 52        self._running = True
 53        self._filedesc = None
 54        self._input_queue = None
 55        self._output_queue = None
 56        self._populate_relay_states_apis()
 57
 58    def input_queue(self) -> Optional[Queue]:
 59        """
 60        Gets the input queue
 61        returns: Optional[Queue]
 62        """
 63        return self._input_queue
 64    
 65    def set_input_queue(self, q:Queue) -> None:
 66        """
 67        Assign a queue for incoming commands
 68        q: the input queue
 69        returns: None
 70        """
 71        self._input_queue = q
 72
 73    def output_queue(self) -> Optional[Queue]:
 74        """
 75        Gets the output queue
 76        returns: Optional[Queue]
 77        """
 78        return self._output_queue
 79
 80    def set_output_queue(self, q:Queue) -> None:
 81        """
 82        Assign a queue for outgoing messages
 83        q: the output queue
 84        returns: None
 85        """
 86        self._output_queue = q
 87
 88    def setup(self) -> bool:
 89        """
 90        Setups the class, open the serial port
 91        with the configuration
 92        returns: True on success, else False
 93        """
 94        try:
 95            self._filedesc = serial.Serial(
 96                self._device,
 97                self._bauds,
 98                parity=serial.PARITY_NONE,
 99                bytesize=8,
100                stopbits=1
101                )
102        except Exception as e:
103            print(str(e))
104            return False
105        else:
106            return True
107        finally:
108            pass
109
110    def __clear_queue(self, q:Queue) -> None:
111        """
112        Utility to clear a queue
113        returns: None
114        """
115        while not q.empty():
116            try:
117                q.get(block=False)
118            except Exception as e:
119                logger.warning(str(e))
120                continue
121            q.task_done()
122
123    def quit(self) -> None:
124        """
125        Close the file descriptor
126        returns: None
127        """
128        self._running = False
129        self.__clear_queue(self._input_queue)
130        self._filedesc.close()
131
132    def version(self) -> bytes:
133        """
134        Ask the system its version through the serial port
135        returns: bytes
136        """
137        to_write = struct.pack('!B', RelayQueries.VERSION.value)
138        self._filedesc.write(to_write)
139        time.sleep(0.1)
140        res = self._filedesc.read(size=2)
141        return res
142
143    def status(self) -> bytes:
144        """
145        Ask the system each contacteur states
146        returns: bytes
147        """
148        to_write = struct.pack('!B', RelayQueries.STATUS.value)
149        self._filedesc.write(to_write)
150        time.sleep(0.1)
151        res = self._filedesc.read(size=1)
152        return res
153
154    def _populate_relay_states_apis(self) -> None:
155        """
156        Create methods that will be assignated to the current class,
157        based on the RelayStates and RelayQueries classes
158        returns: None
159        """
160        def __make_command_fct(instance, name, value):
161            def tmp_fct() -> bytes:
162                to_write = struct.pack('!B', value)
163                res = instance._filedesc.write(to_write)
164                time.sleep(0.1)
165                return res
166            tmp_fct.__name__ = name
167            tmp_fct.__qualname__ = name
168            tmp_fct.__doc__ = name
169            return tmp_fct
170
171        instance = self
172        for member in RelayStates:
173            f = __make_command_fct(instance, member.name.lower(), member.value)
174            setattr(instance, member.name.lower(), f)
175
176    def _analyse(self, command:dict) -> Optional[bytes]:
177        """
178        Analyse commands and send them to the relay
179
180        Format is a dictionnary
181
182        Example :
183        { "action": APIAction.SET_STATE.name, "content": RelayStates.ALL_ON.name }
184        { "action": APIAction.QUERY.name, "content": RelayQueries.VERSION.name }
185
186        returns: bytes from the serial response
187        """
188        if not "action" in command:
189            return
190
191        if not APIAction.SET_STATE.name in command["action"] and not APIAction.QUERY.name in command["action"]:
192            return
193
194        try:
195            res = getattr(self, command["content"].lower())()
196        except Exception as e:
197            print(str(e))
198            return
199        else:
200            return res
201
202
203    def run(self) -> None:
204        """
205        The threading callback, loops on the input queue
206        for incoming commands
207        returns: None
208        """
209        while self._running:
210            command = self._input_queue.get()
211            res = self._analyse(command)
212            self._output_queue.put(res)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

LibRelay02(device: str = '/dev/ttyACM0', bauds: int = 115200)
48    def __init__(self, device:str="/dev/ttyACM0", bauds:int=115200) -> None:
49        super().__init__()
50        self._device = device
51        self._bauds = bauds
52        self._running = True
53        self._filedesc = None
54        self._input_queue = None
55        self._output_queue = None
56        self._populate_relay_states_apis()

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

def input_queue(self) -> Optional[queue.Queue]:
58    def input_queue(self) -> Optional[Queue]:
59        """
60        Gets the input queue
61        returns: Optional[Queue]
62        """
63        return self._input_queue

Gets the input queue returns: Optional[Queue]

def set_input_queue(self, q: queue.Queue) -> None:
65    def set_input_queue(self, q:Queue) -> None:
66        """
67        Assign a queue for incoming commands
68        q: the input queue
69        returns: None
70        """
71        self._input_queue = q

Assign a queue for incoming commands q: the input queue returns: None

def output_queue(self) -> Optional[queue.Queue]:
73    def output_queue(self) -> Optional[Queue]:
74        """
75        Gets the output queue
76        returns: Optional[Queue]
77        """
78        return self._output_queue

Gets the output queue returns: Optional[Queue]

def set_output_queue(self, q: queue.Queue) -> None:
80    def set_output_queue(self, q:Queue) -> None:
81        """
82        Assign a queue for outgoing messages
83        q: the output queue
84        returns: None
85        """
86        self._output_queue = q

Assign a queue for outgoing messages q: the output queue returns: None

def setup(self) -> bool:
 88    def setup(self) -> bool:
 89        """
 90        Setups the class, open the serial port
 91        with the configuration
 92        returns: True on success, else False
 93        """
 94        try:
 95            self._filedesc = serial.Serial(
 96                self._device,
 97                self._bauds,
 98                parity=serial.PARITY_NONE,
 99                bytesize=8,
100                stopbits=1
101                )
102        except Exception as e:
103            print(str(e))
104            return False
105        else:
106            return True
107        finally:
108            pass

Setups the class, open the serial port with the configuration returns: True on success, else False

def quit(self) -> None:
123    def quit(self) -> None:
124        """
125        Close the file descriptor
126        returns: None
127        """
128        self._running = False
129        self.__clear_queue(self._input_queue)
130        self._filedesc.close()

Close the file descriptor returns: None

def version(self) -> bytes:
132    def version(self) -> bytes:
133        """
134        Ask the system its version through the serial port
135        returns: bytes
136        """
137        to_write = struct.pack('!B', RelayQueries.VERSION.value)
138        self._filedesc.write(to_write)
139        time.sleep(0.1)
140        res = self._filedesc.read(size=2)
141        return res

Ask the system its version through the serial port returns: bytes

def status(self) -> bytes:
143    def status(self) -> bytes:
144        """
145        Ask the system each contacteur states
146        returns: bytes
147        """
148        to_write = struct.pack('!B', RelayQueries.STATUS.value)
149        self._filedesc.write(to_write)
150        time.sleep(0.1)
151        res = self._filedesc.read(size=1)
152        return res

Ask the system each contacteur states returns: bytes

def run(self) -> None:
203    def run(self) -> None:
204        """
205        The threading callback, loops on the input queue
206        for incoming commands
207        returns: None
208        """
209        while self._running:
210            command = self._input_queue.get()
211            res = self._analyse(command)
212            self._output_queue.put(res)

The threading callback, loops on the input queue for incoming commands returns: None

Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id
class APIAction(enum.Enum):
16class APIAction(Enum):
17    """
18    List of action to specify when dealing with the Queue
19    SET_STATE: set the state of the relay
20    QUERY: get info from the relay
21    """
22    UNKNOWN = 0
23    SET_STATE = 1
24    QUERY = 2

List of action to specify when dealing with the Queue SET_STATE: set the state of the relay QUERY: get info from the relay

UNKNOWN = <APIAction.UNKNOWN: 0>
SET_STATE = <APIAction.SET_STATE: 1>
QUERY = <APIAction.QUERY: 2>
Inherited Members
enum.Enum
name
value
class RelayStates(enum.Enum):
27class RelayStates(Enum):
28    """
29    Values to send to the relay to actually set the state
30    """
31    ALL_ON  =   100
32    ALL_OFF =   110
33    ONE_ON  =   101
34    ONE_OFF =   111
35    TWO_ON  =   102
36    TWO_OFF =   112

Values to send to the relay to actually set the state

ALL_ON = <RelayStates.ALL_ON: 100>
ALL_OFF = <RelayStates.ALL_OFF: 110>
ONE_ON = <RelayStates.ONE_ON: 101>
ONE_OFF = <RelayStates.ONE_OFF: 111>
TWO_ON = <RelayStates.TWO_ON: 102>
TWO_OFF = <RelayStates.TWO_OFF: 112>
Inherited Members
enum.Enum
name
value
class RelayQueries(enum.Enum):
39class RelayQueries(Enum):
40    """
41    Values to send to the relay to get info
42    """
43    VERSION =   90
44    STATUS  =   91

Values to send to the relay to get info

VERSION = <RelayQueries.VERSION: 90>
STATUS = <RelayQueries.STATUS: 91>
Inherited Members
enum.Enum
name
value