Raspberry Video Camera – Teil 23: Verbesserung durch ROI und Aufnahmezeitbegrenzung

Auch ein gutes Programm lässt sich weiter verbessern und zwei solche Verbesserungen für die selbstlernende Farberkennung schlage ich hier vor: ROI steht hier nicht für „Return on Investment“ sondern für Region of Interest. Und das bedeutet, dass wir für die Farberkennung nicht mehr das gesamte Bild verwenden, sondern nur eine bestimmte Region, in der wir das Auftauchen eines Objekts vermuten. Das ist Thema 1 und die zweite Verbesserung führt eine generelle Aufnahmezeitbegrenzung für Videos ein um endlos lange Recordings zu vermeiden. Beide Verbesserungen können hilfreich sein, sind aber sicher nicht in jeder Umgebung sinnvoll.

Zuerst aber wieder ein Oachkatzl-Video. Mehr davon gibts in meinem YouTube-Kanal.

Region of Interest (ROI)

Eine Region of Interest lässt sich auch beim PIR-Sensor definieren – durch Abkleben mit Tesaband.

Wir haben eine fest installierte Kamera, die immer die gleiche Umgebung filmt und dabei auf das Erscheinen eines bestimmten Objekts wartet. In meinem Fall auf Eichhörnchen, wobei aber auch andere Tiere vor die Kamera kommen. Wenn wir nun den Bereich für die Farbauswertung verkleinern würden und nicht mehr das gesamte Kamerabild auswerten, dann hätte das folgende Vorteile:

  • Ein Objekt (das Eichhörnchen) wird flächenmäßig in Relation zum ROI größer ausfallen, als zum Gesamtbild.
  • Der Hintergrund wird kleiner und damit mögliche Fehlsignale geringer.
  • Ein kleineres Bild ist schneller analysiert, damit steigt die Verarbeitungsgeschwindigkeit.

Das funktioniert natürlich nur, wenn man tatsächlich sagen kann, dass das Zielobjekt nur in einem ganz bestimmten Bildteil vorkommen wird. Bei mir ist das einfach, ein Eichhörnchen wird sich in der Regel oberhalb der Stange aufhalten, von kleineren Abhängaktionen in Richtung der aufgehängten Nuss einmal abgesehen. Ich kann als ROI also getrost einen schmalen Streifen oberhalb der Stange annehmen.

Im Beispielbild lässt sich gut erkennen, dass das Eichhörnchen mit seinem Körper sicher ein Viertel des rot umrandeten ROI-Bereichs einnimmt. Auf die Gesamtfläche des Bildes bezogen, wäre es nur etwa ein Zwölftel.

Wie gesagt, eine Region of Interest zu definieren bringt Vorteile, aber es ist nur dann sinnvoll, wenn man den Erkennungsbereich wirklich eingrenzen kann. Bei einer Wildkamera im Wald dürfte es schwierig sein, weil weder bekannt ist, welche Tiere kommen werden, noch aus welcher Richtung.

Programmierung

Zuerst wird im einleitenden Programmteil die Region definiert. Die Bestimmung der Koordinaten muss leider von Hand erfolgen, zum Beispiel mit einem Bildverarbeitungsprogramm:

roi = {'x1': 0,                     # region of interest: upper left corner
       'y1': 100,
       'x2': 600,                   # lower right corner
       'y2': 220}

Ich mache das mal mit einem Python-Dictionary, so wird (im Unterschied zu einer Liste) die Bedeutung der Zahlen klar. x1/y1 stehen für die obere linke Ecke des Rechtecks und x2/y2 für die untere rechte Ecke. Wobei der Koordinatenursprung oben links liegt. Dann geht es in den Programmteil imageAnalyzer, dort suchen wir die Zeile mit der Farbraumkonvertierung nach HSV und setzen die Zeile mit Ausschneiden des ROI-Bereichs (fett) davor:

img = img[roi['y1']:roi['y2'], roi['x1']:roi['x2']]         # reduce image to roi 
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)                  # convert BGR to HSV

Die Zeile macht nichts anderes als zahlenmäßig einen Bereich des Bildarrays img auszuschneiden und den wieder unter dem Variablennamen img zu speichern. Und das war es schon, jede weitere Analyse passiert nur über den ROI-Bereich und das ein gutes Stückchen schneller wegen der geringeren Größe.

Begrenzung der Aufnahmezeit

Kombinationstrigger und selbstlernende Farberkennung funktionieren beide so, dass die Videoaufnahme dann gestartet werden, wenn beide Trigger (Farberkennung und Hardware-Bewegungssensor) gleichzeitig auslösen. Abgeschaltet wird die Aufnahme – nach einer gewissen Nachlaufzeit – wenn beide Trigger kein Signal mehr liefern.  So weit – so gut. Das kann aber bei sehr bewegten Hintergründen dazu führen, dass der PIR-Bewegungssensor bei laufender Aufnahme ständig wieder Signale liefert und so das Video nie (bzw. erst nach langer Zeit) abschaltet. Das erzeugt große Dateien mit wenig brauchbarem Inhalt. Wenn wir nun sagen könnten, dass brauchbare Videoaufnahmen eh nur über eine gewisse Zeit laufen, dann könnte man eine Zwangsabschaltung einbauen um Speicherplatz zu sparen.

Bei Tieren, die durch Futter angelockt werden, ist das möglich. Da knuspert so ein Eichhörnchen schon mal 2 oder 3 Minuten an ein Nussstückchen hin, aber viel länger dauert das auch nicht. Und selbst wenn, so würde ein Video, in dem über 5 Minuten nur eine Nuss verspeist wird, auf YouTube recht langweilig wirken. In meinem Anwendungsfall, kann ich problemlos die Aufnahmezeit auf sagen wir mal 6 Minuten begrenzen. Wem schon mal die SD-Karte mit Videos vollgelaufen ist, der kann das sicher nachvollziehen.

Aber auch hier gilt: Bei anderen Szenarien schaut es vielleicht anders aus und wenn der Schwerpunkt darin liegt, möglichst gar nichts zu verpassen, dann ist eine Zeitbegrenzung eher kontraproduktiv.

Programmierung

Auch hier legen wir am Programmanfang erst einmal die gewünschte Maximalzeit fest.

maxVideoDuration = 360              # force stop of recording if this duration exceeds

Die Zeit wird hier in Sekunden angegeben, 360 Sekunden wären also 6 Minuten. Die Überwachung der Zeit erfolgt dann im Programmteil triggerGenerator. Hier die komplette Klasse mit den Ergänzungen in Fettschrift:

# Combines PIR- and ObjectDetection-Trigger and generates trigger signal for the camera
class triggerGenerator:
  def __init__(self):
    self.triggered = False                      # Trigger currently active?
    self.triggerStart = datetime.datetime.now() # stores start time of current trigger
    self.lastTrigger = datetime.datetime.now()  # stores last time of trigger detection

  def trigger(self, ObjDet, Pir):
    now = datetime.datetime.now()
    if self.triggered:
# Trigger is already active
      if ObjDet or Pir:
# One or both trigger (imageAnalyzer and PIR) still high
# prolongation of timeout
        self.lastTrigger = datetime.datetime.now()
# Maximum duration of trigger exceeded?
# Remove trigger signal (zero length file)
        if (now -self.triggerStart).total_seconds() > maxVideoDuration:
          triggerFiles = [f for f in os.listdir(triggerPath) if f.endswith(triggerFileExt)]
          for f in triggerFiles:
            os.remove(triggerPath+f)
          self.triggered = False
          print('Video stopped - max. duration exceeded')
      else:
# Both trigger (imageAnalyzer and PIR) are low
# Remove trigger signal (zero length file)
        if (now - self.lastTrigger).total_seconds() > triggerTimeout:
          triggerFiles = [f for f in os.listdir(triggerPath) if f.endswith(triggerFileExt)]
          for f in triggerFiles:
            os.remove(triggerPath+f)
          self.triggered = False
          print('Video stopped')
    else:
# Trigger is inactive    
      if ObjDet and Pir:
# Both trigger (imageAnalyzer and PIR) high
# Set trigger signal (zero length file) and use the earlier timestamp
        open(triggerPath + min(ObjDet, Pir) + triggerFileExt, 'w').close()
        self.triggered = True
        self.lastTrigger = datetime.datetime.now()
        self.triggerStart = datetime.datetime.now()
        print('Video started')

self.lastTrigger speichert immer die Zeit, zu der eine Triggerdatei geschrieben wurde. So lange der Trigger nun aktiv ist, wird bei jedem Durchlauf geprüft, ob die vorgegebene Maximalzeit maxVideoDuration bereits abgelaufen ist. Wenn das passiert, wird die Aufnahme abgebrochen in dem die Triggerdatei gelöscht wird. Es braucht dann erst wieder beide Signale (Farberkennung und PIR) damit die Aufnahme neu beginnt.

Gesamtprogramm analyze3.py

Ich lasse den Dateinamen unverändert, dann braucht es auch keine Anpassung in der /etc/rc.local und liste hier noch einmal das ganze Programm:

import io
import os
import time
import datetime
import cv2
import numpy as np
import RPi.GPIO as GPIO

samplePath = "samples/"             # path to sample images
hueBins = 30                        # histogram bins for hue
satBins = 30                        # histogram bins for saturation
simili = 0.5                        # similarity limit, smaller values are considered similar
                                    # greater values different

imagePath = "/tmp/"                 # path to image and timestamp files
tsExt = ".sig"                      # file extension of timestamp file
imageFile = "record.jpg"            # name of image file

alpha = 0.075                       # amount of influence of a single value to the computed average
sigma = 9.0                         # value to multipy volatility with for a higher threshold
roi = {'x1': 0,                     # region of interest: upper left corner
       'y1': 100,
       'x2': 600,                   # lower right corner
       'y2': 220}
minPix = 275                        # minimum pixel to detect
sunset = 60                         # lower brightnes switches to night
sunrise = 65                        # higher brightness switches to day
bProThresh = 50                     # back projection threshold (0-255), lower values will be ignored
histLifeTime = datetime.timedelta(minutes = 15) # histogram lifetime, recalc if timed out
objectDetTriggerTO = 10             # timeout for object detection trigger

triggerTimeout = 20                 # min. trigger duration, time extends, when triggered again
maxVideoDuration = 360              # force stop of recording if this duration exceeds
triggerFileExt = '.trg'             # file extension for trigger signal file
triggerPath = 'trigger/'            # path for trigger signal file

GPIO_PIR = 12                       # used pin for PIR motion detector


# Initial learning function
# Reads sample images and calculates combined histograms
def initialLearn():
  print('Initial learning from sample images')

  histos = {}
  connections = []

  print('Reading sample images from folder: '+samplePath)

# Read sample images and calc histograms
  for f in os.listdir(samplePath):                              # all files in directory
    if not os.path.isfile(os.path.join(samplePath, f)):         # skip directories
      continue
    name = os.path.splitext(f)[0]                               # get name without extension
    img = cv2.imread(samplePath+f,1)                            # load image
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)                  # convert to HSV
    hist = cv2.calcHist([hsv],[0,1],None,[hueBins,satBins],[0,180, 0,256]) # calc histogram satu + hue
    hist[0,0]=0                                                 # delete background color (black))
    cv2.normalize(hist,hist,0,100,cv2.NORM_MINMAX)              # normalize values to 100

# Build a connection list from distances between each sample
# Distance is a measure of the similarity of two histograms computed from histogram comparison
# with hellinger method: 0 = equal and 1 = total different
    if len(histos) > 0:
      for hi in histos:                                         # with each already existing histogram
        diff = cv2.compareHist(hist,histos[hi],cv2.HISTCMP_HELLINGER) # compare two histograms
        connections.append([diff,name,hi])                      # and store [diff, partner a, partner b]
                                                                # in connection list
    histos[name] = hist                                         # finally store histogram (name is key)

  print('{0} sample images loaded - now combining similar images ...\n'.format(len(histos)))

# Sort list of connections (nearest first)  
  connections.sort(key=lambda c: c[0])                          # sort on distance

# combine most similar histograms to get a shorter histogram list
  while connections[0][0] < simili and len(histos) > 2:         # while small distances
    a = connections[0][1]                                       # partner a
    b = connections[0][2]                                       # partner b
    print("Similarity: {0:1.3f} combine {1} with {2}" \
          .format(connections[0][0],a,b))
    connections[:] = [x for x in connections if x[1]!=a and x[1]!=b and \
                      x[2]!=a and x[2]!=b]                      # clear a and b from list
#    hist = histos[a] + histos[b]                                # add histograms
    hist = np.maximum(histos[a], histos[b])                     # combine histogram max values
    name = a + b                                                # combine names
    del histos[a]                                               # delete a and b histograms
    del histos[b]
    for hi in histos:                                           # calc new connections
      diff = cv2.compareHist(hist,histos[hi],cv2.HISTCMP_HELLINGER)
      connections.append([diff,name,hi])
    histos[name] = hist                                         # store combined histogram 
    connections.sort(key=lambda c: c[0])                        # sort again on distance

  print('\nCombined to {0} detector histograms\n'.format(len(histos)))

# Normalize all remaining histograms
  for h in histos:
    cv2.normalize(histos[h],histos[h],0,100,cv2.NORM_MINMAX)    # adjust values between 0 and 100
  return(histos)  



class pir:
  def __init__(self, pin):
    self.timestamp = ''
    GPIO.setmode(GPIO.BOARD)
# Set pin as input
    GPIO.setup(pin,GPIO.IN)
# Bind event to callbac function
    GPIO.add_event_detect(pin, GPIO.BOTH, self.PIR_callback)

  def PIR_callback(self, pin):
    triggertime = datetime.datetime.now()
    if GPIO.input(pin) == 1:
      self.timestamp = triggertime.strftime("%Y-%m-%d-%H-%M-%S")
      print('PIR-Trigger active')
    else:
      print('PIR-Trigger cleared')
      self.timestamp = ''

  def triggered(self):
    return (self.timestamp)



class imageLoader:
  def __init__(self):
    self.timeout = datetime.timedelta(0, 1)                     # default 1sec
    self.lastTsTime = datetime.datetime.now() - self.timeout    # time of last timestamp
    self.lastTs = ""                                            # last timestamp as a string

  def readTimeStamp(self):
# Read names of all timestamp files, take the first (should be only one) and isolate timestamp
    tsFiles = [f for f in os.listdir(imagePath) if f.endswith(tsExt)]
    if len(tsFiles) > 0:
      return tsFiles[0][0:-4]
    else:
      return False

  def getImg(self):
# sleep until next expected image file, wait for valid time stamp file and then read image file
# adjust sleep time until next image (+/- 0.01sec)
    wTime = (self.timeout - (datetime.datetime.now() - self.lastTsTime)).total_seconds()
    if wTime > 0:
      print("\n",wTime)
      time.sleep(wTime)
    ts=self.readTimeStamp()
    if not ts or (ts == self.lastTs):
      self.timeout += datetime.timedelta(milliseconds=10)
      time.sleep(0.01)
      ts=self.readTimeStamp()
      while not ts or (ts == self.lastTs):
        time.sleep(0.01)
        ts=self.readTimeStamp()
    else:
      self.timeout -= datetime.timedelta(milliseconds=10)
    self.lastTsTime = datetime.datetime.now()
    self.lastTs = ts
    return (ts, cv2.imread('/tmp/record.jpg',1))


