tl;dr: Das Auto sendet einzelne Kamerabilder als Bytes an ein Canvas im Controller.
...oder wie ich eine mobile Kamera baute (Teil 7)
Eine handliche Steuerung gibt es seit dem letzten Blog–Artikel schon, und jetzt geht es los mit dem Fahren per Kamera ohne Sichtkontakt. Drei wichtige Ziele erreichen wir in diesem Blog–Artikel:
Das Kamerabild soll auf dem Controller sichtbar sein.
Das Auto soll sich allein per Kamerabild steuern lassen.
Die Steuersoftware auf dem Auto soll automatisch gestartet werden, sobald der Raspberry Pi eingeschaltet wird.
Es bleiben natürlich wie immer Wünsche offen. Der Code ist ein Kompromiss aus Performance, Lesbarkeit, Komplexität und meiner Zeit für das Projekt. Fühle dich frei, damit dein Projekt zu starten und ihn zu verbessern.
Übrigens gibt es bereits einen anderen Raspberry Pi IP Camera Blog–Artikel. Der wichtigste Unterschied ist, dass dort der Raspberry Pi der Web–Server ist. Dass heißt man muss mit dem Browser direkt auf den Raspberry zugreifen (können). Firewalls und Subnetze sind da ein Hindernis. Hier sind das Auto und der Controller beide Clients und treffen sich auf einem separaten Server, wie in diesem Blog–Artikel erklärt wird.
Damit der Code in dem Artikel leichter zu lesen ist und sich besser kopieren lässt, besteht er aus wenigen großen Dateien. Meinem Drang, Dateien herauszutrennen anstatt Code zu kopieren, konnte ich gerade widerstehen, damit du beim Lesen nicht ständig hin und her springen musst. Normalerweise empfehle ich eine andere Dateistruktur.
Car.py Service
Auto einschalten, einloggen per SSH, car.py starten, endlich losfahren… das nervt. Daher soll die Steuersoftware als Service (aka Daemon) im Hintergrund laufen und automatisch beim Einschalten gestartet werden. Wie in dieser Dokumentation beschrieben, erstellt man einen Service mit einer speziellen Datei in /etc/systemd/system/. Danach muss man den Dienst noch aktivieren mit systemctl enable car.
# place in /etc/systemd/system/
# and systemctl enable car
# check with systemctl status car
# control with systemctl start/stop car
[Unit]
Description=Raspberry Pi Car Client
After=network.target
[Service]
# check path with `which python3`
# replace the IP by domain if available
ExecStart=/usr/bin/python3 -u car.py http://192.168.0.103:3000
WorkingDirectory=/home/pi/socket.io/
StandardOutput=inherit
StandardError=inherit
Restart=always
RestartSec=5
User=pi
[Install]
WantedBy=multi-user.target
Generalprobe am Emulator
Mit einem lokalen Emulator entwickelt es sich viel schneller als auf dem Raspberry. Außerdem ist der Code weniger komplex und daher ein gute Einstieg. Ein paar Design–Entscheidungen vorweg, die im Code ein Rolle spielen:
Die Bildübertragung wird gestoppt, wenn das Auto nicht gesteuert wird.
Mit einem Blick zum Auto sieht man, ob die Kamera benutzt wird (per LED).
Als Controller nehme ich ein Handy mit kleinem Display und eher schmalbandiger Datenleitung an.
In der aktuellen Implementierung wird jeder Frame als einzelnes JPEG als Socket.IO Event übertragen. Einen Videocodec verwende ich nicht, obwohl damit die Bildqualität bei gleicher Datenrate höher sein müsste.
Die Steuersoftware (bzw. der Emulator) hat nun zwei Threads, die parallel laufen: einer empfängt und verarbeitet die Befehle vom Controller, der andere sendet ein Bild von der Kamera nach dem anderen. Die Synchronisation zwischen den Threads ist minimal: kam ein Stop–Befehl, soll nach 10 Sekunden kein Bild mehr übertragen werden.
An meinem Laptop und an dem Raspberry Pi besitzt die Kamera eine Status LED, die die Nutzung der Kamera anzeigt. Wenn man den Zugriff auf die Kamera stoppt, erlischt die LED. Damit haben wir eine fertige Status LED, indem wir die Kamera effizient nutzen – perfekt.
Hier kommt der Emulator–Code. Für die eigentliche Steuersoftware können wir fast alle Teile wiederverwenden.
import cv2
import socketio
import sys
import threading
import time
class AtomicInteger:
"""
thread-safe integer
thanks to https://stackoverflow.com/questions/23547604/python-counter-atomic-increment
"""
def __init__(self, value=0):
self._value = value
self._lock = threading.Lock()
@property
def value(self):
with self._lock:
return self._value
@value.setter
def value(self, v):
with self._lock:
self._value = v
return self._value
class Camera:
"""
camera to be enabled/disabled and auto-calibrated
"""
def __init__(self):
self._camera = None
def _enableUnlessEnabled(self):
if self._camera == None:
self._camera = cv2.VideoCapture(0)
def disable(self):
self._camera = None
def capture(self):
# power up camera
self._enableUnlessEnabled()
# Grab a single frame of video
ret, frame = self._camera.read()
# Resize frame of video to 1/4 size
frameSmall = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
# encode data as JPEG
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
frameJpeg = cv2.imencode('.jpg', frameSmall, encode_param)
# get bytes
frameBytes = frameJpeg[1].tobytes()
return frameBytes
class VideoStreamer:
"""
Meant to be run by background worker
to send camera captures to clients.
"""
def __init__(self, sio):
self._sio = sio
self._camera = Camera()
self._stopAtTime = AtomicInteger(0)
@property
def stopAtTime(self):
"""
point in time after which no frames are sent
"""
return self._stopAtTime.value
@stopAtTime.setter
def stopAtTime(self, seconds):
self._stopAtTime.value = seconds
def stream(self):
# limit frame rate to upper bound
maxFps = 5
minSendingTime = 1/maxFps
while True:
now = time.time()
if self.stopAtTime > round(now):
self._sio.emit('video', self._camera.capture())
sendingTime = time.time() - now
delay = minSendingTime - sendingTime
if delay > 0:
# wait before sending next frame
# to stay at maximal frame rate
time.sleep(delay)
else:
self._camera.disable()
# busy-waiting: works for now, is not efficient
time.sleep(1)
sio = socketio.Client()
videoStreamer = VideoStreamer(sio)
@sio.event
def connect():
print('connected to server')
@sio.event
def command(cmd):
isStopped = cmd == 'stop'
# stop video stream after 10 s (unless car is moving)
videoStreamer.stopAtTime = round(time.time()) + (10 if isStopped else 99999999)
print('command: ', cmd)
@sio.event
def disconnect():
print('disconnected from server')
if __name__ == '__main__':
try:
sio.connect(sys.argv[1] if len(sys.argv) > 1 else 'http://localhost:3000')
videoStreamWorker = threading.Thread(target=videoStreamer.stream, args=(), daemon=True)
videoStreamWorker.start()
sio.wait()
except KeyboardInterrupt:
print('bye')
Video–Stream darstellen
Unsere Bildfolge kommt als Bytes am Controller an und muss angezeigt werden… wenn der Server sie auch sendet. Alle Bilder werden (noch) über den Server umgeleitet; als Binärdaten.
const express = require('express');
const app = express();
const http = require('http').createServer(app);
const io = require('socket.io')(http);
app.get('/', (req, res) => {
res.sendFile(__dirname + '/static/controller.html');
});
app.use('/', express.static('static'));
io.on('connection', socket => {
console.log('something connected');
// send command to car to indicate that controller is ready (starts video stream)
socket.broadcast.binary(false).volatile.emit('command', 'stop')
// broadcast commands
socket.on('command', cmd => socket.broadcast.binary(false).volatile.emit('command', cmd));
socket.on('video', frame => socket.broadcast.binary(true).volatile.emit('video', frame));
// stop on disconnect
socket.on('disconnect', () => socket.broadcast.binary(false).volatile.emit('command', 'stop'));
});
http.listen(3000, () => {
console.log('listening on *:3000');
});
Die JPEG–Bytes werden unverändert übertragen und insbesondere nicht in Base64 kodiert. Wenn die Bytes empfangen werden, wird direkt ein JPEG daraus erzeugt. Das JPEG wird anschließend skaliert, in ein Canvas gezeichnet und wieder gelöscht. Damit lässt sich ein flüssiges Video darstellen.
Die folgende car.py lässt sich auf dem Raspberry Auto ausführen. Die meisten Teile sind die aus Teil 5 (abgesehen von winzigen Änderungen) oder aus dem Emulator in diesem bereits bekannt. Nur die Klasse Camera ist neu.
import cv2
import socketio
import RPi.GPIO as GPIO
from picamera import PiCamera
from picamera.array import PiRGBArray
import time
import sys
import threading
import io
print("""
Socket.io client for the car.
Connects to server and receives commands and events.
""")
class AtomicInteger:
"""
thread-safe interger
thanks to https://stackoverflow.com/questions/23547604/python-counter-atomic-increment
"""
def __init__(self, value=0):
self._value = value
self._lock = threading.Lock()
@property
def value(self):
with self._lock:
return self._value
@value.setter
def value(self, v):
with self._lock:
self._value = v
return self._value
class Camera:
"""
camera to be enabled/disabled and auto-calibrated
"""
def __init__(self):
self._camera = None
self._width = 320
self._height = 256
# camera is attached to car upside down
self._rotationMatrix = cv2.getRotationMatrix2D(
(self._width/2, self._height/2), 180, 1.0)
def _enableUnlessEnabled(self):
if self._camera == None:
self._camera = PiCamera()
self._camera.resolution = (self._width, self._height)
self._camera.framerate = 60
# https://picamera.readthedocs.io/en/release-1.13/recipes1.html#capturing-consistent-images
# high ISO for lower shutter speed thus better images while moving
self._camera.iso = 1200
# Wait for the automatic gain control to settle
time.sleep(2)
# Now fix the values
self._camera.shutter_speed = self._camera.exposure_speed
self._camera.exposure_mode = 'off'
awb_gains = self._camera.awb_gains
self._camera.awb_mode = 'off'
self._camera.awb_gains = awb_gains
def disable(self):
if self._camera != None:
self._camera.close()
self._camera = None
def capture(self):
self._enableUnlessEnabled()
# capture frame
capture = PiRGBArray(self._camera, size=(self._width, self._height))
self._camera.capture(capture, format="bgr")
frame = capture.array
# rotate (since camera is attached upside down)
frameRotated = cv2.warpAffine(frame, self._rotationMatrix, (self._width, self._height))
# compress
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
frameJpeg = cv2.imencode('.jpg', frameRotated, encode_param)
# get bytes
frameBytes = frameJpeg[1].tobytes()
return frameBytes
class VideoStreamer:
"""
Meant to be run by background worker
to send camera captures to clients.
"""
def __init__(self, sio):
self._sio = sio
self._camera = Camera()
self._stopAtTime = AtomicInteger(0)
@property
def stopAtTime(self):
return self._stopAtTime.value
@stopAtTime.setter
def stopAtTime(self, seconds):
self._stopAtTime.value = seconds
def stream(self):
maxFps = 5
minSendingTime = 1/maxFps
while True:
now = time.time()
if self.stopAtTime > round(time.time()):
self._sio.emit('video', self._camera.capture())
sendingTime = time.time() - now
delay = minSendingTime - sendingTime
if delay > 0:
time.sleep(delay)
else:
self._camera.disable()
time.sleep(1)
class Wheel:
"""
wheel(s) on one side of the car
"""
def __init__(self, enPin: int, inForewardPin: int, inBackwardPin: int):
GPIO.setup(inForewardPin, GPIO.OUT)
self.inForewardPin = inForewardPin
GPIO.output(inForewardPin, False)
self.inBackwardPin = inBackwardPin
GPIO.setup(inBackwardPin, GPIO.OUT)
GPIO.output(inBackwardPin, False)
GPIO.setup(enPin, GPIO.OUT)
self.pwm = GPIO.PWM(enPin, 100)
self.pwm.start(0)
self.pwm.ChangeDutyCycle(0)
def setSpeed(self, speed: int):
"""
speed might be -3, -2, -1, 0, 1, 2, 3
"""
force = min(3, abs(speed))
isForewards = speed > 0
if (isForewards):
GPIO.output(self.inBackwardPin, False)
GPIO.output(self.inForewardPin, True)
else:
GPIO.output(self.inForewardPin, False)
GPIO.output(self.inBackwardPin, force > 0)
if force > 0:
self.pwm.ChangeDutyCycle(70 + (10 * force))
else:
self.pwm.ChangeDutyCycle(0)
if __name__ == '__main__':
GPIO.setmode(GPIO.BOARD)
RightWheels = Wheel(3, 5, 7)
LeftWheels = Wheel(8, 10, 12)
def drive(speedLeft, speedRight):
LeftWheels.setSpeed(speedLeft)
RightWheels.setSpeed(speedRight)
sio = socketio.Client()
videoStreamer = VideoStreamer(sio)
try:
@sio.event
def connect():
# a LED would be handy
drive(0, 0)
@sio.event
def command(data):
isStopped = data == 'stop'
# stop video stream after 10 s (unless car is moving)
videoStreamer.stopAtTime = round(time.time()) + (10 if isStopped else 99999999)
if data == 'left':
drive(-1, 1)
elif data == 'right':
drive(1, -1)
elif data == 'foreward':
drive(3, 3)
elif data == 'backward':
drive(-3, -3)
elif data == 'stop':
drive(0, 0)
else:
print("unknown command '%s'" % data)
drive(0, 0)
@sio.event
def disconnect():
# a LED would be handy
drive(0, 0)
sio.connect(sys.argv[1] if len(sys.argv) > 1 else 'http://localhost:3000')
videoStreamWorker = threading.Thread(target=videoStreamer.stream, args=(), daemon=True)
videoStreamWorker.start()
sio.wait()
except KeyboardInterrupt:
LeftWheels.setSpeed(0)
RightWheels.setSpeed(0)
GPIO.cleanup()
print('bye')
Das Auto lässt sich jetzt ohne Sichtverbindung nur über die Kamera steuern. Ziel erreicht – oder sagen wir fast. Auch bei starker Kompression bekomme ich von dem Auto nur bescheidene 2,2 fps. Eine Fahrt ist damit eine Herausforderung. Verantwortlich ist fast ausschließlich der Abruf des Frames von der Kamera mit camera.capture. Entweder muss ich die Kamera effizienter ansteuern, nicht das billigste Kamera–Modul verwenden, oder es ist nicht möglich. Das will ich noch herausfinden.
Gerne kannst du mir schreiben bei Fragen oder Tipps. Danke für's Lesen und viel Spaß beim Basteln.
Dein Besuch auf unserer Website produziert laut der Messung auf websitecarbon.com nur 0,28 g CO₂.