Last active
January 11, 2017 03:46
-
-
Save samehmohamed88/f3d831140ead1edb09daa9c10ca847ec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pickle | |
import json | |
import random | |
import csv | |
import cv2 | |
import numpy as np | |
from keras.preprocessing.image import ImageDataGenerator | |
from keras.models import Sequential | |
from keras.optimizers import Adam | |
from keras.callbacks import ModelCheckpoint | |
from keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D | |
from keras.layers import Activation, Dropout, Flatten, Dense | |
from keras.layers.core import Lambda | |
from keras.callbacks import Callback | |
from keras.utils import np_utils | |
_index_in_epoch = 0 | |
nb_epoch = 10 | |
batch_size = 32 | |
img_height, img_width = 64, 64 | |
global threshold | |
global zero_count | |
global zeros_to_add | |
threshold = .05 | |
zero_count = 0 | |
zeros_to_add = 0 | |
class LifecycleCallback(Callback): | |
def on_epoch_begin(self, epoch, logs={}): | |
pass | |
def on_epoch_end(self, epoch, logs={}): | |
global threshold | |
threshold = threshold + (epoch * .05) | |
if threshold > 0.8: | |
threshold = 0.2 | |
def adjust_brightness(img): | |
image = cv2.cvtColor(img, cv2.COLOR_RGB2HSV) | |
random_bright = .25+np.random.uniform() | |
image[:,:,2] = image[:,:,2]*random_bright | |
image = cv2.cvtColor(image, cv2.COLOR_HSV2RGB) | |
return image | |
def shuffle(x, y): | |
perm = np.arange(len(x)) | |
np.random.shuffle(perm) | |
x = x[perm] | |
y = y[perm] | |
return (x, y) | |
def train_test_split(X, Y): | |
count = int(len(X)*.7) | |
X_train = X[:count] | |
Y_train = Y[:count] | |
X_val = X[count:] | |
Y_val = Y[count:] | |
return (X_train, Y_train, X_val, Y_val) | |
def load_training_and_validation(): | |
global zero_count | |
rows, labels = [], [] | |
with open('data/driving_log.csv', 'r') as _f: | |
reader = csv.reader(_f, delimiter=',') | |
next(reader, None) | |
for row in reader: | |
rows.append(row[0].strip()) | |
labels.append(float(row[3])) | |
# left camera | |
rows.append(row[1].strip()) | |
labels.append(float(row[3])+.25) | |
# right camera | |
rows.append(row[2].strip()) | |
labels.append(float(row[3])-0.25) | |
if float(row[3]) == 0: | |
zero_count += 1 | |
assert len(rows) == len(labels), 'unbalanced data' | |
# shuffle the data | |
X, Y = shuffle(np.array(rows), np.array(labels)) | |
# split into training and validation | |
return train_test_split(X, Y) | |
def resize_image(img): | |
return cv2.resize(img,( 64, 64)) | |
def affine_transform(img, angle, adjust, sub=False): | |
cols, rows, ch = img.shape | |
if sub: | |
angle -= adjust | |
pts1 = np.float32([[75, 75], [150, 75], [50, 250]]) | |
pts2 = np.float32([[60, 60], [140, 75], [50, 320]]) | |
else: | |
angle += adjust | |
pts1 = np.float32([[75, 75], [150, 75], [50,250]]) | |
pts2 = np.float32([[65, 75], [165, 75], [50,250]]) | |
M = cv2.getAffineTransform(pts1, pts2) | |
dst = cv2.warpAffine(img, M, (rows, cols)) | |
return dst.reshape((cols, rows, ch)), angle | |
def next_batch(data, labels, batch_size): | |
"""Return the next `batch_size` examples from this data set.""" | |
global _index_in_epoch | |
start = _index_in_epoch | |
_index_in_epoch += batch_size | |
_num_examples = len(data) | |
if _index_in_epoch > _num_examples: | |
# Shuffle the data | |
data, labels = shuffle(data, labels) | |
# Start next epoch | |
start = 0 | |
_index_in_epoch = batch_size | |
assert batch_size <= _num_examples | |
end = _index_in_epoch | |
return data[start:end], labels[start:end] | |
def transform_generator(x, y, batch_size=32, is_validation=False): | |
global zeros_to_add | |
global zero_count | |
while True: | |
bad = [] | |
images, labels = list(), list() | |
_images, _labels = next_batch(x, y, batch_size) | |
current = os.path.dirname(os.path.realpath(__file__)) | |
for i in range(len(_images)): | |
img = cv2.imread('{}/data/{}'.format(current, _images[i]), 1) | |
if img is None: continue | |
image = adjust_brightness(img) | |
resized = resize_image(image) | |
angle = _labels[i] | |
images.append(resized) | |
labels.append(angle) | |
# resized = img.reshape(img_height, img_width, 3) | |
if angle == 0: | |
zeros_to_add += 1 | |
if zeros_to_add < int(zero_count*.25): | |
resized = resize_image(img) | |
angle = _labels[i] | |
images.append(resized) | |
labels.append(angle) | |
continue | |
elif angle > 0: | |
sub = False | |
else: | |
sub = True | |
if abs(angle) > 0: | |
if is_validation: continue | |
adjust = .06 * 5 * threshold | |
# image, angle = affine_transform(img, angle, adjust, sub=sub) | |
# resized = resize_image(image) | |
# images.append(resized) | |
# labels.append(angle) | |
image = cv2.flip(img, 1) | |
resized = resize_image(image) | |
images.append(resized) | |
labels.append(-1*angle) | |
X = np.array(images, dtype=np.float64).reshape((-1, img_height, img_width, 3)) | |
Y = np.array(labels, dtype=np.float64) | |
yield (X, Y) | |
def gen_model(): | |
model = Sequential() | |
# (((64 - 5) + 4) / 1.) + 1 | |
model.add(Convolution2D(16, 5, 5, subsample=(1, 1), input_shape=(img_height, img_width, 3))) | |
model.add(ZeroPadding2D(padding=(2, 2))) | |
model.add(Activation('relu')) | |
model.add(MaxPooling2D(pool_size=(2, 2))) | |
# (((64 - 4) + 4) / 2.) + 1 | |
model.add(Convolution2D(32, 4, 4, subsample=(1, 1))) | |
model.add(ZeroPadding2D(padding=(2, 2))) | |
model.add(Activation('relu')) | |
model.add(MaxPooling2D(pool_size=(2, 2))) | |
# (((33 - 3) + 4) / 2.) + 1 | |
model.add(Convolution2D(64, 3, 3, subsample=(1, 1))) | |
model.add(ZeroPadding2D(padding=(2, 2))) | |
model.add(Activation('relu')) | |
model.add(MaxPooling2D(pool_size=(2, 2))) | |
# (((18 - 2) + 4) / 2.) + 1 | |
model.add(Convolution2D(64, 2, 2, subsample=(2, 2))) | |
model.add(ZeroPadding2D(padding=(0, 0))) | |
model.add(Activation('relu')) | |
model.add(MaxPooling2D(pool_size=(2, 2))) | |
model.add(Flatten()) | |
model.add(Dense(1024)) | |
model.add(Dropout(0.5)) | |
model.add(Activation('sigmoid')) | |
model.add(Dense(1)) | |
adam = Adam(lr=0.0001) | |
model.compile(loss='mean_squared_error', optimizer=adam) | |
return model | |
def main(): | |
X_train, Y_train, X_val, Y_val = load_training_and_validation() | |
assert len(X_train) == len(Y_train), 'unbalanced training data' | |
assert len(X_val) == len(Y_val), 'unbalanced validation data' | |
print(len(X_train), "training images and ", len(X_val), "validation images") | |
model = gen_model() | |
filepath = "weights-improvement-{epoch:02d}-{val_loss:.4f}.h5" | |
checkpoint = ModelCheckpoint(filepath, verbose=1, save_best_only=True) | |
model.fit_generator( | |
transform_generator(X_train, Y_train), | |
samples_per_epoch=(len(X_train)*2), | |
nb_epoch=nb_epoch, | |
validation_data=transform_generator(X_val, Y_val, is_validation=True), | |
nb_val_samples=len(X_val), | |
callbacks=[checkpoint]) | |
print("Saving model weights and configuration file.") | |
if not os.path.exists("./outputs/sim"): | |
os.makedirs("./outputs/sim") | |
model.save_weights("./outputs/sim/sim.h5", True) | |
with open('./outputs/sim/sim.json', 'w') as outfile: | |
json.dump(model.to_json(), outfile) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment