Initial commit
This commit is contained in:
BIN
src/__pycache__/detector.cpython-310.pyc
Normal file
BIN
src/__pycache__/detector.cpython-310.pyc
Normal file
Binary file not shown.
BIN
src/__pycache__/objectifier.cpython-310.pyc
Normal file
BIN
src/__pycache__/objectifier.cpython-310.pyc
Normal file
Binary file not shown.
213
src/detector.py
Executable file
213
src/detector.py
Executable file
@@ -0,0 +1,213 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import cv2 as cv
|
||||
import numpy as np
|
||||
import sys
|
||||
import shutil
|
||||
from os import path
|
||||
import hashlib
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
class Detection:
|
||||
"""Represents an object dectected in an image."""
|
||||
|
||||
def __init__(self, label, confidence, box):
|
||||
self.label = label
|
||||
self.confidence = confidence
|
||||
self.box = box
|
||||
|
||||
class AnalyzedImage:
|
||||
"""The result of performing object detection on an image."""
|
||||
|
||||
def __init__(self, filename, detections, outfile):
|
||||
self.detections = detections
|
||||
self.outfile = outfile
|
||||
|
||||
class Detector:
|
||||
"""Detects objects in images, returning an AnalyzedImage."""
|
||||
|
||||
def __init__(self, weights, cfg, classes, tempdir, confidence=0.6):
|
||||
self.net = cv.dnn.readNet(weights, cfg)
|
||||
self.classes = classes
|
||||
self.layer_names = self.net.getLayerNames()
|
||||
self.output_layer = [self.layer_names[i - 1] for i in self.net.getUnconnectedOutLayers()]
|
||||
self.tmpdir = tempdir
|
||||
self.minimum_confidence = confidence
|
||||
|
||||
def output_filename(self, filename):
|
||||
simple_name = path.splitext(path.basename(filename))[0]
|
||||
return str(self.tmpdir / (simple_name + ".png"))
|
||||
|
||||
def detect_objects(self, filename, output_filename=None):
|
||||
img = cv.imread(str(filename))
|
||||
height, width, channel = img.shape
|
||||
blob = cv.dnn.blobFromImage(img, 0.00392, (416, 416), (0,0,0), True, crop=False)
|
||||
self.net.setInput(blob)
|
||||
outs = self.net.forward(self.output_layer)
|
||||
|
||||
class_ids = []
|
||||
confidences = []
|
||||
boxes = []
|
||||
detections = []
|
||||
|
||||
for out in outs:
|
||||
for detection in out:
|
||||
scores = detection[5:]
|
||||
class_id = np.argmax(scores)
|
||||
confidence = scores[class_id]
|
||||
if confidence > self.minimum_confidence:
|
||||
center_x = int(detection[0] * width)
|
||||
center_y = int(detection[1] * height)
|
||||
w = int(detection[2] * width)
|
||||
h = int(detection[3] * height)
|
||||
x = int(center_x - w/2)
|
||||
y = int(center_y - h/2)
|
||||
boxes.append([x, y, w, h])
|
||||
confidences.append(float(confidence))
|
||||
class_ids.append(class_id)
|
||||
|
||||
indexes = cv.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)
|
||||
|
||||
for i in indexes:
|
||||
label = str(self.classes[class_ids[i]])
|
||||
box = [int(n) for n in boxes[i]]
|
||||
detections.append(Detection(label, confidences[i], box))
|
||||
|
||||
font = cv.FONT_HERSHEY_PLAIN
|
||||
marked = cv.imread(filename)
|
||||
for detection in detections:
|
||||
x, y, w, h = detection.box
|
||||
cv.rectangle(marked, (x,y), (x + w, y + h), (255,255,255,0), 2)
|
||||
cv.putText(marked, detection.label, (x,y+30), font, 3, (255,255,255,0), 1)
|
||||
|
||||
out_file = output_filename if output_filename else self.output_filename(filename)
|
||||
cv.imwrite(out_file, marked)
|
||||
return AnalyzedImage(filename, detections, str(out_file))
|
||||
|
||||
|
||||
|
||||
|
||||
# net = cv.dnn.readNet("/home/niten/Projects/yolo/yolov3.weights", "/home/niten/Projects/yolo/yolov3.cfg")
|
||||
|
||||
# classes = []
|
||||
|
||||
# with open("/home/niten/Projects/yolo/coco.names") as f:
|
||||
# classes = [line.strip() for line in f.readlines()]
|
||||
|
||||
# layer_names = net.getLayerNames()
|
||||
# output_layer = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
|
||||
# colors = np.random.uniform(0, 255, size=(len(classes), 3))
|
||||
|
||||
# def scale_int(o, s, m):
|
||||
# return (m / s) * o
|
||||
|
||||
# def scale_box(orig, scaled, box):
|
||||
# o_h, o_w, _ = orig.shape
|
||||
# s_h, s_w, _ = scaled.shape
|
||||
# x, y, w, h = box
|
||||
# return [scale_int(o_w, s_w, x),
|
||||
# scale_int(o_h, s_h, y),
|
||||
# scale_int(o_w, o_h, w),
|
||||
# scale_int(o_h, s_h, h)]
|
||||
|
||||
# tmpdir = Path(tempfile.mkdtemp())
|
||||
|
||||
# def detect_objects(filename):
|
||||
# simplename = path.splitext(path.basename(filename))[0]
|
||||
# out_filename = tmpdir / ("processed_" + simplename + ".png")
|
||||
|
||||
# orig = cv.imread(str(filename))
|
||||
# img = cv.imread(str(filename))
|
||||
# # img = cv.resize(img, None, fx=0.4, fy=0.4)
|
||||
# height, width, channel = img.shape
|
||||
# # TODO: Change scale factor?
|
||||
# blob = cv.dnn.blobFromImage(img, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
|
||||
# net.setInput(blob)
|
||||
# outs = net.forward(output_layer)
|
||||
|
||||
# class_ids = []
|
||||
# confidences = []
|
||||
# boxes = []
|
||||
|
||||
# detections = []
|
||||
|
||||
# for out in outs:
|
||||
# for detection in out:
|
||||
# scores = detection[5:]
|
||||
# class_id = np.argmax(scores)
|
||||
# confidence = scores[class_id]
|
||||
# if confidence > 0.6:
|
||||
# center_x = int(detection[0] * width)
|
||||
# center_y = int(detection[1] * height)
|
||||
# w = int(detection[2] * width)
|
||||
# h = int(detection[3] * height)
|
||||
# x = int(center_x - w/2)
|
||||
# y = int(center_y - h/2)
|
||||
# boxes.append([x, y, w, h])
|
||||
# confidences.append(float(confidence))
|
||||
# class_ids.append(class_id)
|
||||
|
||||
# indexes = cv.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)
|
||||
|
||||
# font = cv.FONT_HERSHEY_PLAIN
|
||||
# for i in range(len(boxes)):
|
||||
# if i in indexes:
|
||||
# label = str(classes[class_ids[i]])
|
||||
# color = colors[i]
|
||||
# scaled_box = scale_box(orig, img, boxes[i])
|
||||
# x, y, w, h = [int(n) for n in scaled_box]
|
||||
# detections.append(Detection(label, confidences[i], scaled_box))
|
||||
# # cv.rectangle(out, (x, y), (x + w, y + h), color, 2)
|
||||
# # cv.putText(out, label, (x, y + 30), font, 3, color, 3)
|
||||
|
||||
# #cv.imwrite(str(out_filename), out)
|
||||
# marked = cv.imread(filename)
|
||||
# for detection in detections:
|
||||
# x, y, w, h = [int(n) for n in detection.box]
|
||||
# cv.rectangle(marked, (x,y), (x + w, y + h), (255,255,255,0), 2)
|
||||
# cv.putText(marked, detection.label, (x, y + 30), font, 3, (255,255,255,0), 1)
|
||||
|
||||
# cv.imwrite(str(out_filename), marked)
|
||||
# return AnalyzedImage(filename, detections, str(out_filename))
|
||||
|
||||
# # cv.imshow("IMG", img)
|
||||
# # cv.waitKey(0)
|
||||
# # cv.destroyAllWindows()
|
||||
|
||||
# for filename in sys.argv[1:]:
|
||||
# print(filename + ":")
|
||||
# output = detect_objects(filename)
|
||||
# print(" OUTPUT: " + str(output.outfile))
|
||||
# for detection in output.detections:
|
||||
# print(" " + detection.label +
|
||||
# " (" + str(detection.confidence) + ")" +
|
||||
# " [" +
|
||||
# str(detection.box[0]) + ", " +
|
||||
# str(detection.box[1]) + ", " +
|
||||
# str(detection.box[2]) + ", " +
|
||||
# str(detection.box[3]) +
|
||||
# "]")
|
||||
|
||||
|
||||
# classes = []
|
||||
|
||||
# with open("/home/niten/Projects/yolo/coco.names") as f:
|
||||
# classes = [line.strip() for line in f.readlines()]
|
||||
|
||||
# detector = Detector("/home/niten/Projects/yolo/yolov3.weights", "/home/niten/Projects/yolo/yolov3.cfg", classes, Path(tempfile.mkdtemp()))
|
||||
|
||||
# for filename in sys.argv[1:]:
|
||||
# print(filename + ":")
|
||||
# output = detector.detect_objects(filename)
|
||||
# print(" OUTPUT: " + str(output.outfile))
|
||||
# for detection in output.detections:
|
||||
# print(" " + detection.label +
|
||||
# " (" + str(detection.confidence) + ")" +
|
||||
# " [" +
|
||||
# str(detection.box[0]) + ", " +
|
||||
# str(detection.box[1]) + ", " +
|
||||
# str(detection.box[2]) + ", " +
|
||||
# str(detection.box[3]) +
|
||||
# "]")
|
||||
78
src/objectifier.py
Normal file
78
src/objectifier.py
Normal file
@@ -0,0 +1,78 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from fastapi import FastAPI, HTTPException, Request, UploadFile
|
||||
from fastapi.responses import FileResponse
|
||||
from detector import Detector
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
incoming_dir = Path(tempfile.mkdtemp())
|
||||
outgoing_dir = Path(tempfile.mkdtemp())
|
||||
|
||||
def get_envvar(name):
|
||||
return os.environ.get(name)
|
||||
|
||||
def get_envvar_or_fail(name):
|
||||
result = get_envvar(name)
|
||||
if result:
|
||||
return result
|
||||
else:
|
||||
raise EnvironmentError('Missing required environment variable: ' + name)
|
||||
|
||||
yolo_config = get_envvar_or_fail('OBJECTIFIER_YOLOV3_CONFIG')
|
||||
yolo_weights = get_envvar_or_fail('OBJECTIFIER_YOLOV3_WEIGHTS')
|
||||
yolo_labels = get_envvar_or_fail('OBJECTIFIER_YOLOV3_LABELS')
|
||||
buffer_size = get_envvar('OBJECTIFIER_BUFFER_SIZE') or 524288
|
||||
|
||||
detector = Detector(
|
||||
yolo_weights,
|
||||
yolo_config,
|
||||
yolo_labels,
|
||||
outgoing_dir)
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
analyzed_images = {}
|
||||
|
||||
def detection_to_dict(d):
|
||||
return {
|
||||
"label": d.label,
|
||||
"confidence": d.confidence,
|
||||
"box": {
|
||||
"x": d.box[0],
|
||||
"y": d.box[1],
|
||||
"width": d.box[2],
|
||||
"height": d.box[3],
|
||||
},
|
||||
}
|
||||
|
||||
def result_to_dict(res, base_url):
|
||||
return {
|
||||
"labels": map(lambda d: d.label, res.detections),
|
||||
"detections": map(detection_to_dict, res.detections),
|
||||
"output": base_url + d.outfile,
|
||||
}
|
||||
|
||||
@app.put("/images/")
|
||||
async def analyze_image(file: UploadFile, request: Request):
|
||||
base_url = re.sub(r'\/images\/$', '/analyzed_images/', str(request.url))
|
||||
infile = open(incoming_dir / file.filename)
|
||||
file_hash = hashlib.sha256()
|
||||
with open(infile, mode="wb") as f:
|
||||
chunk = f.read(buffer_size)
|
||||
while chunk:
|
||||
file_hash.update(chunk)
|
||||
infile.write(chunk)
|
||||
chunk=f.read(buffer_size)
|
||||
result = detector.detect_objects(infile, file_hash.hexdigest() + ".png")
|
||||
return result_to_dict(result, base_url)
|
||||
|
||||
@app.get("/analyzed_images/${image_name}", response_class=FileResponse)
|
||||
def get_analyzed_image(image_name):
|
||||
filename = str(outgoing_dir / image_name)
|
||||
if path.isfile(filename):
|
||||
return filename
|
||||
else:
|
||||
raise HTTPException(status_code=404, detail="file not found: " + filename)
|
||||
40
src/yolo-cli.py
Normal file
40
src/yolo-cli.py
Normal file
@@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import argparse
|
||||
from detector import Detector
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
prog = 'YOLO CLI',
|
||||
description = 'YOLO Command Line Interface.')
|
||||
|
||||
parser.add_argument('-w', '--yolo_weights',
|
||||
help="Weight file for YOLOv3 object detection.")
|
||||
parser.add_argument('-c', '--yolo_config',
|
||||
help="Configuration file for YOLOv3 object detection")
|
||||
parser.add_argument('-l', '--yolo_labels',
|
||||
help="File containing list of object labels for YOLOv3 object detection")
|
||||
|
||||
parser.add_argument('files', metavar='FILE', type=str, nargs='*');
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
outgoing_dir = Path(tempfile.mkdtemp())
|
||||
|
||||
classes = []
|
||||
with open(args.yolo_labels) as f:
|
||||
classes = [line.strip() for line in f.readlines()]
|
||||
|
||||
detector = Detector(
|
||||
args.yolo_weights,
|
||||
args.yolo_config,
|
||||
classes,
|
||||
outgoing_dir)
|
||||
|
||||
for filename in args.files:
|
||||
print(filename + ":")
|
||||
result = detector.detect_objects(filename)
|
||||
print(" OUTPUT: " + str(result.outfile))
|
||||
for detection in result.detections:
|
||||
print(" " + detection.label + " (" + str(detection.confidence) + ")")
|
||||
Reference in New Issue
Block a user