Skip to content

Instantly share code, notes, and snippets.

@sp71
Created January 30, 2025 02:33
Show Gist options
  • Save sp71/c42eb6f72078eebd30c312ebebaeb43f to your computer and use it in GitHub Desktop.
FileChunker efficiently writes large datasets to multiple files in chunks, supporting plain text and gzip formats.
/*
FileChunker efficiently writes large datasets to multiple files in chunks, supporting plain text and gzip formats.
The FileChunker validates each line of input data, writes it to the current file, and manages file rotation based on
specified size limits. It operates asynchronously, processing input data from a buffered channel and ensuring that
data is flushed to disk in a non-blocking manner. This is particularly useful for handling large datasets that need
to be split into manageable file sizes for storage or further processing.
*/
package ciem
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"os"
"log/slog"
)
// FileWriter abstracts the destination format a FileChunker writes encoded
// lines to. Implementations wrap an underlying io.Writer (see NewWriter)
// and report the file extension matching their output format.
type FileWriter interface {
	io.WriteCloser
	// Flush forces any buffered data down to the underlying writer.
	Flush() error
	// Extension returns the filename suffix for this format (e.g. ".json").
	Extension() string
	// NewWriter re-points the implementation at a new underlying writer w.
	NewWriter(w io.Writer)
}
// GzipWriter is a FileWriter that gzip-compresses everything written to it.
type GzipWriter struct {
	*gzip.Writer
}

// Extension returns the suffix used for gzip-compressed JSON output files.
func (w *GzipWriter) Extension() string {
	return ".json.gz"
}

// NewWriter points the embedded gzip.Writer at wr, typically a freshly
// created output file.
func (w *GzipWriter) NewWriter(wr io.Writer) {
	w.Writer = gzip.NewWriter(wr)
}
// BufferedWriter is a FileWriter that writes plain (uncompressed) output
// through a bufio.Writer.
type BufferedWriter struct {
	*bufio.Writer
}

// Extension returns the suffix used for plain JSON output files.
func (b *BufferedWriter) Extension() string {
	return ".json"
}

// Close flushes any buffered data. It deliberately does not close the
// underlying file; FileChunker closes the file itself in closeFile.
func (b *BufferedWriter) Close() error {
	return b.Writer.Flush()
}

// NewWriter points the embedded bufio.Writer at wr, typically a freshly
// created output file.
func (b *BufferedWriter) NewWriter(wr io.Writer) {
	b.Writer = bufio.NewWriter(wr)
}
// FileChunker encodes values as JSON lines and writes them across one or
// more output files, rotating to a new file once the current one reaches
// fileSize bytes on disk. Input is consumed asynchronously from a buffered
// channel by the stream goroutine started in NewFileChunker.
type FileChunker struct {
	Files        []*os.File // completed output files; populated once Close returns
	LinesSkipped int        // lines rejected by the validator or dropped on write error
	LinesCount   int        // lines successfully written
	flushSize    int        // byte threshold between flush/size checks
	fileSize     int64      // on-disk size threshold that triggers file rotation
	files        []*os.File // internal accumulator, copied to Files when streaming ends
	bytesWritten int        // bytes written since the last flush (pre-compression count)
	writer       FileWriter // output format (gzip or plain buffered)
	currentFile  *os.File   // file currently being written; nil when none is open
	filename     string     // base name used when creating output files
	lineValidator LineValidator
	inputCh      chan any      // buffered channel feeding the stream goroutine
	doneCh       chan struct{} // closed when the stream goroutine finishes
	outputDir    string        // directory output files are created in
}

// LineValidator reports whether an encoded line should be written; lines
// failing validation are counted in LinesSkipped.
type LineValidator func(line []byte) bool
// NewFileChunker initializes a FileChunker with a buffered channel
// NewFileChunker constructs a FileChunker that writes files into outputDir
// and immediately starts its background streaming goroutine. Input is
// accepted through a buffered channel of capacity 1000; call Close to stop
// accepting input and wait for all queued work to finish.
func NewFileChunker(filename string, lineValidator LineValidator, writer FileWriter, outputDir string, flushSize int, maxFileSize int64) *FileChunker {
	chunker := &FileChunker{
		inputCh:       make(chan any, 1000),
		doneCh:        make(chan struct{}),
		filename:      filename,
		outputDir:     outputDir,
		writer:        writer,
		lineValidator: lineValidator,
		flushSize:     flushSize,
		fileSize:      maxFileSize,
	}
	go chunker.stream()
	return chunker
}
// Send queues input for asynchronous encoding and writing. It blocks when
// the internal channel buffer (capacity 1000) is full, and panics if called
// after Close, since Close closes the input channel.
func (c *FileChunker) Send(input any) {
	c.inputCh <- input
}
// Close signals that no more input will be sent and blocks until the stream
// goroutine has drained the channel, closed the current file, and published
// the finished file list in c.Files.
func (c *FileChunker) Close() {
	close(c.inputCh)
	<-c.doneCh // wait for the stream goroutine to finish processing all input
}
// stream is the background worker: it drains inputCh, JSON-encodes each
// value into a reusable buffer, and appends it as one line to the current
// output file. Encode/write failures are logged and the value is skipped.
// When the channel closes it finalizes the last file, publishes the
// completed files in c.Files, and signals doneCh.
func (c *FileChunker) stream() {
	buf := bytes.NewBuffer(nil)
	for input := range c.inputCh {
		buf.Reset()
		// Encoder.Encode appends a trailing newline, yielding
		// newline-delimited JSON output.
		if err := json.NewEncoder(buf).Encode(input); err != nil {
			slog.Error("Error while converting the json", "input", input, "err", err)
			continue
		}
		if err := c.writeLine(buf.Bytes()); err != nil {
			// Log the line as a string: a raw []byte renders as an
			// unreadable list of numeric byte values.
			slog.Error("Error writing line", "line", buf.String(), "err", err)
			continue
		}
	}
	c.closeFile()
	c.Files = c.files
	close(c.doneCh)
}
// writeLine validates an encoded line and writes it to the current file,
// opening a new one if needed. Once flushSize bytes have accumulated it
// flushes the writer and rotates to a fresh file when the on-disk size
// reaches the configured limit. A non-nil error means the line was not
// written; such lines are counted in LinesSkipped.
func (c *FileChunker) writeLine(line []byte) error {
	if !c.lineValidator(line) {
		c.LinesSkipped++
		return nil
	}
	if c.currentFile == nil {
		c.newFile()
		// newFile logs and leaves currentFile nil on failure; bail out
		// rather than writing through a nil or stale writer.
		if c.currentFile == nil {
			c.LinesSkipped++
			return fmt.Errorf("no output file available for line")
		}
	}
	n, err := c.writer.Write(line)
	if err != nil {
		c.LinesSkipped++
		return err
	}
	// For GzipWriter, n is not the true size of the compressed line but
	// just len(line); for BufferedWriter, n is the true size of the line.
	c.bytesWritten += n
	c.LinesCount++
	if c.bytesWritten < c.flushSize {
		return nil
	}
	// Flush threshold reached: push data to disk so Stat reports the true
	// (possibly compressed) file size, then rotate if over the limit.
	// The Flush error was previously ignored; a failed flush would make
	// the size check below meaningless.
	c.bytesWritten = 0
	if err := c.writer.Flush(); err != nil {
		return err
	}
	info, err := c.currentFile.Stat()
	if err != nil {
		return err
	}
	if info.Size() >= c.fileSize {
		c.closeFile()
	}
	return nil
}
// closeFile finalizes the current output file: it closes the format writer
// (flushing any buffered/compressed data) and then the file itself. Safe to
// call when no file is open. Errors are logged rather than returned since
// callers have no recovery path, but they must not be silently discarded —
// a failed gzip Close can mean truncated, unreadable output.
func (c *FileChunker) closeFile() {
	if c.currentFile == nil {
		return
	}
	if err := c.writer.Close(); err != nil {
		slog.Error("Error closing writer", "file", c.currentFile.Name(), "err", err)
	}
	if err := c.currentFile.Close(); err != nil {
		slog.Error("Error closing file", "file", c.currentFile.Name(), "err", err)
	}
	c.currentFile = nil
}
// newFile creates the next output file in outputDir, named
// <filename>_<random>-part<N><ext> (the "*" in the pattern is replaced by
// os.CreateTemp with a random string), and points the format writer at it.
// On failure it logs the error and leaves currentFile nil; callers should
// check c.currentFile before writing.
func (c *FileChunker) newFile() {
	name := fmt.Sprintf("%s_*-part%d%s", c.filename, len(c.files)+1, c.writer.Extension())
	file, err := os.CreateTemp(c.outputDir, name)
	if err != nil {
		slog.Error("Cannot create new file", "err", err, "fileName", name)
		return
	}
	// Reset the per-file byte counter only once the file actually exists;
	// previously it was zeroed even when creation failed.
	c.bytesWritten = 0
	c.files = append(c.files, file)
	c.writer.NewWriter(file)
	c.currentFile = file
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment