Skip to content

Instantly share code, notes, and snippets.

@billdueber
Last active January 13, 2022 17:18

Revisions

  1. billdueber revised this gist Jan 13, 2022. 1 changed file with 12 additions and 12 deletions.
    24 changes: 12 additions & 12 deletions rsolr_streamer.rb
    Original file line number Diff line number Diff line change
    @@ -54,6 +54,18 @@ def initialize(rsolr:, handler: "select", filter: "*:*", sort: "id asc", batch_s
    @fields = fields
    yield self if block_given?
    end

    # Iterate through the documents in the stream. Behind the scenes, these will be fetched in batches
    # of `batch_size` for efficiency.
    # @yieldreturn [Hash] A single solr document in the stream
    def each
    return enum_for(:each) unless block_given?
    verify_we_have_everything!
    while solr_has_more?
    cursor_response = get_page
    cursor_response.docs.each { |d| yield d }
    end
    end

    # @private
    # @return [Hash] Default solr params derived from instance variables
    @@ -88,18 +100,6 @@ def get_page
    def solr_has_more?
    @last_cursor != @current_cursor
    end

    # Iterate through the documents in the stream. Behind the scenes, these will be fetched in batches
    # of `batch_size` for efficiency.
    # @yieldreturn [Hash] A single solr document in the stream
    def each
    return enum_for(:each) unless block_given?
    verify_we_have_everything!
    while solr_has_more?
    cursor_response = get_page
    cursor_response.docs.each { |d| yield d }
    end
    end
    end

    # Utility wrapper around rsolr response
  2. billdueber created this gist Jan 13, 2022.
    125 changes: 125 additions & 0 deletions rsolr_streamer.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,125 @@
    # frozen_string_literal: true

    # # Simple example -- get ids and titles of all items without an author
    #
    # rsolr = RSolr.connect(url: 'http://localhost:8025/solr/catalog')
    # stream = rsolr.streamer(handler: 'select') do |s|
    # s.filter = 'NOT author:[* TO *]'
    # s.sort = 'id asc'
    # s.fields = ['id', 'title']
    # s.batch_size = 2_000
    # end

    # File.open("no_author.tsv", "w:utf-8") do |outfile|
    # stream.each do |doc|
    # outfile.puts [doc['id'], doc['title']].join("\t")
    # end
    # end


    require "rsolr"
    require "delegate"

    module RSolr

    class Client
    def streamer(**kwargs)
    Streamer.new(rsolr: self, **kwargs)
    end
    end

    # Cursor-based streamer to iterate through non-ranked documents, possibly filtered, from a solr core without the usual
    # problems associated with deep paging. Note that this can't efficiently get you straight to a deep page, it
    # only has benefits when pulling a long set of documents from solr.
    class Streamer
    include Enumerable

    attr_accessor :handler, :filter, :sort, :batch_size, :rsolr, :fields

    # @param [RSolr] rsolr an underlying rsolr object pointing to a core, created however you want
    # @param [String] handler The handler to target
    # @param [String] filter The filter to apply before streaming.
    # @param [Sort] sort A solr sort; must on a unique index
    # @param [Integer] batch_size How many documents to pull from solr at onces when streaming
    # @param [Array<String>] fields Which fields to return with each document. Default is "all"
    # @yieldreturn [RSolr::Streamer] the new object
    def initialize(rsolr:, handler: "select", filter: "*:*", sort: "id asc", batch_size: 1000, fields: [])
    @rsolr = rsolr
    @handler = handler
    @filter = filter
    @batch_size = batch_size
    @sort = sort
    @last_cursor = nil
    @current_cursor = "*"
    @fields = fields
    yield self if block_given?
    end

    # @private
    # @return [Hash] Default solr params derived from instance variables
    def default_params
    fl = Array(fields).join(",")
    p = {q: "*:*", wt: :ruby, rows: @batch_size, sort: @sort, fq: @filter, fl: fl}
    p.reject { |k, v| [nil, "", []].include?(v) }
    end

    # @private
    # Make sure we have everything we need for a successful stream
    def verify_we_have_everything!
    missing = {rsolr: rsolr, handler: handler, filter: filter, batch_size: batch_size}.select { |_k, v| v.nil? }.keys
    raise "RSolr::Streamer missing value for #{missing.join(", ")}" unless missing.empty?
    end

    # @private
    # Get a single "page" (`batch_size` documents) from solr. Feeds into #each
    # @return [CursorResponse]
    def get_page
    params = {cursorMark: @current_cursor, fl: Array(fields).join(",")}.merge default_params

    resp = CursorResponse.new(@rsolr.get(@handler, params: params))
    @last_cursor = @current_cursor
    @current_cursor = resp.cursor
    resp
    end

    # @private
    # Determine if solr has another page of results
    # @return [Boolean]
    def solr_has_more?
    @last_cursor != @current_cursor
    end

    # Iterate through the documents in the stream. Behind the scenes, these will be fetched in batches
    # of `batch_size` for efficiency.
    # @yieldreturn [Hash] A single solr document in the stream
    def each
    return enum_for(:each) unless block_given?
    verify_we_have_everything!
    while solr_has_more?
    cursor_response = get_page
    cursor_response.docs.each { |d| yield d }
    end
    end
    end

    # Utility wrapper around rsolr response
    class CursorResponse < SimpleDelegator
    def initialize(rsolr_response)
    super
    @resp = rsolr_response
    __setobj__(@resp)
    end

    def docs
    @resp["response"]["docs"]
    end

    def num_found
    @resp["response"]["numFound"]
    end

    def cursor
    @resp["nextCursorMark"]
    end
    end
    end