Created
October 18, 2018 20:22
-
-
Save joelteply/2d54bb8a98da5e45069a21cafdb2ec45 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os, sys | |
sys.path.insert(0, os.path.abspath("..")) | |
sys.path.insert(0, os.path.abspath("../common")) | |
import os | |
import time | |
import numpy as np | |
import glob | |
from scipy import misc, ndimage | |
import cv2 | |
from common import utils, image_processing as ip, geometry as geo, transformations as T, diagnostics as d | |
import math | |
import json | |
import shutil | |
# python refinement_loop.py --upload_to_s3=0 --loop=0 --input_path=../webroot/mloutput --debug=1 | |
# Z is UP | |
rotX = T.rotation_matrix(0.00, [1, 0, 0]) | |
rotY = T.rotation_matrix(0.00, [0, 1, 0]) | |
rotZ = T.rotation_matrix(-0.06, [0, 0, 1]) | |
# rotX = T.rotation_matrix(0.00001, [1, 0, 0]) | |
# rotY = T.rotation_matrix(0.00001, [0, 1, 0]) | |
# rotZ = T.rotation_matrix(0.00001, [0, 0, 1]) | |
r_range = 255 | |
g_range = 220 | |
b_range = 255 | |
r_range = g_range = b_range = 255 | |
deep_fov = 70.0 | |
RIGHT_ANGLE = (math.pi/2.0) # 90 degrees | |
PROCESSING_SIZE = 1024 | |
# The estimated normal will be saved in 16-bit PNG format, where 0-65535 in R,G,B channel | |
# correspond to [-1, 1] for the X,Y,Z component of the normal vector. We use the camera coordinates | |
# defined as - X points to the camera right, Y points to the camera forward, | |
# and Z points to the camera upward. For example, right facing wall are very red, | |
# floor are very blue, and you rarely see green as it's parallel to the camera viewing direction. | |
# https://github.com/yindaz/surface_normal | |
# test | |
class NumpyEncoder(json.JSONEncoder): | |
def default(self, obj): | |
if isinstance(obj, np.ndarray): | |
return obj.tolist() | |
return json.JSONEncoder.default(self, obj) | |
def scaleComponent(color, range=255.0): | |
return 2.0 * (float(color)/float(range)) - 1.0 | |
def getNormalFromRGB(rgb): | |
x = scaleComponent(rgb[0], r_range) | |
y = scaleComponent(rgb[1], g_range) | |
z = scaleComponent(rgb[2], b_range) | |
#normalize Y | |
# y = np.sign(y) * (1 - math.sqrt(x**2 + z**2)) | |
# y = y + 0.45 | |
return rotateNormal((x,y,z)) | |
def rotateNormal(normal): | |
result = np.dot(normal, rotX[:3,:3].T) | |
result = np.dot(result, rotY[:3,:3].T) | |
result = np.dot(result, rotZ[:3,:3].T) | |
return tuple(result) | |
def getMatchingSurface(reduced_mask, isolated_surfaces, isolated_values, ignore_indexes=[], max_angle=40.0): | |
isolated_mask = np.zeros(reduced_mask.shape,dtype=np.uint8) | |
for value in isolated_values: | |
isolated_mask[reduced_mask == value] = 255 | |
most_pixels = 0 | |
surface_index = -1 | |
i = 0 | |
best_intersection = 0 | |
best_angle = 0 | |
straight_up = [0, 0, 1] | |
max_radians = geo.degreesToRadians(max_angle) | |
#find best matching surface | |
for surface in isolated_surfaces: | |
if i not in ignore_indexes: | |
normal = surface[1] | |
angle = geo.angle_between(normal, straight_up) | |
intersection = cv2.bitwise_and(surface[2], isolated_mask) | |
intersection_pixels = cv2.countNonZero(intersection) | |
if intersection_pixels > most_pixels and angle < max_radians: | |
most_pixels = intersection_pixels | |
surface_index = i | |
best_intersection = intersection | |
best_angle = angle | |
i = i + 1 | |
if surface_index == -1: | |
return getMatchingSurface(reduced_mask, isolated_surfaces, isolated_values, ignore_indexes, max_angle=180.0) | |
print("Floor is %.2f degrees from UP" % geo.radiansToDegrees(best_angle)) | |
return surface_index, most_pixels, best_intersection | |
def getCandidateWalls(floor_normal, isolated_surfaces, maxAngle=20): | |
candidate_walls = [] | |
max_diff = geo.degreesToRadians(maxAngle) | |
kernel = np.ones((3,3),np.uint8) | |
best_score = 0 | |
best_wall_index = 0 | |
i = 0 | |
wall_count = 0 | |
overall_best_index = 0 | |
overall_best_diff = math.pi | |
for surface in isolated_surfaces: | |
normal = surface[1] | |
angle = geo.angle_between(normal, floor_normal) | |
vert_diff = abs(angle - RIGHT_ANGLE) | |
if vert_diff < overall_best_diff: | |
overall_best_diff = vert_diff | |
overall_best_index = i | |
if vert_diff < max_diff: | |
candidate_walls.append(surface) | |
angle_diff = 180.0 * vert_diff / math.pi | |
opening = cv2.morphologyEx(surface[2], cv2.MORPH_OPEN, kernel) | |
total_pixels = cv2.countNonZero(opening) | |
score = total_pixels / math.sqrt(angle_diff + 3.0) | |
if score > best_score: | |
best_wall_index = wall_count | |
best_score = total_pixels | |
print("%d) candidate wall normal = %s, angle diff = %.2f, color = %s, score=%f" \ | |
% (wall_count, normal, angle_diff, surface[0], score)) | |
wall_count = wall_count + 1 | |
i = i + 1 | |
if len(candidate_walls) == 0: | |
best_wall_index = 0 | |
candidate_walls.append(isolated_surfaces[best_wall_index]) | |
return candidate_walls, best_wall_index | |
def getFOV(first_normal, second_normal, expected_angle = math.pi/2.0): | |
measured_angle = geo.angle_between(first_normal, second_normal) | |
# skew = measured_angle / expected_angle | |
# print(measured_angle) | |
camera_fov = geo.radiansToDegrees(measured_angle) * (deep_fov / 90.0) | |
return camera_fov | |
def determinePrimaryAngles(rgb, mask, image_name, K=5): | |
img = rgb.copy() | |
mask = misc.imresize(mask, img.shape[:2]) | |
d.saveDiagnosticsImage(args, img, image_name, "normals") | |
kmeans, labels, centers = ip.kmeansImage(img, K) | |
d.saveDiagnosticsImage(args, kmeans, image_name, "normals_clustered") | |
img = ip.cropImage(kmeans, 30) | |
mask = ip.cropImage(mask, 30) | |
reduced_normals = ip.constrainImage(img, 300, inter=cv2.INTER_NEAREST) | |
reduced_mask = ip.constrainImage(mask, 300, inter=cv2.INTER_NEAREST) | |
d.saveDiagnosticsImage(args, reduced_mask, image_name, "reduced_mask") | |
# build surfaces | |
isolated_surfaces = [] | |
for color in centers: | |
normal = getNormalFromRGB(color) | |
color_mask = ip.isolateColor(reduced_normals, color) | |
isolated_surfaces.append((color, normal, color_mask)) | |
if args.debug: | |
debugImage = img.copy() | |
floor_materials = [255] #floor, rug | |
#wall windowpane, cabinet, wardrobe, painting, mirror, shelf, refrigerator, bookcase | |
wall_materials = [0] | |
# find floor | |
(floor_index, most_pixels, floor_intersection) = getMatchingSurface(reduced_mask, isolated_surfaces, floor_materials) | |
if floor_index < 0: | |
print("Invalid surfaces") | |
return None | |
floor_surface = isolated_surfaces[floor_index] | |
if args.debug: | |
debugImage = d.labelMask(debugImage, floor_intersection, "floor") | |
#find candidate wall surfaces | |
unavailable_surfaces = [] | |
candidate_walls, primary_wall_index = getCandidateWalls(floor_surface[1], isolated_surfaces) | |
primaryFOV = 0 | |
if primary_wall_index >= 0: | |
unavailable_surfaces.append(primary_wall_index) | |
primaryFOV = getFOV(floor_surface[1], candidate_walls[primary_wall_index][1]) | |
if args.debug: | |
print("Wall %d is primary" % primary_wall_index) | |
primary_wall_intersection = candidate_walls[primary_wall_index][2] | |
if primary_wall_index >= 0: | |
debugImage = d.labelMask(debugImage, primary_wall_intersection, "primary") | |
d.saveDiagnosticsImage(args, debugImage, image_name, "surfaces") | |
debugImage = d.plotVectors(floor_surface[1], floor_surface[1], candidate_walls[primary_wall_index][1]) | |
d.saveDiagnosticsImage(args, debugImage, image_name, "vectors") | |
#calculate floor rotation: | |
floor_rotation = 0.0 | |
fov = 80.0 | |
x_unit_normal = [1,0,0] | |
y_unit_normal = [0,1,0] | |
z_unit_normal = [0,0,1] | |
floor_angle_x = math.pi - geo.angle_between(floor_surface[1], y_unit_normal) | |
if primary_wall_index >= 0: | |
fov = primaryFOV | |
factor = float(deep_fov / fov) | |
print("Scale factor=%f" % factor) | |
floor_rotation = geo.angle_between(candidate_walls[primary_wall_index][1], x_unit_normal) | |
print("Primary wall normal %s, fov=%.2f" % (candidate_walls[primary_wall_index][1], primaryFOV)) | |
print("Floor normal %s, rotation=%.2f, elevation=%.2f" % (floor_surface[1], floor_rotation, floor_angle_x)) | |
#json data | |
data = {} | |
data['fov'] = fov; | |
data['cameraRotation'] = [1.37, 0.0, 0.0] | |
data['floorRotation'] = floor_rotation | |
#debugging: | |
data['floorNormal'] = floor_surface[1] | |
if primary_wall_index >= 0: | |
data['primaryWallNormal'] = candidate_walls[primary_wall_index][1] | |
#write file: | |
json_output_path = os.path.join(args.output_path, image_name, "data.json") | |
with open(json_output_path, 'w') as outfile: | |
json.dump(data, outfile, cls=NumpyEncoder, indent=5) | |
if args.debug: | |
json_debug_path = os.path.join(args.logging_path, image_name + "_data.json") | |
shutil.copyfile(json_output_path, json_debug_path) | |
return kmeans | |
def refineMask(args, rgb, mask,kmeans_normals, normals, image_name): | |
# normals = ip.cropImage(normals, 30) | |
# normals = cv2.pyrMeanShiftFiltering(normals, 21, 31) | |
shape = mask.shape[:2] | |
img = misc.imresize(rgb, mask.shape[:2]) | |
kmeans=kmeans_normals.copy() | |
kmeans = misc.imresize(kmeans, img.shape[:2]) | |
img = cv2.bilateralFilter(img,37,41,61) | |
kmeans_gray = cv2.cvtColor(kmeans, cv2.COLOR_BGR2GRAY) | |
ret, thresh = cv2.threshold(kmeans_gray, 127, 255, 0) | |
nmask = np.zeros(mask.shape, np.uint8) | |
# Get rid of garbage (holes) | |
im3, contours, hierarchy = cv2.findContours(thresh,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE) | |
if len(contours) > 0: | |
areas = [cv2.contourArea(c) for c in contours] | |
max_index = np.argmax(areas) | |
cnt=contours[max_index] | |
M = cv2.moments(cnt) | |
if M["m00"] != 0: | |
cX = int(M["m10"] / M["m00"]) | |
cY = int(M["m01"] / M["m00"]) | |
if kmeans[cY,cX][2] > 250: | |
nmask = ip.isolateColor(kmeans, kmeans[cY,cX]) | |
border=50 | |
border_mask=cv2.copyMakeBorder(nmask, border, border, | |
border, border, cv2.BORDER_REPLICATE) | |
cv2.rectangle(border_mask,(51,51), (border_mask.shape[1]-border-1,border_mask.shape[0]-border-1),(0),cv2.FILLED); | |
d.saveDiagnosticsImage(args, border_mask, image_name, "normalsint") | |
#kernel = np.ones((5,5),np.uint8) | |
#mask = cv2.erode(mask,kernel,iterations = 1) | |
mask=cv2.copyMakeBorder(mask, border, border, | |
border, border, cv2.BORDER_CONSTANT, value=(0)) | |
mask=cv2.add(mask,border_mask) | |
img=cv2.copyMakeBorder(img, border, border, | |
border, border, cv2.BORDER_CONSTANT, value=(0)) | |
mask[mask > 0] = 1 | |
img = cv2.bilateralFilter(img,37,41,61) | |
mask = ip.refineMaskWatershed(args, img, mask, image_name, max_value=1) | |
mask[mask > 0] = 255 | |
# Get rid of garbage (holes) | |
im2, contours, hierarchy = cv2.findContours(mask,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE) | |
if len(contours) > 0: | |
areas = [cv2.contourArea(c) for c in contours] | |
max_index = np.argmax(areas) | |
cnt=contours[max_index] | |
# compute the center of the contour | |
min_area = (shape[0] * shape[1]) / 50 | |
for contour in contours: | |
area = cv2.contourArea(contour) | |
if area < min_area: | |
# compute the center of the contour | |
M = cv2.moments(contour) | |
if M["m00"] != 0: | |
cX = int(M["m10"] / M["m00"]) | |
cY = int(M["m01"] / M["m00"]) | |
color = 0 if mask[cY,cX] > 0 else 255 | |
mask = cv2.fillPoly(mask, pts =[contour], color=color) | |
mask=ip.cropImage(mask, border) | |
img=ip.cropImage(img, border) | |
# Get rid of garbage (holes) | |
im2, contours, hierarchy = cv2.findContours(mask,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE) | |
if len(contours) > 0: | |
for contour in contours: | |
area = cv2.contourArea(contour) | |
if area < min_area: | |
# compute the center of the contour | |
M = cv2.moments(contour) | |
if M["m00"] != 0: | |
cX = int(M["m10"] / M["m00"]) | |
cY = int(M["m01"] / M["m00"]) | |
color = 0 if mask[cY,cX] > 0 else 255 | |
mask = cv2.fillPoly(mask, pts =[contour], color=color) | |
# Smooth it all | |
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(3,3)) | |
mask = cv2.dilate(mask,kernel) | |
mask = cv2.GaussianBlur(mask,(5,5),0) | |
d.saveDiagnosticsMask(args, mask, image_name, "done", img,d.HUE_GREEN) | |
return mask | |
# margin = 50 | |
# h, w = img.shape[:2] | |
# croppedNormals = normals[margin:h, margin:w].copy() | |
# normals = cv2.GaussianBlur(normals,(31,31),0) | |
# normals[margin:h, margin:w] = croppedNormals | |
# mixed = cv2.addWeighted(img, 0.3, normals, 0.7, 0.0) | |
# mixed = cv2.bilateralFilter(mixed,27,31,61) | |
# d.saveDiagnosticsImage(args, mixed, image_name, "mixed") | |
# mask = ip.refineMaskWatershed(args, mixed, mask, image_name, distance=0.01, max_value=1) | |
# img = cv2.GaussianBlur(img,(9,9),0) | |
# img = cv2.bilateralFilter(img,37,41,61) | |
# d.saveDiagnosticsImage(args, img, image_name, "bilateral") | |
# mask = ip.refineMaskWatershed(args, img, mask, image_name, distance=0.01, max_value=1) | |
#todo: fix polygons or remove defects | |
#d.saveDiagnosticsImage(args, gabor, image_name, "gabor") | |
return mask | |
def processImages(args): | |
src_paths = glob.glob(os.path.join(args.input_path, "*_raw.jpg")) | |
if args.upload_to_s3: | |
client = utils.getS3Client() | |
else: | |
client = None | |
for src_path in src_paths: | |
name = os.path.basename(src_path) | |
image_name = name[:name.index("_raw.jpg")] | |
normals_path = os.path.join(args.input_path, (image_name + "_normals.png")) | |
mask_path = os.path.join(args.input_path, (image_name + "_mask.png")) | |
mask_output_path = os.path.join(args.output_path, image_name, "mask.png") | |
project_path = os.path.join(args.output_path, image_name) | |
if os.path.isfile(normals_path) and os.path.isfile(mask_path) and (not os.path.isfile(mask_output_path) or not args.loop): | |
print("\nRefining mask %s outputing to %s" % (mask_path, mask_output_path)) | |
utils.ensureDirectoryExists(project_path) | |
img = cv2.imread(src_path) | |
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE) | |
img = ip.constrainImage(img, PROCESSING_SIZE) | |
mask = ip.constrainImage(mask, PROCESSING_SIZE) | |
print("\nFinding walls and floors for %s" % normals_path) | |
#normal est | |
normals = cv2.imread(normals_path) | |
normals = cv2.cvtColor(normals, cv2.COLOR_BGR2RGB) | |
normals = ip.constrainImage(normals, PROCESSING_SIZE) | |
kmeans_normals = determinePrimaryAngles(normals, mask, image_name) | |
if kmeans_normals is None: | |
os.remove(mask_path) | |
continue | |
mask[mask < 255] = 0 | |
if args.debug: | |
floor = mask.copy() | |
d.saveDiagnosticsMask(args, floor, image_name, "mask", img, d.HUE_GREEN) | |
mask = refineMask(args, img, mask, kmeans_normals, normals, image_name) | |
floor = mask.copy() | |
d.saveDiagnosticsMask(args, floor, image_name, "refined", img, d.HUE_GREEN) | |
floor = cv2.GaussianBlur(floor,(5,5),0) | |
# ip.findVanishingPoints(args, img, image_name) | |
if cv2.imwrite(mask_output_path, floor): | |
print("\nWrote mask to %s" % mask_output_path) | |
if not client is None: | |
utils.uploadDirectoryToS3(client, project_path, image_name) | |
print ("Uploads complete, Finished processing.\n") | |
else: | |
print("\nCould not write mask to %s" % mask_output_path) | |
os.remove(mask_path) | |
if __name__ == "__main__": | |
parser = utils.stdArgs() | |
args = parser.parse_args() | |
print(args) | |
if not os.path.exists(args.output_path): | |
os.makedirs(args.output_path) | |
if args.loop: | |
print("\nWaiting on input at %s \n" % args.input_path) | |
while True: | |
processImages(args) | |
time.sleep(0.5) | |
else: | |
print("\nProcessing input ONCE from %s \n" % args.input_path) | |
processImages(args) | |
print("\nDone.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment