Last active
January 19, 2017 21:36
-
-
Save simonmorley/5901be947407ea427f0a to your computer and use it in GitHub Desktop.
Import and Alias Mulitple Indexes To Elasticsearch Rails
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A collection of Rake tasks to facilitate importing data from yout models into Elasticsearch. | |
# This will import the index as an alias, update the alias and delete the old ones | |
# It will also allow the importation of indexes via arguments | |
# Add this e.g. into the `lib/tasks/elasticsearch.rake` file in your Rails application: | |
# | |
# require 'elasticsearch/rails/tasks/import' | |
# | |
# To import the records from your `Article` model, run: | |
# | |
# $ bundle exec rake environment elasticsearch:import:model CLASS='MyModel, MyOtherModel' | |
# | |
STDOUT.sync = true | |
STDERR.sync = true | |
begin; require 'ansi/progressbar'; rescue LoadError; end | |
namespace :elasticsearch do | |
task :import => 'import:model' | |
namespace :import do | |
task :model do | |
if ENV['CLASS'].to_s == '' | |
puts '='*90, 'USAGE', '='*90, import_model_desc, "" | |
exit(1) | |
end | |
@date ||= Time.now.strftime '%Y%m%d%H%M%S' | |
klasses = ENV['CLASS'].split(",") | |
klasses.each do |klass| | |
klass = eval(klass.to_s) | |
@client = klass.__elasticsearch__.client | |
@alias_name = "#{klass.index_name}_#{@date}" | |
unless ENV['DEBUG'] | |
begin | |
klass.__elasticsearch__.client.transport.logger.level = Logger::WARN | |
rescue NoMethodError; end | |
begin | |
klass.__elasticsearch__.client.transport.tracer.level = Logger::WARN | |
rescue NoMethodError; end | |
end | |
def import_index(klass) | |
total = klass.count rescue nil | |
pbar = ANSI::Progressbar.new(klass.to_s, total) rescue nil | |
pbar.__send__ :show if pbar | |
total_errors = klass.import force: ENV.fetch('FORCE', true), | |
batch_size: ENV.fetch('BATCH', 1000).to_i, | |
index: @alias_name, | |
type: ENV.fetch('TYPE', nil) do |response| | |
pbar.inc response['items'].size if pbar | |
STDERR.flush | |
STDOUT.flush | |
end | |
pbar.finish if pbar | |
puts "[CREATED ALIASES] #{total_errors} errors occurred" unless total_errors.zero? | |
end | |
def delete_old_aliases(index_name) | |
aliases = @client.indices.get_aliases(index: index_name).keys | |
aliases.each do |alias_name| | |
begin | |
if Time.parse(alias_name.gsub(/locations_/, '')) < 1.weeks.ago | |
@client.indices.delete index: alias_name | |
puts "Deleted alias #{alias_name}" | |
end | |
rescue | |
## Needs rescue message ### | |
end | |
end | |
puts '[IMPORT] Done' | |
end | |
def create_alias(index_name) | |
@client.indices.delete index: index_name rescue false | |
@client.indices.update_aliases body: { | |
actions: [ | |
{ add: { index: @alias_name, alias: index_name } }, | |
] | |
} | |
puts "[UPDATED ALIAS]" | |
end | |
### Import to alias ### | |
import_index(klass) | |
### LINK THE ALIAS TO THE INDEX NAME ### | |
create_alias(klass.index_name) | |
### UPDATE OLD INDICES ### | |
delete_old_aliases(klass.index_name) | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment