Skip to content

Instantly share code, notes, and snippets.

@groboclown
Last active October 17, 2023 16:10
Show Gist options
  • Save groboclown/77784f8f807df6e1824ff3ed3a34648a to your computer and use it in GitHub Desktop.
Save groboclown/77784f8f807df6e1824ff3ed3a34648a to your computer and use it in GitHub Desktop.
Golang Concurrent Reduce when Order Matters
// Released under the Public Domain
// Where applicable, consider this CC-0
// Inspired by the Rope science documents from the Xi text editor:
// https://xi-editor.io/docs/rope_science_01.html
package main
import (
"context"
"crypto/sha256"
"fmt"
"strings"
"sync"
"testing"
"golang.org/x/exp/slices"
)
// ====================================================================
// Monoid - a mathematical structure that is a Set (S) and an
// operator (*) with these properties:
//
// - Operator is a mapping from S x S to S (not necessarily 1-to-1 or onto):
// it takes an ordered pair a, b in S and yields some c in S.
// *(a, b) -> c
// - Associativity - *(a, *(b, c)) = *(*(a, b), c); for all a, b, c in S.
// - Identity - there exists e in S such that *(a, e) = *(e, a) = a for all a in S.
//
// Because the operator is an ordered pair and not guaranteed to be commutative
// ( *(a, b) is not necessarily the same as *(b, a) ), the operator must be applied
// against neighboring pairs.
//
// This implementation adds an extra layer on top of the values by introducing a
// position in the operator chain. Because the operator unifies positions together, the
// position is represented as a continuous span across positions.
// MonoidSpan pairs a combined operator value with the positions it covers in the chain.
//
// The covered positions form the inclusive range [Left, Right]; a span for a single
// position has Left == Right.
//
// The reduction function reuses (mutates) these structures. Ownership is assumed to pass
// to the receiver whenever a span is sent through a channel or passed as an argument.
type MonoidSpan[T any] struct {
	// Left and Right are the first and last chain positions covered by this span.
	Left, Right int
	// Value is the operator result for every position covered by this span.
	Value T
}
// MonoidOperator is the ordered binary operation of the monoid over T: a is the
// left operand and b is the right operand.
//
// Implementations must be safe for concurrent use.
type MonoidOperator[T any] func(a, b T) T
// MonoidReduce joins all adjacent monoid spans until the generator channel is closed or the
// context completes.
//
// All generated positions must be unique, but they do not need to completely cover the range.
// Each gap in the generated ranges produces a separate span on the returned value channel.
// Only adjacent (off by one) positions are joined into a single span.
//
// For performance reasons, the value channel may deliver the same span twice (once for its
// left position and once for its right position). It's recommended to pass the returned
// span channel through the UniqueSpans or SortedSpans functions (one or the other, not both).
//
// The error channel carries at most one error (a recovered panic, for example from op) and
// is closed before any span is delivered on the value channel. When the context completes,
// reduction stops and the spans collected so far are still delivered. The value channel is
// unbuffered, so the background goroutine runs until the caller has drained it.
//
// Reading the error channel to completion before reading the value channel remains the
// simplest usage pattern. The error channel is buffered, so a recovered panic does not
// block the reducer if the caller reads the value channel first.
func MonoidReduce[T any](
	ctx context.Context,
	generator <-chan *MonoidSpan[T],
	op MonoidOperator[T],
) (<-chan *MonoidSpan[T], <-chan error) {
	doneCh := make(chan *MonoidSpan[T])
	// Capacity 1: the single possible error can always be queued without a reader, so
	// the cleanup below can never get stuck before closing both channels.
	errCh := make(chan error, 1)

	go func() {
		// spans indexes every live span under both its Left and its Right position, so a
		// new span can find its neighbors with two map lookups.
		spans := make(map[int]*MonoidSpan[T])

		defer func() {
			defer close(doneCh)
			if e := recover(); e != nil {
				err, ok := e.(error)
				if !ok {
					err = fmt.Errorf("unhandled error: %v", e)
				}
				errCh <- err
			}
			// Not deferring close(errCh): it must be closed before the (blocking)
			// delivery of spans below, otherwise callers that drain the error
			// channel first would deadlock.
			close(errCh)

			// The final pass just returns the map values, which may include
			// duplicates for the left/right positions; note however that
			// left/right might be the same, in which case the span appears once.
			for _, s := range spans {
				doneCh <- s
			}
		}()

		for {
			select {
			case <-ctx.Done():
				// The deferred function flushes the spans and closes the channels.
				return
			case ms, ok := <-generator:
				if !ok {
					// The deferred function flushes the spans and closes the channels.
					return
				}

				// Join with the span that ends immediately before this one.
				if pre, found := spans[ms.Left-1]; found {
					// spans[pre.Left] will be overwritten below.
					delete(spans, pre.Right)
					ms.Left = pre.Left
					ms.Value = op(pre.Value, ms.Value)
					// If we were caching MonoidSpan objects, this is where
					// 'pre' would be released into the cache.
				}

				// Join with the span that starts immediately after this one.
				if suf, found := spans[ms.Right+1]; found {
					// spans[suf.Right] will be overwritten below.
					delete(spans, suf.Left)
					ms.Right = suf.Right
					ms.Value = op(ms.Value, suf.Value)
					// If we were caching MonoidSpan objects, this is where
					// 'suf' would be released into the cache.
				}

				spans[ms.Left] = ms
				spans[ms.Right] = ms
			}
		}
	}()

	return doneCh, errCh
}
// UniqueSpans forwards the spans read from source, dropping the repeated delivery of a span.
//
// It is intended to sit behind MonoidReduce, which may deliver a span twice. Spans are
// identified by their Left position only; Right is not consulted.
func UniqueSpans[T any](
	source <-chan *MonoidSpan[T],
) <-chan *MonoidSpan[T] {
	out := make(chan *MonoidSpan[T])
	go func() {
		defer close(out)
		// pending holds the Left positions that have been forwarded once and whose
		// duplicate has not shown up yet.
		pending := make(map[int]struct{})
		for s := range source {
			key := s.Left
			if _, dup := pending[key]; !dup {
				pending[key] = struct{}{}
				out <- s
				continue
			}
			// Second sighting. Because of how duplicates are generated this should
			// happen only once per span, so the entry can be dropped to keep the
			// map from growing without bound.
			delete(pending, key)
		}
	}()
	return out
}
// SortedSpans reads all input spans and returns a channel that delivers them ordered by
// their Left position.
//
// Duplicate spans (same Left position) are implicitly eliminated; the first one seen wins.
//
// Because of the sorting, no span is delivered until source has been closed. However, the
// ordering itself is maintained incrementally as each span is collected.
func SortedSpans[T any](
	source <-chan *MonoidSpan[T],
) <-chan *MonoidSpan[T] {
	ret := make(chan *MonoidSpan[T])
	go func() {
		defer close(ret)

		// byLeft stores the spans by Left position and doubles as the duplicate check.
		byLeft := make(map[int]*MonoidSpan[T])

		// sorted holds the Left positions in ascending order at all times. Keeping it
		// ordered while spans arrive overlaps the sort with value generation and span
		// joining, rather than delaying it until everything has completed.
		var sorted []int

		for span := range source {
			idx := span.Left
			if _, dup := byLeft[idx]; dup {
				continue
			}
			byLeft[idx] = span

			// The search must run on the slice BEFORE it is grown: searching after
			// appending a placeholder would examine an unsorted slice and could
			// return an index past the insertion range.
			i, _ := slices.BinarySearch(sorted, idx)
			sorted = slices.Insert(sorted, i, idx)
		}

		for _, idx := range sorted {
			ret <- byLeft[idx]
		}
	}()
	return ret
}
// SyncMonoidReduce runs the MonoidReduce function and returns just one joined value when complete.
//
// All disjoint spans are joined into a single value in the order of their positions. The
// 'initial' value is the left operand of the first join, and is also the return value when
// no spans were created.
//
// Any error reported by MonoidReduce indicates a panic during the reduction, so it is
// turned back into a panic in the caller's goroutine.
func SyncMonoidReduce[T any](
	generator <-chan *MonoidSpan[T],
	op MonoidOperator[T],
	initial T,
) T {
	resCh, errCh := MonoidReduce[T](context.Background(), generator, op)

	// Capacity 1: the folding goroutine can always hand over its result and exit, even
	// if this function panics below and nobody ever receives from finalCh.
	finalCh := make(chan T, 1)
	go func() {
		defer close(finalCh)
		acc := initial
		for span := range SortedSpans[T](resCh) {
			acc = op(acc, span.Value)
		}
		finalCh <- acc
	}()

	// The error channel is drained first; it is closed before spans are delivered.
	for err := range errCh {
		if err != nil {
			panic(err)
		}
	}
	return <-finalCh
}
// ====================================================================
// Unit Tests
// reduceList concatenates two lists into a newly allocated list: the elements of a
// followed by the elements of b. Neither input is modified or aliased.
func reduceList[T any](a, b []T) []T {
	joined := make([]T, 0, len(a)+len(b))
	joined = append(joined, a...)
	return append(joined, b...)
}
// TestEmpty checks that MonoidReduce reports no errors and no spans when the
// generator is closed without producing anything.
func TestEmpty(t *testing.T) {
	gen := make(chan *MonoidSpan[[]int])
	close(gen)
	spanCh, errCh := MonoidReduce[[]int](context.Background(), gen, reduceList[int])

	// The error channel is drained before the span channel.
	for err := range errCh {
		t.Error(err)
	}

	var got []*MonoidSpan[[]int]
	for span := range spanCh {
		got = append(got, span)
	}
	if len(got) > 0 {
		t.Errorf("Expected 0 spans, found %v", got)
	}
}
// listEqual reports whether a and b have the same length and hold the same
// integers at every position.
func listEqual(a []int, b []int) bool {
	same := len(a) == len(b)
	for i := 0; same && i < len(a); i++ {
		same = a[i] == b[i]
	}
	return same
}
// TestOne ensures that MonoidReduce works when just one value is generated.
func TestOne(t *testing.T) {
	gen := make(chan *MonoidSpan[[]int])
	spanCh, errCh := MonoidReduce[[]int](context.Background(), gen, reduceList[int])
	gen <- &MonoidSpan[[]int]{
		// Because left & right point to the same number, there shouldn't
		// be any duplicates.
		Left:  0,
		Right: 0,
		Value: []int{0},
	}
	close(gen)

	// The error channel is drained before the span channel.
	for err := range errCh {
		t.Error(err)
	}

	spans := make([]*MonoidSpan[[]int], 0)
	for span := range spanCh {
		spans = append(spans, span)
	}

	// Fatalf, not Errorf: the check below indexes spans[0] and would panic on an
	// empty result instead of reporting a test failure.
	if len(spans) != 1 {
		t.Fatalf("Expected 1 span, found %v", spans)
	}
	if !listEqual(spans[0].Value, []int{0}) {
		t.Errorf("Expected [0], found %v", spans[0].Value)
	}
}
// TestTen feeds SyncMonoidReduce ten adjacent spans in reverse position order and
// expects the joined value to come out in position order.
func TestTen(t *testing.T) {
	gen := make(chan *MonoidSpan[[]int])
	go func() {
		defer close(gen)
		// Emit the highest positions first. Each span's value equals its rank,
		// so a correct join effectively sorts the values.
		for remaining := 10; remaining > 0; remaining-- {
			i := remaining - 1
			left := 2 * i
			gen <- &MonoidSpan[[]int]{
				Left:  left,
				Right: left + 1,
				Value: []int{i},
			}
		}
	}()

	want := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	got := SyncMonoidReduce[[]int](gen, reduceList[int], []int{})
	if !listEqual(got, want) {
		t.Errorf("Expected [0-9], found %v", got)
	}
}
// ====================================================================
// Benchmark Test
// reduceStr is the string-concatenation monoid operator: it returns a followed by b.
//
// Concatenation is only one of many monoid operations over strings; others include
// total byte length or rune count.
func reduceStr(a, b string) string {
	joined := a + b
	return joined
}
// splitOnLf creates one span per line of p, trimming all leading and trailing
// whitespace from each line's value.
//
// The spans are delivered in source order on a buffered channel that is closed after
// the last span. A line's Left/Right are byte positions in p, with the terminating
// newline counted as part of the line's span. Text after the final newline (if any)
// becomes a last span; no span is produced for an empty remainder, so empty input
// yields no spans at all.
func splitOnLf(p string) <-chan *MonoidSpan[string] {
	r := make(chan *MonoidSpan[string], 100)
	go func() {
		defer close(r)
		start := 0
		for start < len(p) {
			off := strings.IndexByte(p[start:], '\n')
			if off < 0 {
				break
			}
			pos := start + off
			// This generates trimmed, separated lines, but note that the
			// Left & Right part of the span represents the source positions.
			// This is extremely important: the reducer joins spans by position.
			r <- &MonoidSpan[string]{
				Left:  start,
				Right: pos,
				Value: strings.TrimSpace(p[start:pos]),
			}
			// The EOL is considered part of this span, so the next span
			// starts right after it.
			start = pos + 1
		}
		// Remainder without a terminating newline. Skipped when empty, because a
		// span must cover at least one position (Left <= Right).
		if start < len(p) {
			r <- &MonoidSpan[string]{
				Left:  start,
				Right: len(p) - 1,
				Value: strings.TrimSpace(p[start:]),
			}
		}
	}()
	return r
}
// sha512 transforms spans by replacing each value with the lower-case hex encoding of
// a hash of its contents. Span positions are left untouched.
//
// NOTE: despite the function name, the digest is SHA-256, because crypto/sha256 is the
// hash package this file imports; the name is kept so existing callers keep working.
//
// Each span is hashed in its own goroutine, so spans may come out in a different order
// than they came in. The returned channel is closed once every span read from 'in' has
// been delivered.
func sha512(in <-chan *MonoidSpan[string]) <-chan *MonoidSpan[string] {
	out := make(chan *MonoidSpan[string], 100)
	go func() {
		defer close(out)
		var wg sync.WaitGroup
		for s := range in {
			s := s // per-iteration copy for the goroutine below (needed before Go 1.22)
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Sum256 hashes the data itself. (hash.Hash.Sum(b) would NOT: it
				// appends the digest of what was written so far to b.)
				digest := sha256.Sum256([]byte(s.Value))
				s.Value = fmt.Sprintf("%x", digest[:])
				out <- s
			}()
		}
		// Keep 'out' open until every hashing goroutine has delivered its span.
		wg.Wait()
	}()
	return out
}
// generateParagraphs creates 'count' newline-terminated paragraphs, each with 1-10 characters.
//
// Paragraph i (zero based) consists of the digits 0 through i%10. A count of zero or
// less yields the empty string.
func generateParagraphs(count int) string {
	// A Builder avoids re-copying the whole accumulated text on every append.
	var b strings.Builder
	for i := 0; i < count; i++ {
		for j := 0; j <= i%10; j++ {
			// j is always in 0..9, so it is a single decimal digit.
			b.WriteByte(byte('0' + j))
		}
		b.WriteByte('\n')
	}
	return b.String()
}
// BenchmarkShaSplit times hashing each generated paragraph and then joining the hex
// digests by string concatenation.
//
// More than anything, this tests the ability of the algorithm to take advantage of all
// available processors.
func BenchmarkShaSplit(b *testing.B) {
	source := generateParagraphs(10000)

	// Exclude input generation from the measurement.
	b.ResetTimer()
	// The measured work must run b.N times; the framework divides the elapsed
	// time by b.N to report the per-operation cost.
	for i := 0; i < b.N; i++ {
		SyncMonoidReduce[string](
			sha512(splitOnLf(source)),
			reduceStr,
			"",
		)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment