Python -- Serial Communication

Serial communication in Python

Serial communication is an essential skill for embedded engineers, and being able to write a simple host-computer (PC-side) program is a real plus.
This series is a record of related work. I don't use Python often and tend to forget a lot, so relearning it every time I need it is a chore. Three months after writing a program, it reads like gibberish to me.

Development environment:
Python version: 3.10.1, the latest at the time of writing
IDE: PyCharm 2021 Community Edition, used for writing and debugging
Library: the serial communication library pyserial. Install it from the command line with: pip3 install pyserial
Serial device: my own STM32 board
I like to use recent versions of software and tools. My main tools are updated about once a year.
I learned this the hard way: I used Protel99 for many years, and when I later moved to AD2013, everything seemed to have changed and it was very hard to adapt. The same happened with UG. If you upgrade every year, the changes between versions are small and easier to accept.

pyserial usage

First, import the pyserial library and define global variables

import serial
import serial.tools.list_ports
import threading

com_rx_buf = ''				# Receive buffer
com_tx_buf = ''				# Send buffer
COMM = serial.Serial()		# Define serial port object
port_list: list				# List of available serial ports
port_select: list			# The selected serial port

port_list = serial.tools.list_ports.comports()
The return value of comports() is the list of available serial ports, and its length, len(port_list), is the number of available ports. On this machine there are two:

len(port_list) is 2
port_list[0].device is the serial port name, here COM99
port_list[1].device is the serial port name, here COM97
The serial port number can be changed in Device Manager. I usually assign a high port number to a particular device. The last item in the list is the device I want.
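To avoid depending on where a device happens to sit in the list, the port can also be picked by name. The following is a minimal sketch, not part of the original program: `find_port` and `find_port_on_this_machine` are hypothetical helpers, and COM99 is simply the name from the listing above. `find_port` works on any objects that have a `device` attribute, such as the entries returned by `serial.tools.list_ports.comports()`.

```python
def find_port(ports, wanted):
    """Return the first entry whose .device equals `wanted`, or None."""
    for info in ports:
        if info.device == wanted:
            return info
    return None


def find_port_on_this_machine(wanted):
    """Look up `wanted` (e.g. "COM99") among the ports pyserial reports."""
    # Imported here so find_port() can be used without pyserial installed.
    import serial.tools.list_ports
    return find_port(serial.tools.list_ports.comports(), wanted)
```

If the device is not plugged in, the helper returns None, which is easier to handle than an index error.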

Open serial port

    COMM = serial.Serial(serial_port, 115200, timeout=0.01)  # 115200 baud, 10 ms read timeout
    if COMM.is_open:
        print(serial_port, "open success")
        return 0
    else:
        print("open failed")
        return 255

Close the serial port

The counterpart of opening the port is closing it. The serial port must be closed when the program ends. Otherwise, if the program exits abnormally and is run again, opening the port fails because it is still open.

    COMM.close()
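One way to make sure the port is released even when the program fails halfway is a try/finally block. The following is a minimal sketch: `run_with_port`, `port_factory` and `job` are hypothetical names, not part of pyserial. With pyserial, the factory could be something like `lambda: serial.Serial("COM99", 115200, timeout=0.01)`.

```python
def run_with_port(port_factory, job):
    """Open a port, run job(port), and close the port no matter what happens."""
    port = port_factory()
    try:
        return job(port)
    finally:
        # Runs on normal return and when job() raises an exception.
        port.close()
```

pyserial's `Serial` object can also be used directly in a `with` statement, which closes the port when the block is left.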

Send data

COMM.write(buf)    # buf must be bytes, for example b"AT+CG\r\n"
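pyserial's `write()` expects bytes rather than a Python `str`, so text has to be encoded before it is sent. The following is a minimal sketch: `encode_text` and `build_frame` are hypothetical helpers, and the 8-byte layout simply mirrors the `serial_send_command` function in the full program below.

```python
def encode_text(text):
    """Encode a command string as ASCII bytes, ready for COMM.write()."""
    return text.encode("ascii")


def build_frame(addr=0, command=0, param1=0, param0=0,
                data3=0, data2=0, data1=0, data0=0):
    """Pack eight values (each 0..255) into an 8-byte frame."""
    # bytes() raises ValueError if any value is outside 0..255.
    return bytes([addr, command, param1, param0, data3, data2, data1, data0])
```

The results can be passed straight to the port, for example `COMM.write(build_frame(1, 2))`.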

Receive data

buf = COMM.readall() reads all the data the serial port has received. You can start a receiving thread that runs an endless loop and makes this call once after every delay interval.
Note that if the other side sends a long string of data and the delay expires when only part of it has arrived, the received data will be incomplete. In that case, wait for another delay and read again to fetch the rest. Do this work inside the receiving thread.
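The "read, wait, read again" idea can be sketched as a loop that keeps collecting data until one full delay passes with nothing new. This is an illustration only: `read_burst` is a hypothetical helper, and it assumes a port object with a `read_all()` method that returns the bytes currently waiting, as pyserial's `Serial.read_all()` does. The `sleep` parameter exists only so the delay can be replaced in tests.

```python
import time


def read_burst(port, gap=0.01, sleep=time.sleep):
    """Collect bytes until one whole `gap` passes with no new data."""
    data = port.read_all()
    if not data:
        return b""            # nothing has arrived yet
    while True:
        sleep(gap)            # give the sender time to finish
        more = port.read_all()
        if not more:
            return data       # the line went quiet: treat the message as complete
        data += more
```

The right value for `gap` depends on the baud rate and on how the sender paces its data, so the 10 ms default is only a starting point.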

The whole program is as follows. It contains more functions than described above, because the file was taken from an existing project. Pick out the parts that are useful to you.

# Installation: pip3 install pyserial  (Python 3)
import serial
import serial.tools.list_ports
import time
import threading


com_rx_buf = ''				# Receive buffer
com_tx_buf = ''				# Send buffer
COMM = serial.Serial()		# Define serial port object
port_list: list				# List of available serial ports
port_select: list			# The selected serial port


# Returns the list of available serial ports
# (an empty list if there are none)
def get_com_list():
    global port_list
    # a = serial.tools.list_ports.comports()
    # print(a)
    # port_list = list(serial.tools.list_ports.comports())
    port_list = serial.tools.list_ports.comports()
    return port_list


def set_com_port(n=0):
    global port_list
    global port_select
    port_select = port_list[n]
    return port_select.device


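# Open serial port n from port_list; returns 0 on success, 255 on failure
def serial_open(n=0):
    global COMM
    serial_port = set_com_port(n)
    try:
        # serial.Serial() opens the port immediately and raises
        # SerialException if that fails (e.g. the port is already in use)
        COMM = serial.Serial(serial_port, 115200, timeout=0.01)
    except serial.SerialException as err:
        print("open failed:", err)
        return 255
    print(serial_port, "open success")
    return 0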


# Close the serial port
def serial_close():
    global COMM
    COMM.close()
    print(f"{COMM.name} closed.")


def set_com_rx_buf(buf=''):
    global com_rx_buf
    com_rx_buf = buf


def set_com_tx_buf(buf=''):
    global com_tx_buf
    com_tx_buf = buf


def get_com_rx_buf():
    global com_rx_buf
    return com_rx_buf


def get_com_tx_buf():
    global com_tx_buf
    return com_tx_buf


def thread_com_receive():
    while True:
        try:
            rx_buf = COMM.read()  # Wait (up to the timeout) for one byte
            if rx_buf != b'':
                time.sleep(0.01)  # Let the rest of the message arrive
                rx_buf = rx_buf + COMM.read_all()
                print("Serial port receives message:", rx_buf)
            time.sleep(0.01)
        except serial.SerialException:
            break  # Port closed or disconnected: stop the thread


# def serial_encode(addr=0, command=0, param1=0, param0=0):
#     buf = [addr, command, param1, param0, 0, 0, 0, 0]
#     print(buf)
#     return buf


def serial_send_command(addr=0, command=0, param1=0, param0=0, data3=0, data2=0, data1=0, data0=0):
    # Pack the eight values (each 0..255) into an 8-byte frame
    buf = bytes([addr, command, param1, param0, data3, data2, data1, data0])
    COMM.write(buf)


def serial_init():
    # pyserial sends and receives bytes, so commands and replies are bytes literals
    buf = b"AT+CG\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != b"OK\r\n":
        return 254  # Failed to enter debugging mode

    buf = b"AT+CAN_MODE=0\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != b"OK\r\n":
        return 253          # Failed to enter normal mode; the module stays in mode 1 (loopback)

    buf = b"AT+CAN_BAUD=500000\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != b"OK\r\n":
        return 253          # Baud rate setting failed

    buf = b"AT+FRAMEFORMAT=1,0,\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != b"OK\r\n":
        return 253          # Frame format setting failed

    buf = b"AT+ET\r\n"       # Enter transparent mode
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != b"OK\r\n":
        return 255  # Not a CAN module

    return 0  # All commands were acknowledged


if __name__ == '__main__':
    get_com_list()
    count = len(port_list)          # Avoid shadowing the built-in len()
    if count == 0:
        print("no serial port found")
    else:
        device = port_list[0].device
        print(count, device)
        if serial_open() == 0:
            thread1 = threading.Thread(target=thread_com_receive)
            thread1.start()
            # serial_close()
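The receiving thread in the program above runs forever, which makes it awkward to close the port cleanly. One possible variation, shown here only as a sketch, uses a `threading.Event` to ask the thread to stop. `receive_loop` and `start_receiver` are hypothetical names, and the port is assumed to provide `read_all()` as pyserial's `Serial` does.

```python
import threading
import time


def receive_loop(port, stop_event, on_data, poll=0.01):
    """Poll `port` until `stop_event` is set, passing received bytes to on_data."""
    while not stop_event.is_set():
        data = port.read_all()
        if data:
            on_data(data)
        time.sleep(poll)


def start_receiver(port, on_data):
    """Start receive_loop in a background thread; returns (thread, stop_event)."""
    stop_event = threading.Event()
    thread = threading.Thread(target=receive_loop,
                              args=(port, stop_event, on_data), daemon=True)
    thread.start()
    return thread, stop_event
```

A caller would start it with `thread, stop = start_receiver(COMM, print)`, and later call `stop.set()` and `thread.join()` before closing the port.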

Program execution results

Keywords: Python Embedded system

Added by bobvaz on Tue, 18 Jan 2022 21:22:37 +0200