# Single detector (holds one histogram)
class detector:
  def __init__(self, name, hist):
    self.name = name                              # histogram name
    self.positiveHist = hist                      # histogram of sample positive images
    self.hist = np.copy(self.positiveHist)        # histogram to work with (initially copy of above) 
    self.avgPix = -1                              # average number of detected pixel
    self.avgVola = -1                             # average volatility (gap between number of 
                                                  # detected pixel and their average value)
# search for histogram colors in given hsv image
  def detect(self, hsv, ts):
    backPro = cv2.calcBackProject([hsv],[0,1],self.hist,[0,180,0,256],1)  # histogram back projection
    ret, mask = cv2.threshold(backPro, bProThresh,255,cv2.THRESH_BINARY)  # delete pixel with too low value
    pixDetected = cv2.countNonZero(mask)                        # count detected pixel

# special case: average values not yet computed or resetted due to histogram update
    if self.avgPix == -1:
      self.avgPix = pixDetected
      self.avgVola = minPix * sigma

# calc threshold (pixDetected must be higher for a valid object detection)
    movingBand = self.avgVola * sigma
    if movingBand < minPix:
      movingBand = minPix
    threshold = self.avgPix + movingBand

    print("{0} Pix: {1:5d} ({2:3.0f}% of Thresh) {3} {4}" \
      .format(ts, pixDetected, pixDetected/threshold*100, \
      "T" if pixDetected >= threshold else " ", self.name))

# no object detection because amount of detected pixels is below threshold
    if pixDetected < threshold:
# update average values
      self.avgVola = abs(pixDetected-self.avgPix)*alpha + self.avgVola*(1-alpha)
      self.avgPix = pixDetected*alpha + self.avgPix*(1-alpha)
      return(False)
    else:
# at this point we have a valid object detection
      return(True)      

# Build new detection histogram considering changed background colors
  def updateHist(self, negativeHist):
    self.hist = self.positiveHist - negativeHist              # elininate background colors 
    self.hist[self.hist<0] = 0                                # set all negative values to zero
    cv2.normalize(self.hist,self.hist,0,100,cv2.NORM_MINMAX)  # and normalize
    self.avgPix = -1                                          # reset average values
    print('Detector histogram updated', self.name)
      


# Uses multiple detectors to find object colors in the given image
class imageAnalyzer:
  def __init__(self, detectors):
    self.detectors = detectors                  # list ofimage detector histograms
    self.night = False                          # indicator for day or night
    self.timestamp = ''                         # indicates current active object detection
    self.lastTrigger = datetime.datetime.now()  # stores last time of object detection
    self.lastHistUpdate = datetime.datetime.now() - datetime.timedelta(hours = 1)

  def detect(self, img, ts):
    now = datetime.datetime.now()
# Detect whether day or night
    gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)                 # convert to grayscale
    brightness = np.average(gray)                               # calc average of all pixels
# Change from night to day or vice versa
    if self.night and brightness > sunrise: self.night = False
    if not self.night and brightness < sunset: self.night = True

    img = img[roi['y1']:roi['y2'], roi['x1']:roi['x2']]         # reduce image to roi 
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)                  # convert BGR to HSV

    imgDetected = False
    for d in self.detectors:                                    # loop over detector objects
      imgDetected = d.detect(hsv, ts) or imgDetected

    print()    
# no object detection because it is night or amount of detected pixels is below threshold
    if self.night or not imgDetected:
# clearing of trigger signal if timed out
      if self.timestamp:
        if (now - self.lastTrigger).total_seconds() > objectDetTriggerTO:
          self.timestamp = ''
          print('Object-Detection-Trigger cleared')
      elif not self.night:                                      # day, but no current and no previous trigger signal
        if now - self.lastHistUpdate > histLifeTime:            # histograms timed out
          negativeHist = cv2.calcHist([hsv],[0,1],None,[hueBins,satBins],[0,180,0,256])  # calc histogram satu + hue
          cv2.normalize(negativeHist,negativeHist,0,2000,cv2.NORM_MINMAX) # normalize to 2000
          for d in self.detectors:                              # loop over detector objects
            d.updateHist(negativeHist)
          self.lastHistUpdate = now
      return

# at this point we have a valid object detection
# activation or prolongation of trigger signal when new object is detected
    if not self.timestamp:
      self.timestamp = ts
      print('Object-Detection-Trigger active')
    self.lastTrigger = now
    return

  def triggered(self):
    return (self.timestamp)


# Combines PIR- and ObjectDetection-Trigger and generates trigger signal for the camera
class triggerGenerator:
  def __init__(self):
    self.triggered = False                      # Trigger currently active?
    self.triggerStart = datetime.datetime.now() # stores start time of current trigger
    self.lastTrigger = datetime.datetime.now()  # stores last time of trigger detection

  def trigger(self, ObjDet, Pir):
    now = datetime.datetime.now()
    if self.triggered:
# Trigger is already active
      if ObjDet or Pir:
# One or both trigger (imageAnalyzer and PIR) still high
# prolongation of timeout
        self.lastTrigger = datetime.datetime.now()
# Maximum duration of trigger exceeded?
# Remove trigger signal (zero length file)
        if (now -self.triggerStart).total_seconds() > maxVideoDuration:
          triggerFiles = [f for f in os.listdir(triggerPath) if f.endswith(triggerFileExt)]
          for f in triggerFiles:
            os.remove(triggerPath+f)
          self.triggered = False
          print('Video stopped - max. duration exceeded')
      else:
# Both trigger (imageAnalyzer and PIR) are low
# Remove trigger signal (zero length file)
        if (now - self.lastTrigger).total_seconds() > triggerTimeout:
          triggerFiles = [f for f in os.listdir(triggerPath) if f.endswith(triggerFileExt)]
          for f in triggerFiles:
            os.remove(triggerPath+f)
          self.triggered = False
          print('Video stopped')
    else:
# Trigger is inactive    
      if ObjDet and Pir:
# Both trigger (imageAnalyzer and PIR) high
# Set trigger signal (zero length file) and use the earlier timestamp
        open(triggerPath + min(ObjDet, Pir) + triggerFileExt, 'w').close()
        self.triggered = True
        self.lastTrigger = datetime.datetime.now()
        self.triggerStart = datetime.datetime.now()
        print('Video started')


# Main program
histos = initialLearn()                          # read sample images and learn their colors
detectors = []
for h in sorted(histos):                         # create detector objects from all histograms
  detectors.append(detector(h, histos[h]))       # and store them in a list

pi = pir(GPIO_PIR)                               # object for infrared motion sensor
il = imageLoader()                               # object to load image
ia = imageAnalyzer(detectors)                    # object to analyze image
tg = triggerGenerator()                          # object to create camera trigger file

while True:                                      # endless loop
  timeStamp, img = il.getImg()                   # wait for next image and load it
  ia.detect(img, timeStamp)                      # detect colors
  tg.trigger(ia.triggered(), pi.triggered())     # and create camera trigger file

Fazit

Nicht jeder braucht diese Verbesserungen, aber manchmal sind sie hilfreich. Wer in der Lage ist, den Bildbereich für die Farbanalyse ein wenig einzuschränken, der hat die Möglichkeit, die Erkennungsqualität zu verbessern, speziell bei kleinen Objekten. Die Aufnahmezeitbegrenzung braucht nur, wer den PIR-Sensor verwendet und bewegte Hintergründe hat. Oder wenn der PIR-Sensor gelegentlich mal spinnt. Dann ist es schön, wenn nicht gleich die ganze SD-Karte voll läuft.

 


Weitere Artikel in dieser Kategorie:

Schreiben Sie einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.