# frozen_string_literal: true

require 'bundler/inline'
require 'json'

gemfile do
  source 'https://rubygems.org'

  gem 'async'
  gem 'async-http-faraday'
  gem 'baran'
  gem 'ruby-openai'
end

require 'async'
require 'async/barrier'
require 'async/semaphore'
require 'async/http/faraday'
require 'async/http/faraday/default'
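
# The async/http/faraday/default require above should set Faraday's default
# adapter to the async-http one, so the Faraday-based requests that ruby-openai
# makes run concurrently inside the Async reactor instead of blocking it.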

# Converts post JSON files into text chunks. The 'content' field is split;
# every other field rides along as per-chunk metadata.
class ChunkService
  attr_reader :splitter

  def initialize(splitter:)
    @splitter = splitter
  end

  def call(paths:)
    Dir[paths]
      .map { |path| JSON.parse(File.read(path)) }
      .flat_map { |post| splitter.chunks(post.delete('content'), metadata: post) }
  end
end
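
# The input shape this script assumes (inferred from the fields it reads;
# adjust to your own export format) is one JSON file per post, e.g.:
#
#   {
#     "id": "hello-world",
#     "title": "Hello, World",
#     "published_at": "2024-01-01",
#     "content": "# Heading\n\nMarkdown body..."
#   }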

# Fetches an embedding vector for each chunk via the OpenAI embeddings API.
class PostEmbeddingService
  attr_reader :openai, :model

  def initialize(openai:, model: 'text-embedding-3-small')
    @openai = openai
    @model = model
  end

  def call(post:)
    # Build a unique vector id from the post id plus the chunk's cursor offset.
    post[:id] = [post[:metadata].delete('id'), post.delete(:cursor)].compact.join('.')
    post[:values] = vector_of(text: post[:text])
    # Keep the chunk text in the metadata so queries can return it.
    post[:metadata][:text] = post.delete(:text)
    post
  rescue StandardError => e
    # Skip chunks that fail to embed; the caller drops nil records.
    puts "Unable to process post: #{post[:id]}: #{e.message}"
    nil
  end

  private

  def vector_of(text:)
    openai.embeddings(
      parameters: {
        model: model,
        input: text
      }
    ).dig('data', 0, 'embedding')
  end
end
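
# Each record this service returns serialises to one NDJSON line shaped like
#
#   {"id":"hello-world.1024","values":[0.0123, ...],"metadata":{"title":"...","text":"..."}}
#
# i.e. the id/values/metadata layout Cloudflare Vectorize accepts for bulk
# inserts (the exact id suffix depends on Baran's chunk cursor).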

# Converts posts to embeddings and writes them out as NDJSON for the
# Cloudflare Vectorize store.
class EmbeddingUseCase
  attr_reader :chunk_service, :embedding_service

  def initialize(chunk_service:, embedding_service:)
    @chunk_service = chunk_service
    @embedding_service = embedding_service
  end

  def execute(paths:, dest:, concurrency: 5)
    posts = chunk_service.call(paths: paths)

    # Block form guarantees the output file is closed even if a task raises.
    File.open(dest, 'w') do |output|
      async(concurrency: concurrency) do |semaphore|
        posts.each do |post|
          semaphore.async do
            append(post: post, to: output)
          end
        end
      end
    end
  end

  private

  # Runs the block inside an Async reactor. The semaphore caps in-flight tasks
  # at `concurrency`; the barrier waits for all of them to finish and stops any
  # stragglers if something goes wrong.
  def async(concurrency: 5)
    barrier = Async::Barrier.new
    semaphore = Async::Semaphore.new(concurrency, parent: barrier)

    Async do
      yield semaphore
      barrier.wait
    ensure
      barrier.stop
    end
  end

  def append(post:, to:)
    puts "Processing #{post[:metadata]['published_at']} - #{post[:metadata]['title']} @ #{post[:cursor]}"
    post = embedding_service.call(post: post)
    to.puts(JSON.dump(post)) if post
  end
end
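
# A note on throughput (an operational observation, not something the script
# enforces beyond the semaphore): CONCURRENCY bounds how many embedding
# requests are in flight at once, but provider-side rate limits still apply;
# chunks that fail are logged and skipped rather than retried.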

openai_access_token = ENV['OPENAI_ACCESS_TOKEN']
raise ArgumentError, 'OpenAI access token is required' unless openai_access_token

model = ENV.fetch('OPENAI_MODEL', 'text-embedding-3-small')
concurrency = ENV.fetch('CONCURRENCY', 5).to_i
source = ENV.fetch('SOURCE', 'posts/**/*.json')
destination = ENV.fetch('OUTPUT', 'output.ndjson')

client = OpenAI::Client.new(access_token: openai_access_token)
splitter = Baran::MarkdownSplitter.new

chunk_service = ChunkService.new(splitter: splitter)
embedding_service = PostEmbeddingService.new(openai: client, model: model)

use_case = EmbeddingUseCase.new(
  chunk_service: chunk_service,
  embedding_service: embedding_service
)

use_case.execute(paths: source,
                 dest: destination,
                 concurrency: concurrency)
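
# Example invocation (the script name and index name are illustrative, and the
# wrangler upload is the assumed follow-up step for Vectorize, not something
# this script performs):
#
#   OPENAI_ACCESS_TOKEN=sk-... SOURCE='posts/**/*.json' ruby embeddings.rb
#   npx wrangler vectorize insert my-index --file=output.ndjson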