autoencoder.py
import math

from chainer import cuda
from chainer import function
from chainer.functions import Sigmoid
from chainer.utils import type_check
import numpy


def _as_mat(x):
    if x.ndim == 2:
        return x
    return x.reshape(len(x), -1)


class Autoencoder(function.Function):

    """Autoencoder with tied weights.

    The encoder computes ``activation(x.dot(W.T) + b1)`` and the decoder
    reconstructs the input as ``h.dot(W) + b2``, reusing the same ``W``.
    """

    def __init__(self, in_size, hidden_size, activation=Sigmoid,
                 wscale=1, bias=0,
                 initialW=None, initial_bias1=None, initial_bias2=None):
        self.W = None
        self.gW = None
        self.b1 = None
        self.b2 = None
        self.gb1 = None
        self.gb2 = None
        self.activation = None

        if initialW is not None:
            assert initialW.shape == (hidden_size, in_size)
            self.W = initialW
        else:
            self.W = numpy.random.normal(
                0, wscale * math.sqrt(1. / in_size),
                (hidden_size, in_size)).astype(numpy.float32)
        xp = cuda.get_array_module(self.W)
        self.gW = xp.full_like(self.W, numpy.nan)

        if initial_bias1 is not None:
            assert initial_bias1.shape == (hidden_size,)
            self.b1 = initial_bias1
        else:
            self.b1 = numpy.repeat(numpy.float32(bias), hidden_size)

        if initial_bias2 is not None:
            assert initial_bias2.shape == (in_size,)
            self.b2 = initial_bias2
        else:
            self.b2 = numpy.repeat(numpy.float32(bias), in_size)

        self.gb1 = xp.empty_like(self.b1)
        self.gb2 = xp.empty_like(self.b2)

        if activation is not None:
            if activation == Sigmoid:
                self.activation = activation()
            else:
                self.activation = activation

    def hidden(self, x):
        # Encoded (hidden) representation of ``x``; the result is detached
        # so that calling hidden() does not accumulate gradients.
        h = _Encoder(self.W, self.b1)(x)
        if self.activation is not None:
            h = self.activation(h)
        h.unchain_backward()
        return h

    @property
    def parameter_names(self):
        return 'W', 'b1', 'b2'

    @property
    def gradient_names(self):
        return 'gW', 'gb1', 'gb2'

    def check_type_forward(self, in_types):
        type_check.expect(in_types.size() == 1)
        x_type, = in_types

        type_check.expect(
            x_type.dtype == numpy.float32,
            x_type.ndim >= 2,
            (type_check.Variable(numpy.prod, 'prod')(x_type.shape[1:]) ==
             type_check.Variable(self.W.shape[1], 'W.shape[1]')),
        )

    def check_type_backward(self, in_types, out_types):
        type_check.expect(
            in_types.size() == 1,
            out_types.size() == 1,
        )
        x_type, = in_types
        y_type, = out_types

        type_check.expect(
            y_type.dtype == numpy.float32,
            y_type.ndim == 2,
            y_type.shape[0] == x_type.shape[0],
            y_type.shape[1] == type_check.Variable(self.W.shape[1],
                                                   'W.shape[1]'),
        )

    def zero_grads(self):
        self.gW.fill(0)
        self.gb1.fill(0)
        self.gb2.fill(0)

    def forward(self, x):
        _x = _as_mat(x[0])
        # encode: h = activation(x.dot(W.T) + b1)
        Wx = _x.dot(self.W.T)
        Wx += self.b1
        self.x_activation = Wx
        if self.activation is not None:
            h, = self.activation.forward([Wx])
        else:
            h = Wx
        self.x_decode = h
        # decode with the tied weight matrix: y = h.dot(W) + b2
        y = h.dot(self.W)
        y += self.b2
        return y,

    def backward(self, x, gy):
        # decoder gradients (W and b2)
        _x = self.x_decode
        _gy = gy[0]
        self.gW += _x.T.dot(_gy)
        self.gb2 += _gy.sum(0)
        # backpropagate through the decoder and the activation
        _gy = _gy.dot(self.W.T).reshape(_x.shape)
        if self.activation is not None:
            _gy, = self.activation.backward([self.x_activation], [_gy])
        # encoder gradients (W again, because of the tied weights, and b1)
        _x = _as_mat(x[0])
        self.gW += _gy.T.dot(_x)
        self.gb1 += _gy.sum(0)
        return _gy.dot(self.W).reshape(x[0].shape),


# undifferentiable Linear function
class _Encoder(function.Function):

    def __init__(self, initialW, initial_bias):
        self.W = initialW
        self.b = initial_bias

    def check_type_forward(self, in_types):
        type_check.expect(in_types.size() == 1)
        x_type, = in_types

        type_check.expect(
            x_type.dtype == numpy.float32,
            x_type.ndim >= 2,
            (type_check.Variable(numpy.prod, 'prod')(x_type.shape[1:]) ==
             type_check.Variable(self.W.shape[1], 'W.shape[1]')),
        )

    def forward(self, x):
        x = _as_mat(x[0])
        Wx = x.dot(self.W.T)
        Wx += self.b
        return Wx,

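For reference, the computation that `Autoencoder.forward` performs can be written out in a few lines of plain NumPy. This is only an illustrative sketch with made-up sizes and random data, not part of the gist; it shows the tied-weight structure (the same `W` is used transposed for encoding and untransposed for decoding) and the same mean-squared reconstruction objective that the test script below optimizes.

import numpy as np

rng = np.random.RandomState(0)
in_size, hidden_size, batch = 3, 5, 2          # hypothetical sizes
W = rng.normal(0, np.sqrt(1.0 / in_size), (hidden_size, in_size)).astype(np.float32)
b1 = np.zeros(hidden_size, dtype=np.float32)
b2 = np.zeros(in_size, dtype=np.float32)
x = rng.rand(batch, in_size).astype(np.float32)

h = 1.0 / (1.0 + np.exp(-(x.dot(W.T) + b1)))   # encode: sigmoid(x W^T + b1)
y = h.dot(W) + b2                              # decode with the tied W
loss = ((y - x) ** 2).mean()                   # mean squared reconstruction error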
The second file of the gist, a test script comparing the Chainer Autoencoder on CPU and GPU with a Theano reference implementation, follows.
import numpy as np
from chainer import cuda, Variable, FunctionSet, optimizers
import chainer.functions as F
from autoencoder import Autoencoder
import math
import theano
import theano.tensor as T

learning_rate = 0.01
n_in = 3
n_hidden = 5

# shared initial weight matrix so the Chainer and Theano models start from the same point
initW = np.random.normal(0, math.sqrt(1. / n_in), (n_hidden, n_in)).astype(np.float32)  # (theano.config.floatX)

# setup theano
x = T.matrix()
w = theano.shared(initW)
b1 = theano.shared(np.zeros(n_hidden, dtype=theano.config.floatX))
b2 = theano.shared(np.zeros(n_in, dtype=theano.config.floatX))

h = x.dot(w.T) + b1
h = T.nnet.sigmoid(h)
y = h.dot(w) + b2
loss = ((y - x) ** 2).mean()

# plain SGD updates, matching Chainer's optimizers.SGD
updates = [(p, p - learning_rate * T.grad(loss, p)) for p in [w, b1, b2]]
train_theano = theano.function([x], [y, loss], updates=updates)
hidden_theano = theano.function([x], h)

# setup chainer
cuda.check_cuda_available()
cuda.get_device(0).use()

model_cpu = FunctionSet(ae=Autoencoder(n_in, n_hidden, initialW=initW))
model_gpu = FunctionSet(ae=Autoencoder(n_in, n_hidden, initialW=initW))
model_gpu.to_gpu()

data_cpu = np.array([[1, 2, 3], [4, 5, 6]]).astype(np.float32) / 10.0
data_gpu = cuda.to_gpu(data_cpu)
x_cpu = Variable(data_cpu)
x_gpu = Variable(data_gpu)

opt_cpu = optimizers.SGD(learning_rate)
opt_gpu = optimizers.SGD(learning_rate)
opt_cpu.setup(model_cpu)
opt_gpu.setup(model_gpu)

for epoch in range(1, 1000 + 1):
    # forward passes: Chainer on CPU, Chainer on GPU, and the Theano reference
    y_cpu = model_cpu.ae(x_cpu)
    y_gpu = model_gpu.ae(x_gpu)
    y_theano, loss_theano = train_theano(data_cpu)

    loss_cpu = F.mean_squared_error(y_cpu, x_cpu)
    loss_gpu = F.mean_squared_error(y_gpu, x_gpu)

    opt_cpu.zero_grads()
    loss_cpu.backward()
    opt_cpu.update()

    opt_gpu.zero_grads()
    loss_gpu.backward()
    opt_gpu.update()

    print 'epoch ', epoch
    print 'y_cpu', y_cpu.data
    print 'loss_cpu', loss_cpu.data
    print 'hidden_cpu', model_cpu.ae.hidden(x_cpu).data
    print 'y_gpu', cuda.to_cpu(y_gpu.data)
    print 'loss_gpu', cuda.to_cpu(loss_gpu.data)
    print 'hidden_gpu', cuda.to_cpu(model_gpu.ae.hidden(x_gpu).data)
    print 'y_theano', y_theano
    print 'loss_theano', loss_theano
    print 'hidden_theano', hidden_theano(data_cpu)
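Since both Chainer models and the Theano graph start from the same initW, zero biases, the same mean-squared-error objective, and plain SGD with the same learning rate, the printed y, loss, and hidden values should stay in close agreement (up to float32 rounding) across the CPU, GPU, and Theano runs. Note that the script requires a CUDA-capable device because of the cuda.check_cuda_available() and to_gpu() calls.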