diff --git a/.gitignore b/.gitignore
index 1012f7a..f41800e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,5 @@ credentials.json
 token.json
 local.db
 node_modules
+# PyCharm files
+*/.idea/
diff --git a/google_cloud/setup_vm.sh b/google_cloud/setup_vm.sh
index 2990e3a..a3dd04a 100644
--- a/google_cloud/setup_vm.sh
+++ b/google_cloud/setup_vm.sh
@@ -17,6 +17,10 @@ apt-get -y install sqlite3
 pip3 install -U pip
 pip3 install -U psycopg2
 pip3 install -U twilio
+pip3 install -U numpy
+pip3 install -U scipy
+pip3 install -U docker
+
 # change timezone
 ln -fs /usr/share/zoneinfo/America/Los_Angeles /etc/localtime
diff --git a/lib/detection_policies/inception_and_threshold.py b/lib/detection_policies/inception_and_threshold.py
index bf8a96e..8ca7529 100644
--- a/lib/detection_policies/inception_and_threshold.py
+++ b/lib/detection_policies/inception_and_threshold.py
@@ -10,7 +10,7 @@ import settings
 import datetime
 import math
-
+import docker
 
 class InceptionV3AndHistoricalThreshold:
 
@@ -19,6 +19,9 @@ class InceptionV3AndHistoricalThreshold:
 
     def __init__(self, settings, args, google_services, dbManager):
         self.dbManager = dbManager
+        # start up a TF Serving container via Docker
+        client = docker.from_env()
+        client.containers.run("inception_serving", detach=True, ports={'8500/tcp': 8500}, tty=True)
         self.prediction_service = connect_to_prediction_service(settings.server_ip_and_port)
         self.args = args
         self.google_services = google_services
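
The __init__ above starts a fresh "inception_serving" container on every run, which fails with a port conflict if one is already listening on 8500, and the first gRPC call can race the model load. A possible hardening sketch, not part of the patch (the helper name and timeout are illustrative):

import socket
import time

import docker

def ensure_serving_container(image="inception_serving", port=8500, timeout=60):
    """Start the TF Serving container unless one is already running, then
    block until its gRPC port accepts connections (port-open is only a
    proxy for model readiness)."""
    client = docker.from_env()
    running = client.containers.list(filters={"ancestor": image, "status": "running"})
    if not running:
        client.containers.run(image, detach=True, ports={"8500/tcp": port}, tty=True)
    deadline = time.time() + timeout
    while time.time() < deadline:
        with socket.socket() as sock:
            if sock.connect_ex(("localhost", port)) == 0:
                return  # port is accepting connections
        time.sleep(1)
    raise RuntimeError("TF Serving did not become ready within %d seconds" % timeout)
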
diff --git a/lib/gcp_helper.py b/lib/gcp_helper.py
new file mode 100644
index 0000000..3b7d558
--- /dev/null
+++ b/lib/gcp_helper.py
@@ -0,0 +1,62 @@
+# Copyright 2018 The Fuego Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""
+
+Helper functions for Google Cloud Platform
+
+"""
+
+import grpc
+import tensorflow as tf
+import time
+
+from tensorflow_serving.apis import predict_pb2
+from tensorflow_serving.apis import prediction_service_pb2_grpc
+from tensorflow.python.framework import tensor_util
+
+def connect_to_prediction_service(server_ip_and_port):
+    """
+    Connect to an inference server at the given IP address and port. The server could be
+    a single machine or a Kubernetes cluster.
+    :param server_ip_and_port: string with ip address followed by port (e.g. '34.82.71.243:8500')
+    :return: PredictionServiceStub object
+    """
+    # tf.app.flags.DEFINE_string('server', server_ip_and_port, 'PredictionService host:port')
+    # channel = grpc.insecure_channel(tf.app.flags.FLAGS.server)
+    channel = grpc.insecure_channel(server_ip_and_port)
+    # grpc.secure_channel()
+    return prediction_service_pb2_grpc.PredictionServiceStub(channel)
+
+def predict_batch(prediction_service, crops, timing=False):
+    """
+    Run inference on a batch of image crops
+    :param crops: N x H x W x 3 uint8 numpy array (e.g. set of crops for a single camera)
+    :return: N x 2 numpy array of smoke/nonsmoke probabilities
+    """
+
+    # Send request
+    # See prediction_service.proto for gRPC request/response details.
+    request = predict_pb2.PredictRequest()
+    request.model_spec.name = 'inception'
+    request.model_spec.signature_name = 'serving_default'
+    request.inputs['image_batch:0'].CopyFrom(tf.contrib.util.make_tensor_proto(crops, shape=crops.shape))
+
+    start = time.time()
+    result = prediction_service.Predict(request, 1000.0)  # 1000 secs timeout
+    if timing:
+        print('Inference time {}'.format(time.time() - start))
+    # convert to numpy
+    numpy_result = tensor_util.MakeNdarray(result.outputs["import_1/import/InceptionV3/Predictions/Reshape_1"])
+    return numpy_result
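
For reference, this is how the two new helpers are meant to be wired together. A sketch, not part of the patch; it assumes lib/ is on sys.path and that column 1 of the result is the smoke class:

import numpy as np

from gcp_helper import connect_to_prediction_service, predict_batch

stub = connect_to_prediction_service('localhost:8500')  # matches sample_settings.py below
crops = np.zeros((4, 299, 299, 3), dtype=np.uint8)      # stand-in for real 299x299 camera crops
probs = predict_batch(stub, crops, timing=True)         # N x 2 smoke/nonsmoke probabilities
print(probs[:, 1])                                      # assumed smoke-class column
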
diff --git a/lib/rect_to_squares.py b/lib/rect_to_squares.py
index c372d89..cd6b525 100644
--- a/lib/rect_to_squares.py
+++ b/lib/rect_to_squares.py
@@ -14,9 +14,7 @@
 # ==============================================================================
 """
 @author: Kinshuk Govil
-
 Simple utility to break up rectangle into squares
-
 """
 
 import os
@@ -33,7 +31,6 @@ def rect_to_squares(selectionX0, selectionY0, selectionX1, selectionY1, limitX,
     close enough to a square. Also, squares must meet minimium size requiremet
     of minSize and must be centered around the selected rectangle. All squares
     must fit between (0,0) and (limitX, limitY)
-
     Returns array/list of coordinates for the squares. The coordinates are
     represented by 4-tuples (x0,y0,x1,y1)
     """
@@ -130,15 +127,12 @@ def cutBoxesOld(imgOrig, outputDirectory, imageFileName, callBackFn=None):
 
 def getSegmentRanges(fullSize, segmentSize):
     """Break the given fullSize into ranges of segmentSize
-
     Divide the range (0,fullSize) into multiple ranges of size
     segmentSize that are equally spaced apart and have approximately
     10% overlap (overlapRatio)
-
     Args:
         fullSize (int): size of the full range (0, fullSize)
         segmentSize (int): size of each segment
-
     Returns:
         (list): list of tuples (start, end) marking each segment's range
     """
@@ -169,18 +163,15 @@ def getSegmentRanges(fullSize, segmentSize):
 
 def cutBoxesFixed(imgOrig, outputDirectory, imageFileName, callBackFn=None):
     """Cut the given image into fixed size boxes
-
     Divide the given image into square segments of 299x299 (segmentSize below)
     to match the size of images used by InceptionV3 image classification
     machine learning model.  This function uses the getSegmentRanges() function
     above to calculate the exact start and end of each square
-
     Args:
         imgOrig (Image): Image object of the original image
         outputDirectory (str): name of directory to store the segments
        imageFileName (str): nane of image file (used as segment file prefix)
        callBackFn (function): callback function that's called for each square
-
    Returns:
        (list): list of segments with filename and coordinates
    """
@@ -233,4 +224,4 @@ def test():  # for testing
 
 if __name__=="__main__":
-    test()
+    test()
\ No newline at end of file
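
getSegmentRanges' body is not part of this diff, so the sketch below re-derives the docstring's contract (equally spaced segments, roughly 10% overlap) to show the kind of ranges cutBoxesFixed consumes; the exact numbers are an assumption, not the function's verified output:

import math

def segment_ranges_sketch(fullSize, segmentSize, overlapRatio=0.1):
    if fullSize <= segmentSize:
        return [(0, fullSize)]
    stride = segmentSize * (1 - overlapRatio)  # neighbors overlap by ~10%
    numSegments = 1 + math.ceil((fullSize - segmentSize) / stride)
    # spread segments evenly so the last one ends exactly at fullSize
    step = (fullSize - segmentSize) / (numSegments - 1)
    return [(round(i * step), round(i * step) + segmentSize) for i in range(numSegments)]

print(segment_ranges_sketch(1000, 299))
# [(0, 299), (234, 533), (467, 766), (701, 1000)] -- each 299 wide, flush at both edges
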
diff --git a/sample_settings.py b/sample_settings.py
index 7187339..6d8e663 100644
--- a/sample_settings.py
+++ b/sample_settings.py
@@ -23,6 +23,8 @@ downloadDir = 'XXX/orig'
 
 archive_storage_bucket = "fuego-firecam-a"
 
+server_ip_and_port = 'localhost:8500'  # depends on the specific inference server running on GCP
+
 teamDriveID = '0ADX9uPkOmsDJUk9PVA' # Fuego Smoke
 allPictures = '1SCXFLE25EbQUQMvTcWfeHFU29qxN8WMM' # Pictures - Samples and Full Sets
 # smokePictures = '1jq9p2A5BVLh1oWKktpV1oaWTVUU9KmNJ' # Smoke
diff --git a/smoke-classifier/detect_fire.py b/smoke-classifier/detect_fire.py
index 532a303..1c6d214 100644
--- a/smoke-classifier/detect_fire.py
+++ b/smoke-classifier/detect_fire.py
@@ -31,27 +31,26 @@ import settings
 settings.fuegoRoot = fuegoRoot
 import collect_args
-import rect_to_squares
 import goog_helper
 os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # quiet down tensorflow logging (must be done before tf_helper)
-import tf_helper
 import db_manager
 import email_helper
 import sms_helper
 import img_archive
+
 import logging
 import pathlib
 import tempfile
 import shutil
 import time, datetime, dateutil.parser
 import random
-import re
 import hashlib
 from urllib.request import urlretrieve
-import tensorflow as tf
 from PIL import Image, ImageFile, ImageDraw, ImageFont
+from detection_policies.inception_and_threshold import InceptionV3AndHistoricalThreshold
 ImageFile.LOAD_TRUNCATED_IMAGES = True
+import numpy as np
 
 
 def getNextImage(dbManager, cameras, cameraID=None):
@@ -92,7 +91,7 @@ def getNextImage(dbManager, cameras, cameraID=None):
             # skip to next camera
             return getNextImage(dbManager, cameras)
         camera['md5'] = md5
-    return (camera['name'], timestamp, imgPath, md5)
+    return (camera['name'], timestamp, imgPath)
 
 getNextImage.tmpDir = None # XXXXX Use a fixed stable directory for testing
@@ -139,168 +138,12 @@
 getNextImageFromDir.index = -1
 getNextImageFromDir.tmpDir = None
 
-
-def segmentImage(imgPath):
-    """Segment the given image into sections to for smoke classificaiton
-
-    Args:
-        imgPath (str): filepath of the image
-
-    Returns:
-        List of dictionary containing information on each segment
-    """
-    img = Image.open(imgPath)
-    ppath = pathlib.PurePath(imgPath)
-    segments = rect_to_squares.cutBoxes(img, str(ppath.parent), imgPath)
-    img.close()
-    return segments
-
-
-def recordScores(dbManager, camera, timestamp, segments, minusMinutes):
-    """Record the smoke scores for each segment into SQL DB
-
-    Args:
-        dbManager (DbManager):
-        camera (str): camera name
-        timestamp (int):
-        segments (list): List of dictionary containing information on each segment
-    """
-    dt = datetime.datetime.fromtimestamp(timestamp)
-    secondsInDay = (dt.hour * 60 + dt.minute) * 60 + dt.second
-
-    dbRows = []
-    for segmentInfo in segments:
-        dbRow = {
-            'CameraName': camera,
-            'Timestamp': timestamp,
-            'MinX': segmentInfo['MinX'],
-            'MinY': segmentInfo['MinY'],
-            'MaxX': segmentInfo['MaxX'],
-            'MaxY': segmentInfo['MaxY'],
-            'Score': segmentInfo['score'],
-            'MinusMinutes': minusMinutes,
-            'SecondsInDay': secondsInDay
-        }
-        dbRows.append(dbRow)
-    dbManager.add_data('scores', dbRows)
-
-
-def postFilter(dbManager, camera, timestamp, segments):
-    """Post classification filter to reduce false positives
-
-    Many times smoke classification scores segments with haze and glare
-    above 0.5.  Haze and glare occur tend to occur at similar time over
-    multiple days, so this filter raises the threshold based on the max
-    smoke score for same segment at same time of day over the last few days.
-    Score must be > halfway between max value and 1.  Also, minimum .1 above max.
-
-    Args:
-        dbManager (DbManager):
-        camera (str): camera name
-        timestamp (int):
-        segments (list): Sorted List of dictionary containing information on each segment
-
-    Returns:
-        Dictionary with information for the segment most likely to be smoke
-        or None
-    """
-    # enable the next few lines fakes a detection to test alerting functionality
-    # maxFireSegment = segments[0]
-    # maxFireSegment['HistAvg'] = 0.1
-    # maxFireSegment['HistMax'] = 0.2
-    # maxFireSegment['HistNumSamples'] = 10
-    # return maxFireSegment
-
-    # segments is sorted, so skip all work if max score is < .5
-    if segments[0]['score'] < .5:
-        return None
-
-    sqlTemplate = """SELECT MinX,MinY,MaxX,MaxY,count(*) as cnt, avg(score) as avgs, max(score) as maxs FROM scores
-    WHERE CameraName='%s' and Timestamp > %s and Timestamp < %s and SecondsInDay > %s and SecondsInDay < %s
-    GROUP BY MinX,MinY,MaxX,MaxY"""
-
-    dt = datetime.datetime.fromtimestamp(timestamp)
-    secondsInDay = (dt.hour * 60 + dt.minute) * 60 + dt.second
-    sqlStr = sqlTemplate % (camera, timestamp - 60*60*int(24*3.5), timestamp - 60*60*12, secondsInDay - 60*60, secondsInDay + 60*60)
-    # print('sql', sqlStr, timestamp)
-    dbResult = dbManager.query(sqlStr)
-    # if len(dbResult) > 0:
-    #     print('post filter result', dbResult)
-    maxFireSegment = None
-    maxFireScore = 0
-    for segmentInfo in segments:
-        if segmentInfo['score'] < .5: # segments is sorted. we've reached end of segments >= .5
-            break
-        for row in dbResult:
-            if (row['minx'] == segmentInfo['MinX'] and row['miny'] == segmentInfo['MinY'] and
-                row['maxx'] == segmentInfo['MaxX'] and row['maxy'] == segmentInfo['MaxY']):
-                threshold = (row['maxs'] + 1)/2 # threshold is halfway between max and 1
-                # Segments with historical value above 0.8 are too noisy, so discard them by setting
-                # threshold at least .2 above max.  Also requires .7 to reach .9 vs just .85
-                threshold = max(threshold, row['maxs'] + 0.2)
-                # print('thresh', row['minx'], row['miny'], row['maxx'], row['maxy'], row['maxs'], threshold)
-                if (segmentInfo['score'] > threshold) and (segmentInfo['score'] > maxFireScore):
-                    maxFireScore = segmentInfo['score']
-                    maxFireSegment = segmentInfo
-                    maxFireSegment['HistAvg'] = row['avgs']
-                    maxFireSegment['HistMax'] = row['maxs']
-                    maxFireSegment['HistNumSamples'] = row['cnt']
-
-    return maxFireSegment
-
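
A few numbers make the threshold rule in the postFilter code removed above concrete (the same historical-threshold idea presumably moves into InceptionV3AndHistoricalThreshold, per the class name):

def post_filter_threshold(hist_max):
    threshold = (hist_max + 1) / 2         # halfway between historical max and 1
    return max(threshold, hist_max + 0.2)  # and at least .2 above historical max

print(post_filter_threshold(0.4))   # 0.7  -- the halfway rule dominates
print(post_filter_threshold(0.7))   # 0.9  -- the +0.2 rule dominates
print(post_filter_threshold(0.85))  # 1.05 -- unreachable, so noisy segments never fire
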
-
-def collectPositves(service, imgPath, origImgPath, segments):
-    """Collect all positive scoring segments
-
-    Copy the images for all segments that score highter than > .5 to google drive folder
-    settings.positivePictures. These will be used to train future models.
-    Also, copy the full image for reference.
-
-    Args:
-        imgPath (str): path name for main image
-        segments (list): List of dictionary containing information on each segment
-    """
-    positiveSegments = 0
-    ppath = pathlib.PurePath(origImgPath)
-    imgNameNoExt = str(os.path.splitext(ppath.name)[0])
-    origImg = None
-    for segmentInfo in segments:
-        if segmentInfo['score'] > .5:
-            if imgPath != origImgPath:
-                if not origImg:
-                    origImg = Image.open(origImgPath)
-                cropCoords = (segmentInfo['MinX'], segmentInfo['MinY'], segmentInfo['MaxX'], segmentInfo['MaxY'])
-                croppedOrigImg = origImg.crop(cropCoords)
-                cropImgName = imgNameNoExt + '_Crop_' + 'x'.join(list(map(lambda x: str(x), cropCoords))) + '.jpg'
-                cropImgPath = os.path.join(str(ppath.parent), cropImgName)
-                croppedOrigImg.save(cropImgPath, format='JPEG')
-                croppedOrigImg.close()
-                if hasattr(settings, 'positivePicturesDir'):
-                    destPath = os.path.join(settings.positivePicturesDir, cropImgName)
-                    shutil.copy(cropImgPath, destPath)
-                else:
-                    goog_helper.uploadFile(service, settings.positivePictures, cropImgPath)
-                os.remove(cropImgPath)
-            if hasattr(settings, 'positivePicturesDir'):
-                pp = pathlib.PurePath(segmentInfo['imgPath'])
-                destPath = os.path.join(settings.positivePicturesDir, pp.name)
-                shutil.copy(segmentInfo['imgPath'], destPath)
-            else:
-                goog_helper.uploadFile(service, settings.positivePictures, segmentInfo['imgPath'])
-            positiveSegments += 1
-
-    if positiveSegments > 0:
-        # Commenting out saving full images for now to reduce data
-        # goog_helper.uploadFile(service, settings.positivePictures, imgPath)
-        logging.warning('Found %d positives in image %s', positiveSegments, ppath.name)
-
-
 def drawRect(imgDraw, x0, y0, x1, y1, width, color):
     for i in range(width):
         imgDraw.rectangle((x0+i,y0+i,x1-i,y1-i),outline=color)
 
 
-def drawFireBox(imgPath, fireSegment):
+def drawFireBox(imgPath, detection_spec):
     """Draw bounding box with fire detection with score on image
 
     Stores the resulting annotated image as new file
@@ -313,28 +156,30 @@ def drawFireBox(imgPath, fireSegment):
     """
     img = Image.open(imgPath)
     imgDraw = ImageDraw.Draw(img)
-    x0 = fireSegment['MinX']
-    y0 = fireSegment['MinY']
-    x1 = fireSegment['MaxX']
-    y1 = fireSegment['MaxY']
-    centerX = (x0 + x1)/2
-    centerY = (y0 + y1)/2
-    color = "red"
-    lineWidth=3
-    drawRect(imgDraw, x0, y0, x1, y1, lineWidth, color)
-
-    fontSize=80
-    font = ImageFont.truetype(os.path.join(settings.fuegoRoot, 'lib/Roboto-Regular.ttf'), size=fontSize)
-    scoreStr = '%.2f' % fireSegment['score']
-    textSize = imgDraw.textsize(scoreStr, font=font)
-    imgDraw.text((centerX - textSize[0]/2, centerY - textSize[1]), scoreStr, font=font, fill=color)
-
-    color = "blue"
-    fontSize=70
-    font = ImageFont.truetype(os.path.join(settings.fuegoRoot, 'lib/Roboto-Regular.ttf'), size=fontSize)
-    scoreStr = '%.2f' % fireSegment['HistMax']
-    textSize = imgDraw.textsize(scoreStr, font=font)
-    imgDraw.text((centerX - textSize[0]/2, centerY), scoreStr, font=font, fill=color)
+
+    for bounding_box in detection_spec:
+        x0 = bounding_box['x']
+        y0 = bounding_box['y']
+        x1 = bounding_box['x'] + bounding_box['width']
+        y1 = bounding_box['y'] + bounding_box['height']
+        centerX = (x0 + x1)/2
+        centerY = (y0 + y1)/2
+        color = "red"
+        lineWidth=3
+        drawRect(imgDraw, x0, y0, x1, y1, lineWidth, color)
+
+        fontSize=80
+        font = ImageFont.truetype(os.path.join(settings.fuegoRoot, 'lib/Roboto-Regular.ttf'), size=fontSize)
+        scoreStr = '%.2f' % bounding_box['score']
+        textSize = imgDraw.textsize(scoreStr, font=font)
+        imgDraw.text((centerX - textSize[0]/2, centerY - textSize[1]), scoreStr, font=font, fill=color)
+
+        # color = "blue"
+        # fontSize=70
+        # font = ImageFont.truetype(os.path.join(settings.fuegoRoot, 'lib/Roboto-Regular.ttf'), size=fontSize)
+        # scoreStr = '%.2f' % fireSegment['HistMax']
+        # textSize = imgDraw.textsize(scoreStr, font=font)
+        # imgDraw.text((centerX - textSize[0]/2, centerY), scoreStr, font=font, fill=color)
 
     filePathParts = os.path.splitext(imgPath)
     annotatedFile = filePathParts[0] + '_Score' + filePathParts[1]
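
One portability note on the rewritten drawFireBox: ImageDraw.textsize() was removed in Pillow 10, so if the pinned Pillow is ever upgraded, the equivalent centering math would be (a sketch, not part of the patch):

left, top, right, bottom = imgDraw.textbbox((0, 0), scoreStr, font=font)
textSize = (right - left, bottom - top)  # same (width, height) tuple textsize() returned
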
@@ -344,7 +189,7 @@
     return annotatedFile
 
 
-def recordDetection(dbManager, service, camera, timestamp, imgPath, annotatedFile, fireSegment):
+def recordDetection(dbManager, service, camera, timestamp, imgPath, annotatedFile, detection_spec):
     """Record that a smoke/fire has been detected
 
     Record the detection with useful metrics in 'detections' table in SQL DB.
@@ -357,12 +202,12 @@ def recordDetection(dbManager, service, camera, timestamp, imgPath, annotatedFil
         timestamp (int):
         imgPath: filepath of the image
         annotatedFile: filepath of the image with annotated box and score
-        fireSegment (dictionary): dictionary with information for the segment with fire/smoke
+        detection_spec (list): DetectionSpec (list of bounding boxes + scores)
 
     Returns:
         List of Google drive IDs for the uploaded image files
     """
-    logging.warning('Fire detected by camera %s, image %s, segment %s', camera, imgPath, str(fireSegment))
+    logging.warning('Fire detected by camera %s, image %s, segment %s', camera, imgPath, str(detection_spec))
     # upload file to google drive detection dir
     driveFileIDs = []
     driveFile = goog_helper.uploadFile(service, settings.detectionPictures, imgPath)
@@ -376,14 +221,11 @@ def recordDetection(dbManager, service, camera, timestamp, imgPath, annotatedFil
     dbRow = {
         'CameraName': camera,
         'Timestamp': timestamp,
-        'MinX': fireSegment['MinX'],
-        'MinY': fireSegment['MinY'],
-        'MaxX': fireSegment['MaxX'],
-        'MaxY': fireSegment['MaxY'],
-        'Score': fireSegment['score'],
-        'HistAvg': fireSegment['HistAvg'],
-        'HistMax': fireSegment['HistMax'],
-        'HistNumSamples': fireSegment['HistNumSamples'],
+        'MinX': detection_spec[0]['x'],
+        'MinY': detection_spec[0]['y'],
+        'MaxX': detection_spec[0]['x'] + detection_spec[0]['width'],
+        'MaxY': detection_spec[0]['y'] + detection_spec[0]['height'],
+        'Score': detection_spec[0]['score'],
         'ImageID': driveFileIDs[0] if driveFileIDs else ''
     }
     dbManager.add_data('detections', dbRow)
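
recordDetection persists only the highest-scoring box (detection_spec[0], which assumes the policy returns boxes ordered by score, much as postFilter's segments used to be sorted). If every box should be recorded instead, a sketch that reuses add_data's list support (the removed recordScores passed a list of rows the same way):

dbRows = []
for box in detection_spec:
    dbRows.append({
        'CameraName': camera,
        'Timestamp': timestamp,
        'MinX': box['x'],
        'MinY': box['y'],
        'MaxX': box['x'] + box['width'],
        'MaxY': box['y'] + box['height'],
        'Score': box['score'],
        'ImageID': driveFileIDs[0] if driveFileIDs else ''
    })
dbManager.add_data('detections', dbRows)
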
@@ -422,7 +264,7 @@
     return True
 
 
-def alertFire(constants, cameraID, imgPath, annotatedFile, driveFileIDs, fireSegment, timestamp):
+def alertFire(constants, cameraID, imgPath, annotatedFile, driveFileIDs, detection_spec, timestamp):
     """Send alerts about given fire through all channels (currently email and sms)
 
     Args:
@@ -434,11 +276,11 @@
         fireSegment (dictionary): dictionary with information for the segment with fire/smoke
         timestamp (int): time.time() value when image was taken
     """
-    emailFireNotification(constants, cameraID, imgPath, annotatedFile, driveFileIDs, fireSegment, timestamp)
+    emailFireNotification(constants, cameraID, imgPath, annotatedFile, driveFileIDs, detection_spec, timestamp)
     smsFireNotification(constants['dbManager'], cameraID)
 
 
-def emailFireNotification(constants, cameraID, imgPath, annotatedFile, driveFileIDs, fireSegment, timestamp):
+def emailFireNotification(constants, cameraID, imgPath, annotatedFile, driveFileIDs, detection_spec, timestamp):
     """Send an email alert for a potential new fire
 
     Send email with information about the camera and fire score includeing
@@ -454,7 +296,8 @@
         timestamp (int): time.time() value when image was taken
     """
     dbManager = constants['dbManager']
-    subject = 'Possible (%d%%) fire in camera %s' % (int(fireSegment['score']*100), cameraID)
+    max_score = np.max(np.array([box['score'] for box in detection_spec]))
+    subject = 'Possible (%d%%) fire in camera %s' % (int(max_score*100), cameraID)
     body = 'Please check the attached images for fire.'
     # commenting out links to google drive because they appear as extra attachments causing confusion
     # and some email recipients don't even have permissions to access drive.
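
Incidentally, the numpy import added to detect_fire.py appears to serve only this max; if a lighter dependency footprint is preferred, the builtin works on the same list (sketch):

max_score = max(box['score'] for box in detection_spec)
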
@@ -493,27 +336,6 @@
         sms_helper.sendSms(settings, phone, message)
 
 
-def deleteImageFiles(imgPath, origImgPath, annotatedFile, segments):
-    """Delete all image files given in segments
-
-    Args:
-        imgPath: filepath of the original image
-        annotatedFile: filepath of the annotated image
-        segments (list): List of dictionary containing information on each segment
-    """
-    for segmentInfo in segments:
-        os.remove(segmentInfo['imgPath'])
-    os.remove(imgPath)
-    if imgPath != origImgPath:
-        os.remove(origImgPath)
-    if annotatedFile:
-        os.remove(annotatedFile)
-    ppath = pathlib.PurePath(imgPath)
-    # leftoverFiles = os.listdir(str(ppath.parent))
-    # if len(leftoverFiles) > 0:
-    #     logging.warning('leftover files %s', str(leftoverFiles))
-
-
 def getLastScoreCamera(dbManager):
     sqlStr = "SELECT CameraName from scores order by Timestamp desc limit 1;"
     dbResult = dbManager.query(sqlStr)
@@ -532,95 +354,6 @@ def heartBeat(filename):
     """
     pathlib.Path(filename).touch()
 
-
-def segmentAndClassify(imgPath, tfSession, graph, labels):
-    """Segment the given image into squares and classify each square
-
-    Args:
-        imgPath (str): filepath of the image to segment and clasify
-        tfSession: Tensorflow session
-        graph: Tensorflow graph
-        labels: Tensorflow labels
-
-    Returns:
-        list of segments with scores sorted by decreasing score
-    """
-    segments = segmentImage(imgPath)
-    # print('si', segments)
-    tf_helper.classifySegments(tfSession, graph, labels, segments)
-    segments.sort(key=lambda x: -x['score'])
-    return segments
-
-
-def recordFilterReport(constants, cameraID, timestamp, imgPath, origImgPath, segments, minusMinutes, googleDrive, positivesOnly):
-    """Record the scores for classified segments, check for detections and alerts, and delete images
-
-    Args:
-        constants (dict): "global" contants
-        cameraID (str): ID for camera associated with the image
-        timestamp (int): time.time() value when image was taken
-        imgPath (str): filepath of the image (possibly derived image by subtraction) that was classified
-        origImgPath (str): filepath of the original image from camera
-        segments (list): List of dictionary containing information on each segment
-        minusMinutes (int): number of minutes separating subtracted images (0 for non-subtracted images)
-        googleDrive: google drive API service
-        positivesOnly (bool): only collect/upload positives to google drive -- used when training against old images
-    """
-    annotatedFile = None
-    args = constants['args']
-    dbManager = constants['dbManager']
-    if args.collectPositves:
-        collectPositves(googleDrive, imgPath, origImgPath, segments)
-    if not positivesOnly:
-        recordScores(dbManager, cameraID, timestamp, segments, minusMinutes)
-        fireSegment = postFilter(dbManager, cameraID, timestamp, segments)
-        if fireSegment:
-            annotatedFile = drawFireBox(origImgPath, fireSegment)
-            driveFileIDs = recordDetection(dbManager, googleDrive, cameraID, timestamp, origImgPath, annotatedFile, fireSegment)
-            if checkAndUpdateAlerts(dbManager, cameraID, timestamp, driveFileIDs):
-                alertFire(constants, cameraID, origImgPath, annotatedFile, driveFileIDs, fireSegment, timestamp)
-    deleteImageFiles(imgPath, origImgPath, annotatedFile, segments)
-    if (args.heartbeat):
-        heartBeat(args.heartbeat)
-    logging.warning('Highest score for camera %s: %f' % (cameraID, segments[0]['score']))
-
-
-def genDiffImage(imgPath, earlierImgPath, minusMinutes):
-    """Subtract the two given images and store result in new difference image file
-
-    Args:
-        imgPath (str): filepath of the current image (to subtract from)
-        imgPath (str): filepath of the earlier image (value to subtract)
-        minusMinutes (int): number of minutes separating subtracted images
-
-    Returns:
-        file path to the difference image
-    """
-    imgA = Image.open(imgPath)
-    imgB = Image.open(earlierImgPath)
-    imgDiff = img_archive.diffImages(imgA, imgB)
-    parsedName = img_archive.parseFilename(imgPath)
-    parsedName['diffMinutes'] = minusMinutes
-    imgDiffName = img_archive.repackFileName(parsedName)
-    ppath = pathlib.PurePath(imgPath)
-    imgDiffPath = os.path.join(str(ppath.parent), imgDiffName)
-    imgDiff.save(imgDiffPath, format='JPEG')
-    return imgDiffPath
-
-
-def expectedDrainSeconds(deferredImages, timeTracker):
-    """Calculate the expected time to drain the deferred images queue
-
-    Args:
-        deferredImages (list): list of deferred images
-        timeTracker (dict): tracks recent image processing times
-
-    Returns:
-        Seconds to process deferred images
-    """
-    return len(deferredImages)*timeTracker['timePerSample']
-
 
 def updateTimeTracker(timeTracker, processingTime):
     """Update the time tracker data with given time to process current image
@@ -654,101 +387,14 @@ def initializeTimeTracker():
         'timePerSample': 3 # start off with estimate of 3 seconds per camera
     }
 
-
-def getDeferrredImgageInfo(deferredImages, timeTracker, minusMinutes, currentTime):
-    """Returns tuple of boolean indicating whether queue is full, and optionally deferred image if ready to process
-
-    Args:
-        deferredImages (list): list of deferred images
-        timeTracker (dict): tracks recent image processing times
-        minusMinutes (int): number of desired minutes between images to subract
-        currentTime (float): current time.time() value
-
-    Returns:
-        Tuple (bool, dict or None)
-    """
-    if minusMinutes == 0:
-        return (False, None)
-    if len(deferredImages) == 0:
-        return (False, None)
-    minusSeconds = 60*minusMinutes
-    queueFull = (expectedDrainSeconds(deferredImages, timeTracker) >= minusSeconds)
-    if queueFull or (deferredImages[0]['timestamp'] + minusSeconds < currentTime):
-        img = deferredImages[0]
-        del deferredImages[0]
-        return (queueFull, img)
-    return (queueFull, None)
-
-
-def addToDeferredImages(dbManager, cameras, deferredImages):
-    """Fetch a new image and add it to deferred images queue
-
-    Args:
-        dbManager (DbManager):
-        cameras (list): list of cameras
-        deferredImages (list): list of deferred images
-    """
-    (cameraID, timestamp, imgPath, md5) = getNextImage(dbManager, cameras)
-    # add image to Q if not already another one from same camera
-    matches = list(filter(lambda x: x['cameraID'] == cameraID, deferredImages))
-    if len(matches) > 0:
-        assert len(matches) == 1
-        logging.warning('Camera already in list waiting processing %s', matches[0])
-        time.sleep(2) # take a nap to let things catch up
-        return
-    deferredImages.append({
-        'timestamp': timestamp,
-        'cameraID': cameraID,
-        'imgPath': imgPath,
-        'md5': md5,
-        'oldWait': 0
-    })
-    logging.warning('Defer camera %s. Len %d', cameraID, len(deferredImages))
-
-
-def genDiffImageFromDeferred(dbManager, cameras, deferredImageInfo, deferredImages, minusMinutes):
-    """Generate a difference image baed on given deferred image
-
-    Args:
-        dbManager (DbManager):
-        cameras (list): list of cameras
-        deferredImageInfo (dict): deferred image to process
-        deferredImages (list): list of deferred images
-        minusMinutes (int): number of desired minutes between images to subract
-
-    Returns:
-        Tuple containing camera name, current timestamp, filepath of regular image, and filepath of difference image
-    """
-    # logging.warning('DefImg: %d, %s, %s', len(deferredImages), timeStart, deferredImageInfo)
-    (cameraID, timestamp, imgPath, md5) = getNextImage(dbManager, cameras, deferredImageInfo['cameraID'])
-    timeDiff = timestamp - deferredImageInfo['timestamp']
-    if md5 == deferredImageInfo['md5']: # check if image hasn't changed
-        logging.warning('Camera %s unchanged (oldWait=%d, diff=%d)', cameraID, deferredImageInfo['oldWait'], timeDiff)
-        if (timeDiff + deferredImageInfo['oldWait']) < min(2 * 60 * minusMinutes, 5 * 60):
-            # some cameras may not referesh fast enough so give another chance up to 2x minusMinutes or 5 mins
-            # Putting it back at the end of the queue with updated timestamp
-            # Updating timestamp so this doesn't take priority in processing
-            deferredImageInfo['timestamp'] = timestamp
-            deferredImageInfo['oldWait'] += timeDiff
-            deferredImages.append(deferredImageInfo)
-        else: # timeout waiting for new image from camera => discard data
-            os.remove(deferredImageInfo['imgPath'])
-        os.remove(imgPath)
-        return (None,None,None,None)
-    imgDiffPath = genDiffImage(imgPath, deferredImageInfo['imgPath'], minusMinutes)
-    logging.warning('Checking camera %s. Len %d. Diff %d', cameraID, len(deferredImages), timeDiff)
-    return (cameraID, timestamp, imgPath, imgDiffPath)
-
-
-def getArchivedImages(constants, cameras, startTimeDT, timeRangeSeconds, minusMinutes):
-    """Get random images from HPWREN archive matching given constraints and optionally subtract them
+def getArchivedImages(constants, cameras, startTimeDT, timeRangeSeconds):
+    """Get random images from HPWREN archive matching given constraints
 
     Args:
         constants (dict): "global" contants
         cameras (list): list of cameras
         startTimeDT (datetime): starting time of time range
         timeRangeSeconds (int): number of seconds in time range
-        minusMinutes (int): number of desired minutes between images to subract
 
     Returns:
         Tuple containing camera name, current timestamp, filepath of regular image, and filepath of difference image
     """
@@ -759,34 +405,15 @@ def getArchivedImages(constants, cameras, startTimeDT, timeRangeSeconds):
     cameraID = cameras[int(len(cameras)*random.random())]['name']
     timeDT = startTimeDT + datetime.timedelta(seconds = random.random()*timeRangeSeconds)
-    if minusMinutes:
-        prevTimeDT = timeDT + datetime.timedelta(seconds = -60 * minusMinutes)
-    else:
-        prevTimeDT = timeDT
+    prevTimeDT = timeDT
     files = img_archive.getHpwrenImages(constants['googleServices'], settings, getArchivedImages.tmpDir.name,
-                                        constants['camArchives'], cameraID, prevTimeDT, timeDT, minusMinutes or 1)
+                                        constants['camArchives'], cameraID, prevTimeDT, timeDT, 1)
     # logging.warning('files %s', str(files))
     if not files:
-        return (None, None, None, None)
+        return (None, None, None)
-    if minusMinutes:
-        if len(files) > 1:
-            if files[0] >= files[1]: # files[0] is supposed to be earlier than files[1]
-                logging.warning('unexpected file order %s', str(files))
-                for file in files:
-                    os.remove(file)
-                return (None, None, None, None)
-            imgDiffPath = genDiffImage(files[1], files[0], minusMinutes)
-            os.remove(files[0]) # no longer needed
-            parsedName = img_archive.parseFilename(files[1])
-            return (cameraID, parsedName['unixTime'], files[1], imgDiffPath)
-        else:
-            logging.warning('unexpected file count %s', str(files))
-            for file in files:
-                os.remove(file)
-            return (None, None, None, None)
-    elif len(files) > 0:
+    if len(files) > 0:
         parsedName = img_archive.parseFilename(files[0])
-        return (cameraID, parsedName['unixTime'], files[0], files[0])
+        return (cameraID, parsedName['unixTime'], files[0])
-    return (None, None, None, None)
+    return (None, None, None)
 
 getArchivedImages.tmpDir = None
@@ -803,7 +430,6 @@ def main():
         ["e", "endTime", "(optional) performs search with modifiedTime < endTime"],
     ]
     args = collect_args.collectArgs([], optionalArgs=optArgs, parentParsers=[goog_helper.getParentParser()])
-    minusMinutes = int(args.minusMinutes) if args.minusMinutes else 0
     googleServices = goog_helper.getGoogleServices(settings, args)
     dbManager = db_manager.DbManager(sqliteFile=settings.db_file,
                                      psqlHost=settings.psqlHost, psqlDb=settings.psqlDb,
@@ -824,52 +450,55 @@ def main():
         assert startTimeDT and endTimeDT
         timeRangeSeconds = (endTimeDT-startTimeDT).total_seconds()
         assert timeRangeSeconds > 0
-        assert args.collectPositves
+        assert args._collect_positives
         useArchivedImages = True
-    deferredImages = []
     processingTimeTracker = initializeTimeTracker()
-    graph = tf_helper.load_graph(settings.model_file)
-    labels = tf_helper.load_labels(settings.labels_file)
-    config = tf.ConfigProto()
-    config.gpu_options.per_process_gpu_memory_fraction = 0.1 #hopefully reduces segfaults
-    with tf.Session(graph=graph, config=config) as tfSession:
-        while True:
-            classifyImgPath = None
-            timeStart = time.time()
-            if useArchivedImages:
-                (cameraID, timestamp, imgPath, classifyImgPath) = \
-                    getArchivedImages(constants, cameras, startTimeDT, timeRangeSeconds, minusMinutes)
-            elif minusMinutes:
-                (queueFull, deferredImageInfo) = getDeferrredImgageInfo(deferredImages, processingTimeTracker, minusMinutes, timeStart)
-                if not queueFull: # queue is not full, so add more to queue
-                    addToDeferredImages(dbManager, cameras, deferredImages)
-                if deferredImageInfo: # we have a deferred image ready to process, now get latest image and subtract
-                    (cameraID, timestamp, imgPath, classifyImgPath) = \
-                        genDiffImageFromDeferred(dbManager, cameras, deferredImageInfo, deferredImages, minusMinutes)
-                    if not cameraID:
-                        continue # skip to next camera without deleting deferred image which may be reused later
-                    os.remove(deferredImageInfo['imgPath']) # no longer needed
-                else:
-                    continue # in diff mode without deferredImage, nothing more to do
-            # elif args.imgDirectory:  unused functionality -- to delete?
-            #     (cameraID, timestamp, imgPath, md5) = getNextImageFromDir(args.imgDirectory)
-            else: # regular (non diff mode), grab image and process
-                (cameraID, timestamp, imgPath, md5) = getNextImage(dbManager, cameras)
-                classifyImgPath = imgPath
-            if not cameraID:
-                continue # skip to next camera
-            timeFetch = time.time()
-
-            segments = segmentAndClassify(classifyImgPath, tfSession, graph, labels)
-            timeClassify = time.time()
-            recordFilterReport(constants, cameraID, timestamp, classifyImgPath, imgPath, segments, minusMinutes, googleServices['drive'], useArchivedImages)
-            timePost = time.time()
-            updateTimeTracker(processingTimeTracker, timePost - timeStart)
-            if args.time:
-                logging.warning('Timings: fetch=%.2f, classify=%.2f, post=%.2f',
-                    timeFetch-timeStart, timeClassify-timeFetch, timePost-timeClassify)
-
-
-if __name__=="__main__":
+    detection_policy = InceptionV3AndHistoricalThreshold(settings, args, googleServices, dbManager)
+    while True:
+        timeStart = time.time()
+
+        #### Generate ImageSpec: Load sequence of one or more images, either from archives or live cameras ####
+        # TODO: switch to sequence of images
+        if useArchivedImages:
+            (cameraID, timestamp, imgPath) = getArchivedImages(constants, cameras, startTimeDT, timeRangeSeconds)
+        else: # regular (non diff mode), grab image and process
+            cameraID, timestamp, imgPath = getNextImage(dbManager, cameras)
+        if not cameraID:
+            continue # skip to next camera
+
+        image_spec = [{}]
+        image_spec[-1]['path'] = imgPath
+        image_spec[-1]['timestamp'] = timestamp
+        image_spec[-1]['cameraID'] = cameraID
+
+        timeFetch = time.time()
+
+        #### Send loaded images to detection policy ####
+        detection_spec = detection_policy.run_detection(image_spec)
+        timeClassify = time.time()
+
+        #### If fire detected, record in database and send out alerts as needed ####
+        if len(detection_spec) > 0:
+            annotatedFile = drawFireBox(imgPath, detection_spec)
+            driveFileIDs = recordDetection(dbManager, googleServices['drive'], cameraID, timestamp, imgPath,
+                                           annotatedFile, detection_spec)
+            if checkAndUpdateAlerts(dbManager, cameraID, timestamp, driveFileIDs):
+                alertFire(constants, cameraID, imgPath, annotatedFile, driveFileIDs, detection_spec, timestamp)
+        timePost = time.time()
+
+        # TODO: delete image sequence
+        os.remove(imgPath)
+
+        updateTimeTracker(processingTimeTracker, timePost - timeStart)
+        if args.time:
+            logging.warning('Timings: fetch=%.2f, classify=%.2f, post=%.2f',
+                timeFetch-timeStart, timeClassify-timeFetch, timePost-timeClassify)
+
+        if (args.heartbeat):
+            heartBeat(args.heartbeat)
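+
+if __name__ == "__main__":
+    main()

The rewritten loop pins down the implicit contract between detect_fire.py and a detection policy. Spelled out as data (field names are taken from this diff; the literal values are hypothetical):

image_spec = [{
    'path': '/tmp/camera1__2019-10-01T12_00_00.jpg',  # hypothetical file
    'timestamp': 1569956400,
    'cameraID': 'camera1',
}]

detection_spec = detection_policy.run_detection(image_spec)
# -> [] when nothing is found, else scored boxes such as:
# [{'x': 120, 'y': 80, 'width': 299, 'height': 299, 'score': 0.87}]
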
diff --git a/smoke-classifier/detect_monitor.py b/smoke-classifier/detect_monitor.py
index 9bacb16..0a44908 100644
--- a/smoke-classifier/detect_monitor.py
+++ b/smoke-classifier/detect_monitor.py
@@ -101,7 +101,7 @@ def main():
     for i in range(numProcesses):
         heartbeatFile = tempfile.NamedTemporaryFile()
         heartbeatFileName = heartbeatFile.name
-        proc = startProcess(scriptName, heartbeatFileName, args.collectPositves, args.restrictType)
+        proc = startProcess(scriptName, heartbeatFileName, args._collect_positives, args.restrictType)
         procInfos.append({
             'proc': proc,
             'heartbeatFile': heartbeatFile,
@@ -121,7 +121,7 @@ def main():
             if (timestamp - lastTS) > 4*60: # kill if stuck more than 4 minutes
                 logging.warning('Killing %d', proc.pid)
                 proc.kill()
-                procInfo['proc'] = startProcess(scriptName, procInfo['heartbeatFileName'], args.collectPositves, args.restrictType)
+                procInfo['proc'] = startProcess(scriptName, procInfo['heartbeatFileName'], args._collect_positives, args.restrictType)
         time.sleep(30)
diff --git a/train/package_docker.sh b/train/package_docker.sh
new file mode 100644
index 0000000..f43c426
--- /dev/null
+++ b/train/package_docker.sh
@@ -0,0 +1,18 @@
+# This script packages a trained model into a docker container, and
+# pushes it to GCR. The first argument should be a name you want to
+# use for the model (e.g. inception), and the second argument should be its path
+
+NAME=$1
+MODEL_PATH=$2
+
+sudo docker pull tensorflow/serving:latest-gpu
+sudo docker run -d --name serving_base tensorflow/serving:latest-gpu
+# create intermediate dir
+sudo docker exec serving_base mkdir -p /models/$NAME
+sudo docker cp $MODEL_PATH serving_base:/models/$NAME/1
+sudo docker commit --change "ENV MODEL_NAME $NAME" serving_base $NAME"_serving"
+sudo docker kill serving_base
+sudo docker rm serving_base
+
+sudo docker tag $NAME"_serving" gcr.io/dkgu-dev/$NAME"_serving":latest
+sudo docker push gcr.io/dkgu-dev/$NAME"_serving":latest
diff --git a/train/prepare_trainset.py b/train/prepare_trainset.py
index 0eac6ae..a977f18 100644
--- a/train/prepare_trainset.py
+++ b/train/prepare_trainset.py
@@ -188,7 +188,7 @@ def _convert_dataset(split_name, filenames, class_names_to_ids, dataset_dir):
             output_filename = _get_dataset_filename(
                 dataset_dir, split_name, shard_id, numShards)
 
-            with tf.python_io.TFRecordWriter(output_filename) as tfrecord_writer:
+            with tf.io.TFRecordWriter(output_filename) as tfrecord_writer:
                 start_ndx = shard_id * num_per_shard
                 end_ndx = min((shard_id+1) * num_per_shard, len(filenames))
                 for i in range(start_ndx, end_ndx):
@@ -197,7 +197,7 @@ def _convert_dataset(split_name, filenames, class_names_to_ids, dataset_dir):
                     sys.stdout.flush()
 
                     # Read the filename:
-                    image_data = tf.gfile.FastGFile(filenames[i], 'rb').read()
+                    image_data = tf.io.gfile.GFile(filenames[i], 'rb').read()
                     height, width = image_reader.read_image_dims(sess, image_data)
 
                     class_name = os.path.basename(os.path.dirname(filenames[i]))
diff --git a/train/train_classifier_keras.py b/train/train_classifier_keras.py
new file mode 100644
index 0000000..3bb5db8
--- /dev/null
+++ b/train/train_classifier_keras.py
@@ -0,0 +1,77 @@
+import glob
+import tensorflow as tf
+from tensorflow import keras
+import collect_args
+import goog_helper
+
+def _parse_function(example_proto):
+    """
+    Function for converting TFRecordDataset entries to uncompressed image pixels + labels
+    :return: (image, label) pair
+    """
+    feature_description = {
+        'image/class/label': tf.io.FixedLenFeature([], tf.int64, default_value=0),
+        'image/encoded': tf.io.FixedLenFeature([], tf.string, default_value=''),
+        'image/format': tf.io.FixedLenFeature([], tf.string, default_value=''),
+        'image/height': tf.io.FixedLenFeature([], tf.int64, default_value=0),
+        'image/width': tf.io.FixedLenFeature([], tf.int64, default_value=0),
+    }
+
+    # Parse the input `tf.Example` proto using the dictionary above.
+    example = tf.io.parse_single_example(example_proto, feature_description)
+    image = tf.image.decode_image(example['image/encoded'], channels=3)
+
+    # Resize images in the training set because they are apparently rectangular much of the time
+    if example['image/height'] != 299 or example['image/width'] != 299:
+        image = tf.image.resize(tf.reshape(image, [example['image/height'], example['image/width'], 3]), [299, 299])
+        image = tf.cast(image, tf.uint8)
+
+    image = tf.reshape(image, [299, 299, 3]) # workaround because decode_image doesn't set a static shape
+    label = tf.one_hot(example['image/class/label'], depth=2)
+
+    image = (tf.cast(image, tf.float32) - 128) / 128.0
+    return [image, label]
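
A quick way to sanity-check _parse_function before a long training run (a sketch, not in the patch; the path is hypothetical and TF 2.x eager mode is assumed):

import glob
import tensorflow as tf

filenames = glob.glob('/data/tfrecords/firecam_train_*.tfrecord')  # hypothetical location
dataset = tf.data.TFRecordDataset(filenames).map(_parse_function)
for image, label in dataset.take(1):
    print(image.shape, image.dtype)  # expect (299, 299, 3) float32, values in [-1, 1)
    print(label.numpy())             # one-hot, e.g. [0. 1.]
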
+
+
+def main():
+    reqArgs = [
+        ["i", "inputDir", "local directory containing the TFRecord files"],
+        ["o", "outputDir", "local directory to write out the trained model"],
+    ]
+    optArgs = [
+        ["t", "trainPercentage", "percentage of data to use for training vs. validation (default 90)"]
+    ]
+
+    args = collect_args.collectArgs(reqArgs, optionalArgs=optArgs, parentParsers=[goog_helper.getParentParser()])
+
+    batch_size = 32
+    max_epochs = 1000
+    steps_per_epoch = 250
+    overshoot_epochs = 15 # stop training after this many epochs without a decrease in validation loss
+    val_steps = 100 # only needed for now because of a bug in tf2.0, which should be fixed in the next version
+    # TODO: either set this to (# of validation examples / batch size), i.e. figure out the number of
+    # validation examples, or upgrade to TF2.1 when it's ready and automatically go through the whole set
+
+    train_filenames = glob.glob(args.inputDir + 'firecam_train_*.tfrecord')
+    val_filenames = glob.glob(args.inputDir + 'firecam_validation_*.tfrecord')
+
+    raw_dataset_train = tf.data.TFRecordDataset(train_filenames)
+    raw_dataset_val = tf.data.TFRecordDataset(val_filenames)
+
+    dataset_train = raw_dataset_train.map(_parse_function).repeat(max_epochs * steps_per_epoch).shuffle(batch_size * 5).batch(batch_size)
+    dataset_val = raw_dataset_val.map(_parse_function).repeat().batch(batch_size)
+
+    inception = keras.applications.inception_v3.InceptionV3(weights=None, include_top=True, input_tensor=None,
+                                                            classes=2)
+    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, amsgrad=False)
+    inception.compile(optimizer=optimizer, loss=tf.keras.losses.BinaryCrossentropy(), metrics=['accuracy'])
+
+    callbacks = [keras.callbacks.EarlyStopping(monitor='val_loss', patience=overshoot_epochs),
+                 keras.callbacks.ModelCheckpoint(filepath=args.outputDir + 'best_model',
+                                                 monitor='val_loss', save_best_only=True)]
+
+    inception.fit(dataset_train, validation_data=dataset_val, epochs=max_epochs, validation_steps=val_steps,
+                  steps_per_epoch=steps_per_epoch, callbacks=callbacks)
+
+
+if __name__ == "__main__":
+    main()
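
package_docker.sh above expects a SavedModel directory for MODEL_PATH, while ModelCheckpoint writes to outputDir + 'best_model'. If that checkpoint is not already in SavedModel form, one way to bridge the two (a sketch; the paths and the model name 'inception' are assumptions):

from tensorflow import keras

model = keras.models.load_model('/out/best_model')     # hypothetical outputDir
model.save('/out/export/inception', save_format='tf')  # SavedModel layout TF Serving expects
# then: ./train/package_docker.sh inception /out/export/inception
# (the script copies that directory into the container as /models/inception/1)
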