1, Trolley device
In the tutorial is the car device integrated on Taobao. In addition, I also bought some spare parts and configured my own car.
https://detail.tmall.com/item.htm?id=608554421638&spm=a1z09.2.0.0.7e012e8d3NEMy0&_u=12kf16b6b4b
Components include:
1. Trolley bottom plate (2), motor (4), wheel (4), DuPont wire, copper column and several screws
2. Ultrasonic sensor (1) + steering gear (1)
3. Tracking sensor (3)
4. Obstacle avoidance sensors (2, left and right)
5. USB camera (1) + steering gear (2)
6. Raspberry pie 4B main control board (2G version) (1)
7. Raspberry pie expansion board (1)
8. Battery (1)
9. Voltage display module (1)
I finally bought a set and spent 620 yuan. If I buy it in bulk, the price is estimated as follows:
1. One set of trolley bottom plate: 35 yuan
2. Ultrasonic + 1 steering gear + small parts: 30 yuan
3. Tracking sensor: 15 yuan
4. Obstacle avoidance sensor: 5 yuan
5. USB camera + 2 steering gear: 50 yuan
6. Raspberry pie expansion board (2G version): 320 yuan
7. Battery: 20 yuan
8. Voltage display module: 5 yuan
9. Raspberry pie expansion board: 140 yuan
10. One set of electronic course: 0 yuan
Total: 620 yuan.
In other words, the store earned a profit from selling spare parts and expansion boards made by its own factory, but did not earn integration fees.
It is suggested that the most economical way is to buy package B. raspberry pie is not compatible with any USB camera, so you need to buy a package B camera, which is completely unnecessary for C and D. other sensors can be bought in the future. In addition, if you want to save money, you can buy a main control board without raspberry pie 4B and a main control board of raspberry pie ZeroWH, which can save 200 yuan, which is enough for the car to achieve performance.
2, Function point code
2.1. Four wheel motor drive module (the main program will call)
2.1.1 principle
In fact, the raspberry pie integrated expansion board integrates two L298N motor drive chips. Refer to spare parts, and its working principle is as follows:
The simple point is that ENA and ENB control the wheel speed, and IN1 and IN2 control the wheel state (forward rotation, reverse rotation and stop). Output A and output B in the figure above supply power to the wheel motor respectively.
2.1.2 code
#CarMotor.py import RPi.GPIO as GPIO import asyncio class SingleMotor: def __init__(self, IN1, IN2, PWM=None): self.speed = 35 self.freq = 50 self.run_state = "stop" self.PWM = PWM self.IN1 = IN1 self.IN2 = IN2 GPIO.setup(IN1, GPIO.OUT) GPIO.setup(IN2, GPIO.OUT) self.Motor = None if self.PWM != None: GPIO.setup(PWM, GPIO.OUT) self.Motor = GPIO.PWM(PWM, self.freq) self.Motor.start(0) #Set speed (0-100) def set_speed(self, speed): if self.Motor == None: return if speed > 100: self.speed = 100 elif speed < 0: self.speed = 0 else: self.speed = speed self.Motor.ChangeDutyCycle(self.speed) def get_speed(self): return self.speed def inc_speed(self, v): self.set_speed(self.speed + v) def dec_speed(self, v): self.set_speed(self.speed - v) def get_run_state(self): return self.run_state #Forward rotation def up(self): self.run_state = "up" GPIO.output(self.IN1, GPIO.HIGH) GPIO.output(self.IN2, GPIO.LOW) #reversal def down(self): self.run_state = "down" GPIO.output(self.IN1, GPIO.LOW) GPIO.output(self.IN2, GPIO.HIGH) #stop it def stop(self): self.run_state = "stop" GPIO.output(self.IN1, GPIO.LOW) GPIO.output(self.IN2, GPIO.LOW) class CarWheel: def __init__(self, L_Motor, R_Motor): self.L_Wheel = L_Motor self.R_Wheel = R_Motor self.speed = 35 self.set_speed(self.speed) self.run_state = "stop" def set_speed(self, speed): if speed > 100: self.speed = 100 elif speed < 0: self.speed = 0 else: self.speed = speed self.L_Wheel.set_speed(self.speed) self.R_Wheel.set_speed(self.speed) def get_speed(self): return self.speed def inc_speed(self, v): self.L_Wheel.inc_speed(v) self.R_Wheel.inc_speed(v) self.speed = self.L_Wheel.get_speed() def dec_speed(self, v): self.L_Wheel.dec_speed(v) self.R_Wheel.dec_speed(v) self.speed = self.L_Wheel.get_speed() def get_run_state(self): return self.run_state def up(self): self.run_state = "up" self.L_Wheel.up() self.R_Wheel.up() def stop(self): self.run_state = "stop" self.L_Wheel.stop() self.R_Wheel.stop() def down(self): self.run_state = "down" self.L_Wheel.down() self.R_Wheel.down() #Turn the trolley to the left, control the left wheel not to move, and the right wheel can move forward def left(self): self.run_state = "left" self.L_Wheel.stop() self.R_Wheel.up() #Turn the trolley to the right, control the right wheel not to move, and the left wheel can move forward def right(self): self.run_state = "right" self.L_Wheel.up() self.R_Wheel.stop() async def motor_task(): L_Motor = SingleMotor( PWM = 18, IN1 = 22, IN2 = 27 ) R_Motor = SingleMotor( PWM = 23, IN1 = 25, IN2 = 24 ) cm = CarWheel( L_Motor = L_Motor, R_Motor = R_Motor ) t_time = 2 cm.inc_speed(5) cm.up() await asyncio.sleep(t_time) cm.dec_speed(5) cm.down() await asyncio.sleep(t_time) cm.left() await asyncio.sleep(t_time) cm.right() await asyncio.sleep(t_time) cm.stop() await asyncio.sleep(t_time) async def main(): dltasks = set() dltasks.add(asyncio.ensure_future(motor_task())) dones, dltasks = await asyncio.wait(dltasks) for task in dones: print("Task ret: ", task.result()) if __name__ == '__main__': GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) loop = asyncio.get_event_loop() loop.run_until_complete(main()) try: loop.run_forever() except KeyboardInterrupt: pass finally: GPIO.cleanup()
2.2 ultrasonic example
#https://gpiozero.readthedocs.io/en/stable/recipes.html#distance-sensor #DistanceSensor_t.py from gpiozero import DistanceSensor from time import sleep sensor = DistanceSensor(23, 24) while True: print('Distance to nearest object is', sensor.distance, 'm') sleep(1)
2.3 and 2.4G wireless handle control examples
#joystick_t.py import asyncio import pygame from pygame.locals import * async def joystick_task(self): #JOYAXISMOTION joy, axis, value #JOYBALLMOTION joy, ball, rel #JOYHATMOTION joy, hat, value #JOYBUTTONUP joy, button #JOYBUTTONDOWN joy, button while True: for event in pygame.event.get(): # if event.type == pygame.JOYAXISMOTION: # print("Joystick JOYAXISMOTION.") # if event.type == pygame.JOYBALLMOTION: # print("Joystick JOYBALLMOTION.") if event.type == pygame.JOYHATMOTION: # print("Joystick JOYHATMOTION.") if event.value == (-1, 0): print('left') elif event.value == (0, 1): print('up') elif event.value == (0, -1): print('down') elif event.value == (1, 0): print('right') elif event.value == (0, 0): print('stop') else: print(event.value) elif event.type == pygame.JOYBUTTONDOWN: print("Joystick JOYBUTTONDOWN.") if joystick.get_button(0) == 1: print('button[Y]') elif joystick.get_button(1) == 1: print('button[B]') elif joystick.get_button(2) == 1: print('button[A]') elif joystick.get_button(3) == 1: print('button[X]') else: pass elif event.type == pygame.JOYBUTTONUP: print("Joystick JOYBUTTONUP.") else: pass await asyncio.sleep(0.1) async def main(): pygame.init() pygame.joystick.init() # Get count of joysticks joystick_count = pygame.joystick.get_count() print("Number of joysticks: {0}".format(joystick_count)) joystick0 = pygame.joystick.Joystick(0) joystick.init() dltasks = set() dltasks.add(asyncio.ensure_future(joystick_task())) dones, dltasks = await asyncio.wait(dltasks) for task in dones: print("Task ret: ", task.result()) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) try: loop.run_forever() except KeyboardInterrupt: pass finally: pygame.quit()
2.4 network services
#See main program
2.5. Steering gear module (the main program will call)
#CarServo.py import Adafruit_PCA9685 import RPi.GPIO as GPIO import time class CarServo: def __init__(self): #2 camera steering gear, 1 ultrasonic steering gear self.pwm_pca9685 = Adafruit_PCA9685.PCA9685() self.pwm_pca9685.set_pwm_freq(50) self.servo = {} self.set_servo_angle(0, 110) self.set_servo_angle(1, 100) self.set_servo_angle(2, 20) #The input angle is converted to a value of 12 ^ accuracy def set_servo_angle(self, channel, angle): if (channel >= 0) and (channel <= 2): new_angle = angle if angle < 0: new_angle = 0 elif angle > 180: new_angle = 180 else: new_angle = angle print("channel={0}, angle={1}".format(channel, new_angle)) #date=4096*((new_angle*11)+500)/20000#Perform rounding operation date=int(4096*((angle*11)+500)/(20000)+0.5) date = int(4096*((new_angle*11)+500)/(20000)+0.5) self.pwm_pca9685.set_pwm(channel, 0, date) self.servo[channel] = new_angle else: print("set_servo_angle error. servo[{0}] = [{1}]".format(channel, angle)) def inc_servo_angle(self, channel, v): self.set_servo_angle(channel, self.servo[channel] + v) def dec_servo_angle(self, channel, v): self.set_servo_angle(channel, self.servo[channel] - v) if __name__ == '__main__': GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) cs = CarServo() cs.inc_servo_angle(0, 10) cs.inc_servo_angle(1, 10) cs.inc_servo_angle(2, 10) time.sleep(2) cs.dec_servo_angle(0, 10) cs.dec_servo_angle(1, 10) cs.dec_servo_angle(2, 10) time.sleep(2) GPIO.cleanup()
2.6 example of USB camera
#See main program
2.7 example of infrared obstacle avoidance
import RPi.GPIO as GPIO import time pin_avoid_obstacle=18 GPIO.setmode(GPIO.BCM) GPIO.setup(pin_avoid_obstacle, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) try: while True: status = GPIO.input(pin_avoid_obstacle) if status == TRUE: print('I am an infrared obstacle avoidance module. No obstacles are detected. Everything is normal!') else: print('I'm an infrared obstacle avoidance module. If an obstacle is detected, pay attention to parking') time.sleep(0.5) except KeyboradInterrupt: pass finally: GPIO.cleanup()
2.8 example of tracking module
#I don't want to put a black line on the ground floor of my home for the time being, so I didn't write this code
3, Trolley complete code
#ppycar.py #!/usr/bin/python3 # -*- coding: utf-8 -*- import asyncio import aiohttp import websockets import json import time import logging import threading import sys import cv2 from aiohttp import web from concurrent.futures import ThreadPoolExecutor import RPi.GPIO as GPIO import pygame from pygame.locals import * from CarMotor import CarWheel, SingleMotor from gpiozero import DistanceSensor #from CarDistance import CarDistance from CarServo import CarServo class CwCar: def __init__(self): #Left front and rear wheels (circuit ensures consistent action) L_Motor = SingleMotor( PWM = 18, IN1 = 22, IN2 = 27 ) #Right front and rear wheels (circuit ensures consistent action) R_Motor = SingleMotor( PWM = 23, IN1 = 25, IN2 = 24 ) #Trolley self.cm = CarWheel( L_Motor = L_Motor, R_Motor = R_Motor ) #Ultrasonic ranging #self.cd = CarDistance( # GPIO_TRIGGER = 20, # GPIO_ECHO = 21 # ) self.sensor = DistanceSensor(21, 20) #2 camera steering gear, 1 ultrasonic steering gear self.cs = CarServo() #Thread pool, used to encapsulate non co process objects self.__executor = ThreadPoolExecutor(10) self.__loop = asyncio.get_event_loop() self.cap = cv2.VideoCapture(0) self.jpg_bytes = None #Blocking function call encapsulation of non concurrent objects async def ThreadRun(self, func, *args,**kwargs): return await self.__loop.run_in_executor(self.__executor, func, *args,**kwargs) #Regularly update the obstacle distance measured by ultrasound async def distance_task(self, update_time): print("distance_task start") while True: dist_float = self.sensor.distance dist = int(dist_float * 100) #When the distance is less than 40cm and the running state is forward, the trolley stops running if (dist < 80) and (self.cm.get_run_state() == 'up'): self.cm.stop() await asyncio.sleep(update_time) print("distance_task end") #Handle command controls trolley operation async def joystick_task(self): joystick = pygame.joystick.Joystick(0) joystick.init() while True: for event in pygame.event.get(): #The up, down, left and right keys on the left of the handle control the operation of the trolley if event.type == pygame.JOYHATMOTION: if event.value == (-1, 0): print('left') self.cm.left() elif event.value == (0, 1): print('up') dist_float = self.sensor.distance dist = int(dist_float * 100) if dist > 80: self.cm.up() elif event.value == (0, -1): print('down') self.cm.down() elif event.value == (1, 0): print('right') self.cm.right() elif event.value == (0, 0): print('stop') self.cm.stop() else: print(event.value) #Control trolley speed elif event.type == pygame.JOYBUTTONDOWN: print("Joystick JOYBUTTONDOWN.") if joystick.get_button(0) == 1: print('button[Y]') elif joystick.get_button(1) == 1: print('button[B]') self.cm.inc_speed(5) elif joystick.get_button(2) == 1: print('button[A]') elif joystick.get_button(3) == 1: print('button[X]') self.cm.dec_speed(5) #Control camera steering gear elif joystick.get_button(4) == 1: print('button[4]') self.cs.inc_servo_angle(1, 5) elif joystick.get_button(5) == 1: print('button[5]') self.cs.dec_servo_angle(1, 5) elif joystick.get_button(6) == 1: print('button[6]') self.cs.inc_servo_angle(2, 5) elif joystick.get_button(7) == 1: print('button[7]') self.cs.dec_servo_angle(2, 5) else: pass await asyncio.sleep(0.1) #Receive the car control command sent from the remote web page async def car_recv_msg(self, ws, msg): response_ojb = { "cmd" : "None", "ret" : 0, "errinfo": "" } try: recv_text = msg.data recv_json_obj = json.loads(recv_text) print(recv_json_obj) cmd = recv_json_obj['cmd'] response_ojb['cmd'] = cmd print("cmd = {0}".format(cmd)) if cmd == "stop": self.cm.stop() elif cmd == "up": dist = int(await self.ThreadRun(self.cd.distance)) if dist > 80: self.cm.up() elif cmd == "down": self.cm.down() elif cmd == "left": self.cm.left() elif cmd == "right": self.cm.right() elif cmd == "inc": self.cm.inc_speed(5) speed = self.cm.get_speed() response_ojb["speed"] = speed elif cmd == "dec": self.cm.dec_speed(5) speed = self.cm.get_speed() response_ojb["speed"] = speed else: print("cmd error") except Exception as e: print("recv json error:{0}:{1}:{2}" .format( e.__traceback__.tb_frame.f_globals["__file__"], e.__traceback__.tb_lineno, e)) response_ojb["ret"] = 1 response_ojb["errorinfo"] = "recv json error" response_json = json.dumps(response_ojb) await ws.send_str(response_json) #Analysis of websocket receiving message protocol async def websocket_car_handler(self, request): print('websocket_car_handler start...') ws = web.WebSocketResponse() await ws.prepare(request) while not ws.closed: msg = await ws.receive() if msg.type == aiohttp.WSMsgType.text: await self.car_recv_msg(ws, msg) elif msg.type == aiohttp.WSMsgType.close: print('websocket connection closed') elif msg.type == aiohttp.WSMsgType.error: print('ws connection closed with exception {0}'.format(ws.exception())) else: await ws.send_str('websocket_car_handler') return ws #Return camera image data to the request page async def websocket_webcam_handler(self, request): print('websocket_webcam_handler start...') ws = web.WebSocketResponse() await ws.prepare(request) while not ws.closed: msg = await ws.receive() if msg.type == aiohttp.WSMsgType.text: print(msg.data, end='') if self.jpg_bytes != None: await ws.send_bytes(self.jpg_bytes) elif msg.type == aiohttp.WSMsgType.close: print('websocket connection closed') elif msg.type == aiohttp.WSMsgType.error: print('ws connection closed with exception {0}'.format(ws.exception())) else: await ws.send_str('websocket_webcam_handler') return ws #Regularly collect camera image data async def webcam_read_task(self): print("{0}:{1}".format("webcam_read_task", "start")) encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50] while True: await self.ThreadRun(self.__webcam_read, encode_param) await asyncio.sleep(0.05) print("{0}:{1}".format("webcam_read_task", "end")) #Camera image data acquisition def __webcam_read(self, encode_param): ret, frame = self.cap.read() ret, jpg = cv2.imencode('.jpg', frame, encode_param) self.jpg_bytes = jpg.tobytes() async def main(main_loop): cc = CwCar() local_ip = '0.0.0.0' local_port = 5678 app = web.Application() app.router.add_route('GET', '/ppycar', cc.websocket_car_handler) app.router.add_route('GET', '/sendframe', cc.websocket_webcam_handler) app.router.add_static('/static', path='webroot', show_index=True) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, local_ip, local_port) print("MainLoop:Server started at http://{0}...:{1}".format(local_ip, local_port)) dltasks = set() dltasks.add(asyncio.ensure_future(cc.joystick_task())) #Handle control dltasks.add(asyncio.ensure_future(site.start())) #websockets web page control dltasks.add(asyncio.ensure_future(cc.webcam_read_task())) #Timing camera image data acquisition dltasks.add(asyncio.ensure_future(cc.distance_task(update_time = 0.05)))#Timing ultrasonic ranging dones, dltasks = await asyncio.wait(dltasks) for task in dones: print("Main Task ret: ", task.result()) if __name__ == '__main__': pygame.init() pygame.joystick.init() joystick_count = pygame.joystick.get_count() print("Number of joysticks: {0}".format(joystick_count)) #Make sure there is and only one handle control if joystick_count != 1: print("joystick_count < 1, sys.exit.") GPIO.cleanup() pygame.quit() sys.exit() GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) main_loop = asyncio.get_event_loop() main_loop.run_until_complete(main(main_loop)) try: main_loop.run_forever() except KeyboardInterrupt: pass finally: GPIO.cleanup() pygame.quit()
4, Web page control complete code
ppy_car_web.html
<!DOCTYPE html> <html lang="zh-cmn-Hans"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width,initial-scale=1,user-scalable=0,viewport-fit=cover"> <title>Trolley remote control</title> <link rel="stylesheet" href="https://res.wx.qq.com/open/libs/weui/2.4.4/weui.css"/> <script type="text/javascript"> let ws; let wsurl = "ws://" + window.location.hostname + ":5678/ppycar" //let wsurl = "ws://192.168.3.128:5678/ppycar" let speed = 35; function car_cmd(car_cmd_json){ let jsonText = JSON.stringify(car_cmd_json); console.log(jsonText); if (ws.readyState == 1){ ws.send(jsonText); }else{ console.log("ws.readyState = " + ws.readyState) } } function car_speed(cmd){ car_cmd({cmd: cmd}) } function car_move(cmd){ car_cmd({cmd: cmd}) } function car_distance(){ car_cmd({cmd: 'distance'}) } function ws_conn(){ ws = new WebSocket(wsurl); console.log("ws_conn."); ws.onopen = function() { console.log("do_ws_open."); }; ws.onmessage = function (evt) { let received_msg = evt.data; console.log("recv info:" + received_msg); let obj = JSON.parse(received_msg) if((obj.cmd == 'inc') || (obj.cmd == 'dec')){ document.getElementById("car_speed").innerHTML = obj.speed + "(Interval: 0-100)"; } }; ws.onclose = function() { console.log("ws close."); }; } function ws_close(){ console.log("do_ws_close."); ws.close(); } </script> <script> let webcam_wsurl = "ws://" + window.location.hostname + ":5678/sendframe" let socket = new WebSocket(webcam_wsurl); let img = new Image(); function sendMsg() { socket.send("update"); console.log("socket: send update"); } function Uint8ToString(u8a) { var CHUNK_SZ = 0x8000; var c = []; for (var i = 0; i < u8a.length; i += CHUNK_SZ) { c.push(String.fromCharCode(...u8a.subarray(i, i + CHUNK_SZ))); } return c.join(""); } function drawFrame(frame) { var uint8Arr = new Uint8Array(frame); var str = Uint8ToString(uint8Arr); var base64String = btoa(str); img.onload = function () { context.drawImage(this, 0, 0, canvas.width, canvas.height); }; img.src = "data:image/png;base64," + base64String; } socket.onopen = () => { console.log("socket: connected"); }; socket.onmessage = (msg) => { msg.data.arrayBuffer().then((buffer) => { drawFrame(buffer); console.log("socket: frame updated"); }); }; </script> </head> <body onload="ws_conn();"> <div class="page"> <div class="weui-form"> <div class="weui-form__text-area"> <h2 class="weui-form__title">Trolley remote control</h2> </div> <div class="weui-cells weui-cells_form"> <div class="weui-cell"> <div class="weui-cell__bd"> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('up');" id="car_up">forward</button> </div> <div class="weui-cell__bd"> </div> </div> <div class="weui-cell"> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('left');" id="car_left">Turn left</button> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('down');" id="car_down">back off</button> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('right');" id="car_right">Turn right</button> </div> </div> <div class="weui-cell"> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_speed('dec');" id="speed_dec">Slow down</button> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_speed('inc');" id="speed_inc">accelerate</button> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('stop');" id="cat_stop">stop it</button> </div> </div> </div> <div class="weui-form__text-area"> <h2 class="weui-form__title">Trolley data acquisition</h2> </div> <div class="weui-cells"> <div class="weui-cell weui-cell_example"> <div class="weui-cell__hd"><i class="weui-icon-waiting weui-icon_msg"></i></div> <div class="weui-cell__bd"> <p>Trolley Speed</p> </div> <div class="weui-cell__ft" id="car_speed">35(0-100)</div> </div> <div class="weui-cell weui-cell_example"> <div class="weui-cell__hd"><i class="weui-icon-waiting weui-icon_msg"></i></div> <div class="weui-cell__bd"> <p>Ultrasonic distance</p> </div> <div class="weui-cell__ft">0(Mm)</div> </div> </div> <div class="weui-form__text-area"> <h2 class="weui-form__title">Car camera</h2> </div> <div class="weui-cells" align="center"> <canvas id="canvas-video" width="512" height="384"></canvas> </div> <script> const canvas = document.getElementById("canvas-video"); const context = canvas.getContext("2d"); // show loading notice context.fillStyle = "#333"; context.fillText("Loading...", canvas.width / 2 - 30, canvas.height / 3); setInterval(() => { socket.send("x"); }, 100); </script> </div> </div> </body> </html>