RoboAide
Project to improve a DIY robotic arm used for mobility assistance
Communication.py
Go to the documentation of this file.
1 from PySide2.QtCore import QThread
2 import serial
3 import serial.tools.list_ports
4 import struct
5 
6 class MessageReception(QThread):
7  """
8  Class for a thread that handles incoming serial messages
9  """
10  def __init__(self, mainWindow):
11  super(MessageReception, self).__init__()
12  self.mainWindow = mainWindow
13  self.shouldRun = True
14  self.counter = 0
15  self.firstMessage = True
16 
17  def run(self):
18  """
19  run thread
20  """
21  print("Message Reception thread started")
22  while self.shouldRun:
23  # print(self.mainWindow.comm.read())
24  message = self.mainWindow.comm.read(self.mainWindow.messageSize) # TODO: Find out why an extra byte is received (only happens with openCr)
25  if len(message) == self.mainWindow.messageSize:
26  print("message received")
27  unpacked_msg = self.mainWindow.s.unpack(message) # Unpack message
28  print(str(self.counter) + ": ", end='')
29  print(unpacked_msg)
30  self.counter += 1
31  self.setMotorCurrentPosition(unpacked_msg)
32  self.setDrawerState(unpacked_msg)
33  print("done")
34 
35  def stop(self):
36  """
37  Method to stop and close the thread
38  """
39  self.shouldRun = False
40  print("Stopping Message Reception thread")
41 
42  def setMotorCurrentPosition(self, msg):
43  """
44  Method to set the current position of all motors based on the information in the received message
45  :param msg: received message
46  """
47  for i in range(self.mainWindow.numberOfMotors):
48  self.mainWindow.dictMot["motor" + str(i+1)].setCurrentPosition(msg[i+1])
49  if self.firstMessage:
50  for i in range(self.mainWindow.numberOfMotors):
51  self.mainWindow.dictMot["motor" + str(i + 1)].setGoalPosition(msg[i + 1])
52  self.mainWindow.updateSliderPositions()
53  self.firstMessage = False
54 
55  def setDrawerState(self, msg):
56  """
57  Update drawer state (open/closed)
58  :param msg: received message
59  """
60  for i in range(len(self.mainWindow.drawersList)):
61  self.mainWindow.drawersList[i].setState(msg[i+8])
62 
63 
64 class MessageTransmission(QThread):
65  """
66  Class for a thread that handles outgoing serial messages
67  """
68  def __init__(self, mainWindow):
69  super(MessageTransmission, self).__init__()
70  self.mainWindow = mainWindow
71  self.shouldRun = True
72  self.counter = 0
73  self.firstMessage = True
74 
75  def run(self):
76  """
77  run thread
78  """
79  print("Message Transmission thread started")
80  while self.shouldRun:
81  self.msleep(500)
82  if len(self.mainWindow.msgDeque) > 0:
83  self.mainWindow.msgMu.lock()
84  self.mainWindow.comm.write(self.mainWindow.msgDeque.popleft())
85  self.mainWindow.msgMu.unlock()
86 
87  def stop(self):
88  """
89  Method to stop and close the thread
90  """
91  self.shouldRun = False
92  print("Stopping Message Transmission thread")
93 
94 
96  """
97  Initialize serial communication with a specified port
98  :param port: serial port name
99  :return: serial object and bool indicating if connection was successful
100  """
101  try:
102  ser = serial.Serial(port, 9600, timeout=0.1)
103  print("Connected to %s" % port)
104  connected = True
105  except serial.serialutil.SerialException:
106  print("Failed to connect to %s." % port)
107  ser = None
108  connected = False
109  return ser, connected
110 
111 
113  """
114  scan available serial ports and return the list
115  :return: list of available serial ports
116  """
117  ports = serial.tools.list_ports.comports()
118  ports_list = ['Select a communication port']
119  ports_list.extend(ports)
120  return ports_list
def __init__(self, mainWindow)
def initSerialConnection(port)