Skip to content

Instantly share code, notes, and snippets.

@y8
Last active August 29, 2015 14:23

Revisions

  1. y8 revised this gist Jun 22, 2015. 1 changed file with 24 additions and 21 deletions.
    45 changes: 24 additions & 21 deletions kalman_ips.rb
    Original file line number Diff line number Diff line change
    @@ -118,36 +118,37 @@ def to_s

    private
    def update!
    self.update_time = benchmark do
    ## Update stage
    time = Time.now
    ## Update stage

    # y = z - (H * x)
    self.error = self.current_measurement - (self.readings * self.state)
    # y = z - (H * x)
    self.error = self.current_measurement - (self.readings * self.state)

    # S = H * P * Ht + R
    self.system_uncertainty = self.readings * self.process_noise * self.readings.transpose + self.sensor_noise
    # S = H * P * Ht + R
    self.system_uncertainty = self.readings * self.process_noise * self.readings.transpose + self.sensor_noise

    # K = P * Ht + S^-1
    self.kalman_gain = self.process_noise * self.readings.transpose * system_uncertainty.inverse
    # K = P * Ht + S^-1
    self.kalman_gain = self.process_noise * self.readings.transpose * system_uncertainty.inverse

    # x = x + (K * y)
    self.state = self.state + (self.kalman_gain * error)
    # x = x + (K * y)
    self.state = self.state + (self.kalman_gain * error)

    # P = (I - (K * H)) * P
    self.process_noise = (self.identity - (kalman_gain * self.readings)) * self.process_noise
    end
    # P = (I - (K * H)) * P
    self.process_noise = (self.identity - (kalman_gain * self.readings)) * self.process_noise

    self.update_time = Time.now - time
    end

    def predict!
    self.predict_time = benchmark do
    ## Prediction stage
    time = Time.now
    ## Prediction stage

    # x = (F * x) + u
    self.state = (self.model * self.state) + self.control
    # x = (F * x) + u
    self.state = (self.model * self.state) + self.control

    # P = F * P * Ft
    self.process_noise = self.model * self.process_noise * self.model.transpose
    end
    # P = F * P * Ft
    self.process_noise = self.model * self.process_noise * self.model.transpose
    self.predict_time = Time.now - time
    end

    def benchmark
    @@ -163,10 +164,12 @@ def benchmark

    kalman = Kalman.new

    puts RUBY_DESCRIPTION

    Benchmark.ips do |x|
    # Configure the number of seconds used during
    # the warmup phase (default 2) and calculation phase (default 5)
    x.config(:time => 5, :warmup => 2)
    x.config(:time => 10, :warmup => 2)

    # To reduce overhead, the number of iterations is passed in
    # and the block must run the code the specific number of times.
  2. y8 created this gist Jun 22, 2015.
    181 changes: 181 additions & 0 deletions kalman_ips.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,181 @@
    require 'benchmark/ips'
    require "matrix"
    require "time"

    class Kalman
    attr_accessor :state
    attr_accessor :process_noise
    attr_reader :control
    attr_accessor :model
    attr_reader :readings
    attr_reader :sensor_noise
    attr_reader :identity

    attr_accessor :error
    attr_accessor :system_uncertainty
    attr_accessor :kalman_gain

    attr_accessor :current_measurement
    attr_accessor :current_timestep

    attr_accessor :update_time, :predict_time

    def initialize
    # x = Estimate value (intial state)
    @state = Matrix[
    [247.0], # Height,
    [0.0], # Velocity
    [0.0] # Acceleration
    ]

    # P = Initial uncertainty (process noise)
    @process_noise = Matrix[
    [100, 0.0, 0.0], # We're not certian about height
    [0.0, 100, 0.0], # We're not certain about velocity
    [0.0, 0.0, 100], # We're not certain about acceleration
    ]

    # u = External motion (control signal)
    # There no control signal in our case, so it's just zeroes
    @control = Matrix[
    [0.0], # Height control signal, none
    [0.0], # Velocity control signal, none
    [0.0], # Acceleration control signal, none
    ]

    # H = Measurement function (measured params)
    @readings = Matrix[[
    1.0, # We can measure height
    0.0, # But we can't measure velocity
    0.0 # Not acceleration
    ]].freeze

    # R = Measurement uncertainty (measurement noise)
    @sensor_noise = Matrix[
    [0.6] # We're sensing only height, and there some error associated with height
    ]

    # z = Current measurement (observed value)
    @current_measurement = Matrix[
    [0.0]
    ]

    # y = Error
    @error = Matrix[
    [0.0]
    ]

    # K = Kalman gain (current measurement uncertainty)
    @kalman_gain = Matrix[
    [0.0],
    [0.0],
    [0.0]
    ]

    # S = System Uncertainty
    @system_uncertainty = Matrix[
    [0.0]
    ]

    # I = Identity matrix
    @identity = Matrix.identity(3).freeze

    @update_time = 0.0
    @predict_time = 0.0
    end

    # F = Next state function (process model)
    # This is where everything is goes nuts:
    # You need to come up with a matrix, that will represent a model
    # of changes between states. So if you apply this matrix to previous
    # state vector, you will get the next state prediction vector.
    def model
    Matrix[
    [1.0, self.current_timestep, 0.0], # height = height.prev + timestep * velocity.prev
    [0.0, 1.0, self.current_timestep], # velocity = velocity.prev + time * acceleration
    [0.0, 0.0, 1.0] # acceleration = prev.acceleration
    ].freeze
    end

    def measure(value, timestep)
    self.current_timestep = timestep
    self.current_measurement = Matrix[
    [value]
    ]

    update!
    predict!
    end

    def runtime_ms
    (self.update_time + self.predict_time) * 1000.0
    end

    def to_s
    {state: self.state, noise: self.process_noise}
    end
    alias_method :inspect, :to_s

    private
    def update!
    self.update_time = benchmark do
    ## Update stage

    # y = z - (H * x)
    self.error = self.current_measurement - (self.readings * self.state)

    # S = H * P * Ht + R
    self.system_uncertainty = self.readings * self.process_noise * self.readings.transpose + self.sensor_noise

    # K = P * Ht + S^-1
    self.kalman_gain = self.process_noise * self.readings.transpose * system_uncertainty.inverse

    # x = x + (K * y)
    self.state = self.state + (self.kalman_gain * error)

    # P = (I - (K * H)) * P
    self.process_noise = (self.identity - (kalman_gain * self.readings)) * self.process_noise
    end
    end

    def predict!
    self.predict_time = benchmark do
    ## Prediction stage

    # x = (F * x) + u
    self.state = (self.model * self.state) + self.control

    # P = F * P * Ft
    self.process_noise = self.model * self.process_noise * self.model.transpose
    end
    end

    def benchmark
    time = Time.now.to_f
    yield
    return Time.now.to_f - time
    end
    end

    last_value = 0.0
    time_step = 0.1
    increment = 0.05

    kalman = Kalman.new

    Benchmark.ips do |x|
    # Configure the number of seconds used during
    # the warmup phase (default 2) and calculation phase (default 5)
    x.config(:time => 5, :warmup => 2)

    # To reduce overhead, the number of iterations is passed in
    # and the block must run the code the specific number of times.
    # Used for when the workload is very small and any overhead
    # introduces incorrectable errors.
    x.report("kalman") do |iterations|
    iterations.times do
    kalman.measure last_value, time_step
    last_value = last_value + increment
    end
    end
    end