Last active
March 31, 2023 15:07
-
-
Save bored-engineer/75a1148309e845ceaf0cb2724a0e0fe5 to your computer and use it in GitHub Desktop.
[Golang] PrefixReader (io.WriteCloser) wraps an io.Reader allowing data to be written ("prefixed") to the Reader which will be returned by subsequent Read calls until Close is called at which points all reads will pass-through to the underlying reader.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// THIS IS THE CORRECT SOLUTION, NOT WHAT WAS IN THIS GIST | |
package main | |
import ( | |
"strings" | |
"io" | |
"io/ioutil" | |
"fmt" | |
) | |
// This is better/more stdlib than what I wrote: | |
func main() { | |
r := strings.NewReader("Underlying Reader") | |
pr, pw := io.Pipe() | |
go func() { | |
pw.Write([]byte("Some Prefixed Data")) | |
pw.Write([]byte("More Prefixed Data")) | |
pw.Close() | |
}() | |
b, err := ioutil.ReadAll(io.MultiReader(pr, r)) | |
if err != nil { | |
panic(err) | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"strings" | |
"io/ioutil" | |
"fmt" | |
) | |
func main() { | |
r := strings.NewReader("Underlying Reader") | |
c := make(chan []byte) | |
pr := NewPrefixReader(r, c) | |
go func() { | |
c <- []byte("Some Prefixed Data\n") | |
c <- []byte("More Prefixed Data\n") | |
close(c) | |
}() | |
b, err := ioutil.ReadAll(pr) | |
if err != nil { | |
panic(err) | |
} | |
// Some Prefixed Data\nMore Prefixed Data\nUnderlying Reader | |
fmt.Printf("%s", b) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"io" | |
"sync" | |
"errors" | |
) | |
// PrefixReader implements io.ReadWriteCloser | |
type PrefixReader struct { | |
// mu syncronizes Read calls | |
mu sync.Mutex | |
// r is the underlying io.Reader to use after chan is closed | |
r io.Reader | |
// buf is the buffered data if a Read is shorter than buf | |
buf []byte | |
// c is the channel of incoming bytes to "read" | |
c chan []byte | |
} | |
// PrefixReader implements io.Closer and will always succeed | |
func (r *PrefixReader) Close() error { | |
// TODO: This panics if called more than once instead of returning error | |
close(r.c) | |
return nil | |
} | |
// PrefixReader implements io.Writer and will always succeed | |
func (r *PrefixReader) Write(data []byte) (int, error) { | |
// Per the io.Writer docs we cannot retain data so copy is needed | |
tmp := make([]byte, len(data)) | |
n := copy(tmp, data) | |
select { | |
case r.c <- tmp: | |
return n, nil | |
default: | |
return 0, errors.New("attempt to Write to closed channel") | |
} | |
} | |
// PrefixReader implements io.Reader | |
func (r *PrefixReader) Read(data []byte) (int, error) { | |
// Obtain a lock first to serialize any reads | |
r.mu.Lock() | |
defer r.mu.Unlock() | |
// Before anything, if we have buffered data to read, return that | |
// This is only safe because of the above mutex | |
if available := len(r.buf); available > 0 { | |
// If the buf we're "reading" into is smaller than available, use that instead | |
if actual := len(data); actual > available { | |
available = actual | |
} | |
// Copy that many bytes into data | |
n := copy(data, r.buf[:available]) | |
// Truncate the buffer by the amount we read and return | |
r.buf = r.buf[available:] | |
return n, nil | |
} | |
// We must wait for data from the channel | |
b, ok := <-r.c | |
// The channel was closed while we were waiting for it or was already closed | |
// pass-through to the read io.Reader implementation instead | |
if !ok { | |
if r.r == nil { | |
return 0, io.EOF | |
} | |
return r.r.Read(data) | |
} | |
// If we have more data to write than the available room, | |
// we'll have to buffer some for next the next Read call | |
if room := len(data); len(b) > room { | |
// Put the extra data into buf and truncate b so it's a safe sized copy | |
// Replacing buf here is safe as we have an exclusive lock and it was | |
// checked to be nil/0 length already in the above call | |
r.buf = b[room:] | |
b = b[:room] | |
} | |
// Copy the received bytes into data and return (length checked above) | |
return copy(data, b), nil | |
} | |
// NewPrefixReader creates a reader that "reads" from c until it is closed | |
// once c is closed it passes through all Read calls to the io.Reader in r | |
func NewPrefixReader(r io.Reader, c chan []byte) *PrefixReader { | |
return &PrefixReader{r:r, c:c} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment