-
-
Save owenthereal/f7288b02babb7e84add536f51ed70835 to your computer and use it in GitHub Desktop.
Pass a single io.Reader to multiple goroutines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Package fan is a little concurrent io experiment. | |
Example Use Case | |
---------------- | |
You have a function that takes a single io.Reader as an argument. You would like | |
to pass that reader to several processing functions. You could just make the | |
function accept an io.ReadSeeker, invoke each function serially in a for loop, | |
seeking after each call. But that's not cool. | |
So for example: | |
func process(r io.ReadSeeker) error { | |
for _, fn := range fns { | |
err := fn(r) | |
if err != nil { | |
return err | |
} | |
r.Seek(0, io.SeekStart) | |
} | |
} | |
Becomes: | |
import "./fan" | |
func process(r io.Reader) error { | |
fr := fan.Reader{Reader: r}
for _, fn := range fns { | |
go fn(fr.View()) | |
// ... handle errors via a channel | |
} | |
} | |
Limitations | |
----------- | |
Probably quite a lot. | |
*/ | |
package fan | |
import ( | |
"bytes" | |
"io" | |
"sync" | |
) | |
// Reader wraps an io.Reader and lets any number of independent "views" of the
// same byte stream be created with the View method. Every byte obtained from
// the wrapped reader through a view is kept in memory so that other views can
// replay it.
//
// A Reader contains a sync.Mutex and must not be copied after first use.
type Reader struct {
	io.Reader // the wrapped source

	s   []byte     // every byte read from the source through a view so far
	mux sync.Mutex // guards s and serialises the views' reads from the source
}

// View returns a new io.Reader that yields the wrapped reader's bytes from the
// very beginning, independently of every other view. Views of the same Reader
// may be read from different goroutines concurrently; a single view keeps an
// unsynchronised offset and must be used by one goroutine at a time.
//
// Bytes already buffered are served first, with a nil error and without
// touching the source. The source is consulted only once the view has
// delivered everything buffered, so an error reported by the source (io.EOF
// included) reaches a view only together with, or after, the last byte that
// was available to it. The shared lock is held while the source is being read,
// so a blocking source blocks every view.
//
// Errors are not remembered: each view that drains the buffer calls the
// source's Read again and returns whatever it reports at that point.
func (r *Reader) View() io.Reader {
	var off int // how many bytes of r.s this view has already delivered

	return readFunc(func(p []byte) (int, error) {
		// A zero-length read must not consume data or reach the source.
		if len(p) == 0 {
			return 0, nil
		}

		r.mux.Lock()
		defer r.mux.Unlock()

		// Replay what another view (or an earlier call) already fetched.
		if off < len(r.s) {
			n := copy(p, r.s[off:])
			off += n
			return n, nil
		}

		// This view is at the head of the stream: read fresh bytes directly
		// into the caller's slice and record them for the other views.
		n, err := r.Reader.Read(p)
		r.s = append(r.s, p[:n]...)
		off += n
		return n, err
	})
}

// Original returns a reader that yields the bytes buffered at the time of the
// call followed by whatever the wrapped reader still produces. No locking is
// performed, so it must not be called or read while views are in use by other
// goroutines. Bytes taken from the wrapped reader through the returned reader
// are not added to the buffer and are therefore never seen by any view.
func (r *Reader) Original() io.Reader {
	return io.MultiReader(bytes.NewReader(r.s), r.Reader)
}

// readFunc adapts an ordinary function to the io.Reader interface, in the
// manner of http.HandlerFunc, so that a closure over local state can serve as
// a reader.
type readFunc func(p []byte) (n int, err error)

// Read calls rf(p).
func (rf readFunc) Read(p []byte) (n int, err error) { return rf(p) }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fan | |
import ( | |
"bytes" | |
"sync" | |
"testing" | |
"github.com/drhodes/golorem" | |
) | |
// We will be reading some random lorem ipsum text. | |
var data = []byte(lorem.Paragraph(500, 600)) | |
func TestReader(t *testing.T) { | |
r := bytes.NewReader(data) | |
fr := Reader{r, // fan.Reader | |
// Because this is an internal test we need to completely intialise | |
// the struct. But note the API would look like `fan.Reader{r}` | |
[]byte(nil), *new(sync.Mutex), | |
} | |
wg := new(sync.WaitGroup) | |
// Start 10x goroutines | |
for i := 0; i < 10; i++ { | |
wg.Add(1) | |
go func(i int) { | |
defer wg.Done() | |
buf := new(bytes.Buffer) | |
n, err := buf.ReadFrom(fr.View()) | |
// Check the error | |
if err != nil { | |
t.Errorf("Goroutine %d err: %v", i, err) | |
return | |
} | |
// Validate the result | |
if int(n) != len(data) { | |
t.Errorf("Goroutine %d err: expected %d bytes, got %d", i, len(data), n) | |
return | |
} | |
t.Logf("Goroutine %d ok: got %d bytes\n", i, len(buf.Bytes())) | |
}(i) | |
} | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment