615 lines
16 KiB
Go
615 lines
16 KiB
Go
/**
|
|
* Reed-Solomon Coding over 8-bit values.
|
|
*
|
|
* Copyright 2015, Klaus Post
|
|
* Copyright 2015, Backblaze, Inc.
|
|
*/
|
|
|
|
package reedsolomon
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
)
|
|
|
|
// StreamEncoder encodes, verifies and repairs Reed-Solomon parity sets
// for data that is supplied through io.Reader and io.Writer values.
// It is a fully streaming interface; data is processed in blocks of up to 4MB.
//
// The streaming interface has a start-up overhead, so for small shard sizes
// (10MB and below) the in-memory interface is the recommended choice.
//
// Callers must not assume any particular order or size of the individual
// reads and writes performed on the supplied readers and writers.
//
// Usage examples can be found in "stream-encoder.go" and "streamdecoder.go"
// in the examples folder.
type StreamEncoder interface {
	// Encode computes the parity shards for a set of data shards.
	//
	// 'data' holds one reader per data shard and 'parity' one writer per
	// parity shard. Both counts must match the numbers given to NewStream().
	//
	// Every reader must supply the same number of bytes. Each parity writer
	// receives as many bytes as were read from a single data shard.
	//
	// A failing data stream is reported as a StreamReadError and a failing
	// parity writer as a StreamWriteError.
	Encode(data []io.Reader, parity []io.Writer) error

	// Verify reports whether the parity shards are consistent with the data.
	//
	// The number of readers must equal the total number of shards
	// (data + parity) given to NewStream(), and every reader must supply the
	// same number of bytes.
	//
	// A failing shard stream is reported as a StreamReadError.
	Verify(shards []io.Reader) (bool, error)

	// Reconstruct recreates missing shards where possible.
	//
	// 'valid' lists the shards that can be read and 'fill' the shards that
	// should be written. A shard is marked as missing by leaving its entry
	// in 'valid' nil and placing a non-nil writer at the same index in
	// 'fill'. An index must never carry both a reader and a writer;
	// ErrReconstructMismatch is returned if it does.
	//
	// ErrTooFewShards is returned when there are not enough shards left to
	// reconstruct the missing ones.
	//
	// The reconstructed shard set is complete, but its integrity is not
	// checked. Use Verify to confirm that the data set is ok.
	Reconstruct(valid []io.Reader, fill []io.Writer) error

	// Split distributes an input stream over the number of data shards
	// given to the encoder.
	//
	// All shards get the same size. If the input size is not divisible by
	// the number of shards, the last shard is padded with zeros.
	//
	// The total input size must be supplied. ErrShortData is returned if
	// fewer bytes than indicated could be retrieved.
	Split(data io.Reader, dst []io.Writer, size int64) (err error)

	// Join concatenates the shards and writes the data segment to dst.
	//
	// Only the data shards are considered.
	//
	// The exact output size must be supplied. ErrTooFewShards is returned
	// when too few shards are given, and ErrShortData when the shards hold
	// less than outSize bytes in total.
	Join(dst io.Writer, shards []io.Reader, outSize int64) error
}
|
|
|
|
// StreamReadError is returned when a read error is encountered
// that relates to a supplied stream.
// This will allow you to find out which reader has failed.
type StreamReadError struct {
	Err    error // The underlying error
	Stream int   // The stream number on which the error occurred
}

// Error returns the error as a string.
func (s StreamReadError) Error() string {
	return fmt.Sprintf("error reading stream %d: %s", s.Stream, s.Err)
}

// String returns the error as a string.
func (s StreamReadError) String() string {
	return s.Error()
}

// Unwrap returns the underlying error, so that errors.Is and errors.As
// can look through the StreamReadError at the cause of the failure.
func (s StreamReadError) Unwrap() error {
	return s.Err
}
|
|
|
|
// StreamWriteError is returned when a write error is encountered
// that relates to a supplied stream. This will allow you to
// find out which writer has failed.
type StreamWriteError struct {
	Err    error // The underlying error
	Stream int   // The stream number on which the error occurred
}

// Error returns the error as a string.
func (s StreamWriteError) Error() string {
	return fmt.Sprintf("error writing stream %d: %s", s.Stream, s.Err)
}

// String returns the error as a string.
func (s StreamWriteError) String() string {
	return s.Error()
}

// Unwrap returns the underlying error, so that errors.Is and errors.As
// can look through the StreamWriteError at the cause of the failure.
func (s StreamWriteError) Unwrap() error {
	return s.Err
}
|
|
|
|
// rsStream contains a matrix for a specific
|
|
// distribution of datashards and parity shards.
|
|
// Construct if using NewStream()
|
|
type rsStream struct {
|
|
r *reedSolomon
|
|
o options
|
|
|
|
// Shard reader
|
|
readShards func(dst [][]byte, in []io.Reader) error
|
|
// Shard writer
|
|
writeShards func(out []io.Writer, in [][]byte) error
|
|
|
|
blockPool sync.Pool
|
|
}
|
|
|
|
// NewStream creates a new encoder and initializes it to
|
|
// the number of data shards and parity shards that
|
|
// you want to use. You can reuse this encoder.
|
|
// Note that the maximum number of data shards is 256.
|
|
func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
|
|
if dataShards+parityShards > 256 {
|
|
return nil, ErrMaxShardNum
|
|
}
|
|
|
|
r := rsStream{o: defaultOptions}
|
|
for _, opt := range o {
|
|
opt(&r.o)
|
|
}
|
|
// Override block size if shard size is set.
|
|
if r.o.streamBS == 0 && r.o.shardSize > 0 {
|
|
r.o.streamBS = r.o.shardSize
|
|
}
|
|
if r.o.streamBS <= 0 {
|
|
r.o.streamBS = 4 << 20
|
|
}
|
|
if r.o.shardSize == 0 && r.o.maxGoroutines == defaultOptions.maxGoroutines {
|
|
o = append(o, WithAutoGoroutines(r.o.streamBS))
|
|
}
|
|
|
|
enc, err := New(dataShards, parityShards, o...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.r = enc.(*reedSolomon)
|
|
|
|
r.blockPool.New = func() interface{} {
|
|
return AllocAligned(dataShards+parityShards, r.o.streamBS)
|
|
}
|
|
r.readShards = readShards
|
|
r.writeShards = writeShards
|
|
if r.o.concReads {
|
|
r.readShards = cReadShards
|
|
}
|
|
if r.o.concWrites {
|
|
r.writeShards = cWriteShards
|
|
}
|
|
|
|
return &r, err
|
|
}
|
|
|
|
// NewStreamC creates a new encoder and initializes it to
|
|
// the number of data shards and parity shards given.
|
|
//
|
|
// This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes.
|
|
func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
|
|
return NewStream(dataShards, parityShards, append(o, WithConcurrentStreamReads(conReads), WithConcurrentStreamWrites(conWrites))...)
|
|
}
|
|
|
|
func (r *rsStream) createSlice() [][]byte {
|
|
out := r.blockPool.Get().([][]byte)
|
|
for i := range out {
|
|
out[i] = out[i][:r.o.streamBS]
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Encodes parity shards for a set of data shards.
|
|
//
|
|
// Input is 'shards' containing readers for data shards followed by parity shards
|
|
// io.Writer.
|
|
//
|
|
// The number of shards must match the number given to NewStream().
|
|
//
|
|
// Each reader must supply the same number of bytes.
|
|
//
|
|
// The parity shards will be written to the writer.
|
|
// The number of bytes written will match the input size.
|
|
//
|
|
// If a data stream returns an error, a StreamReadError type error
|
|
// will be returned. If a parity writer returns an error, a
|
|
// StreamWriteError will be returned.
|
|
func (r *rsStream) Encode(data []io.Reader, parity []io.Writer) error {
|
|
if len(data) != r.r.dataShards {
|
|
return ErrTooFewShards
|
|
}
|
|
|
|
if len(parity) != r.r.parityShards {
|
|
return ErrTooFewShards
|
|
}
|
|
|
|
all := r.createSlice()
|
|
defer r.blockPool.Put(all)
|
|
in := all[:r.r.dataShards]
|
|
out := all[r.r.dataShards:]
|
|
read := 0
|
|
|
|
for {
|
|
err := r.readShards(in, data)
|
|
switch err {
|
|
case nil:
|
|
case io.EOF:
|
|
if read == 0 {
|
|
return ErrShardNoData
|
|
}
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
out = trimShards(out, shardSize(in))
|
|
read += shardSize(in)
|
|
err = r.r.Encode(all)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = r.writeShards(parity, out)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// trimShards gives all non-empty shards the same length.
//
// Every shard with a non-zero length is resliced in place to exactly size
// bytes (which may also lengthen it, up to its capacity). Shards that are
// already empty stay empty. The input slice is returned for convenience.
func trimShards(in [][]byte, size int) [][]byte {
	for idx, shard := range in {
		if len(shard) == 0 {
			// Empty shards are left as they are.
			continue
		}
		in[idx] = shard[:size]
	}
	return in
}
|
|
|
|
func readShards(dst [][]byte, in []io.Reader) error {
|
|
if len(in) != len(dst) {
|
|
panic("internal error: in and dst size do not match")
|
|
}
|
|
size := -1
|
|
for i := range in {
|
|
if in[i] == nil {
|
|
dst[i] = dst[i][:0]
|
|
continue
|
|
}
|
|
n, err := io.ReadFull(in[i], dst[i])
|
|
// The error is EOF only if no bytes were read.
|
|
// If an EOF happens after reading some but not all the bytes,
|
|
// ReadFull returns ErrUnexpectedEOF.
|
|
switch err {
|
|
case io.ErrUnexpectedEOF, io.EOF:
|
|
if size < 0 {
|
|
size = n
|
|
} else if n != size {
|
|
// Shard sizes must match.
|
|
return ErrShardSize
|
|
}
|
|
dst[i] = dst[i][0:n]
|
|
case nil:
|
|
continue
|
|
default:
|
|
return StreamReadError{Err: err, Stream: i}
|
|
}
|
|
}
|
|
if size == 0 {
|
|
return io.EOF
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeShards(out []io.Writer, in [][]byte) error {
|
|
if len(out) != len(in) {
|
|
panic("internal error: in and out size do not match")
|
|
}
|
|
for i := range in {
|
|
if out[i] == nil {
|
|
continue
|
|
}
|
|
n, err := out[i].Write(in[i])
|
|
if err != nil {
|
|
return StreamWriteError{Err: err, Stream: i}
|
|
}
|
|
//
|
|
if n != len(in[i]) {
|
|
return StreamWriteError{Err: io.ErrShortWrite, Stream: i}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// readResult is the outcome of a single shard read, as reported by the
// reader goroutines started in cReadShards.
type readResult struct {
	n    int   // index of the shard that was read
	size int   // number of bytes read into the shard buffer
	err  error // error returned by the read, if any
}
|
|
|
|
// cReadShards reads shards concurrently
|
|
func cReadShards(dst [][]byte, in []io.Reader) error {
|
|
if len(in) != len(dst) {
|
|
panic("internal error: in and dst size do not match")
|
|
}
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(in))
|
|
res := make(chan readResult, len(in))
|
|
for i := range in {
|
|
if in[i] == nil {
|
|
dst[i] = dst[i][:0]
|
|
wg.Done()
|
|
continue
|
|
}
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
n, err := io.ReadFull(in[i], dst[i])
|
|
// The error is EOF only if no bytes were read.
|
|
// If an EOF happens after reading some but not all the bytes,
|
|
// ReadFull returns ErrUnexpectedEOF.
|
|
res <- readResult{size: n, err: err, n: i}
|
|
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
close(res)
|
|
size := -1
|
|
for r := range res {
|
|
switch r.err {
|
|
case io.ErrUnexpectedEOF, io.EOF:
|
|
if size < 0 {
|
|
size = r.size
|
|
} else if r.size != size {
|
|
// Shard sizes must match.
|
|
return ErrShardSize
|
|
}
|
|
dst[r.n] = dst[r.n][0:r.size]
|
|
case nil:
|
|
default:
|
|
return StreamReadError{Err: r.err, Stream: r.n}
|
|
}
|
|
}
|
|
if size == 0 {
|
|
return io.EOF
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cWriteShards writes shards concurrently
|
|
func cWriteShards(out []io.Writer, in [][]byte) error {
|
|
if len(out) != len(in) {
|
|
panic("internal error: in and out size do not match")
|
|
}
|
|
var errs = make(chan error, len(out))
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(out))
|
|
for i := range in {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
if out[i] == nil {
|
|
errs <- nil
|
|
return
|
|
}
|
|
n, err := out[i].Write(in[i])
|
|
if err != nil {
|
|
errs <- StreamWriteError{Err: err, Stream: i}
|
|
return
|
|
}
|
|
if n != len(in[i]) {
|
|
errs <- StreamWriteError{Err: io.ErrShortWrite, Stream: i}
|
|
}
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
close(errs)
|
|
for err := range errs {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Verify returns true if the parity shards contain correct data.
|
|
//
|
|
// The number of shards must match the number total data+parity shards
|
|
// given to NewStream().
|
|
//
|
|
// Each reader must supply the same number of bytes.
|
|
// If a shard stream returns an error, a StreamReadError type error
|
|
// will be returned.
|
|
func (r *rsStream) Verify(shards []io.Reader) (bool, error) {
|
|
if len(shards) != r.r.totalShards {
|
|
return false, ErrTooFewShards
|
|
}
|
|
|
|
read := 0
|
|
all := r.createSlice()
|
|
defer r.blockPool.Put(all)
|
|
for {
|
|
err := r.readShards(all, shards)
|
|
if err == io.EOF {
|
|
if read == 0 {
|
|
return false, ErrShardNoData
|
|
}
|
|
return true, nil
|
|
}
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
read += shardSize(all)
|
|
ok, err := r.r.Verify(all)
|
|
if !ok || err != nil {
|
|
return ok, err
|
|
}
|
|
}
|
|
}
|
|
|
|
// ErrReconstructMismatch is returned by the StreamEncoder when both a
// "valid" and a "fill" stream are supplied for the same index.
// In that case it cannot be determined whether the shard is considered
// valid or should be reconstructed.
var ErrReconstructMismatch = errors.New(
	"valid shards and fill shards are mutually exclusive",
)
|
|
|
|
// Reconstruct will recreate the missing shards if possible.
|
|
//
|
|
// Given a list of valid shards (to read) and invalid shards (to write)
|
|
//
|
|
// You indicate that a shard is missing by setting it to nil in the 'valid'
|
|
// slice and at the same time setting a non-nil writer in "fill".
|
|
// An index cannot contain both non-nil 'valid' and 'fill' entry.
|
|
//
|
|
// If there are too few shards to reconstruct the missing
|
|
// ones, ErrTooFewShards will be returned.
|
|
//
|
|
// The reconstructed shard set is complete when explicitly asked for all missing shards.
|
|
// However its integrity is not automatically verified.
|
|
// Use the Verify function to check in case the data set is complete.
|
|
func (r *rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
|
|
if len(valid) != r.r.totalShards {
|
|
return ErrTooFewShards
|
|
}
|
|
if len(fill) != r.r.totalShards {
|
|
return ErrTooFewShards
|
|
}
|
|
|
|
all := r.createSlice()
|
|
defer r.blockPool.Put(all)
|
|
reconDataOnly := true
|
|
for i := range valid {
|
|
if valid[i] != nil && fill[i] != nil {
|
|
return ErrReconstructMismatch
|
|
}
|
|
if i >= r.r.dataShards && fill[i] != nil {
|
|
reconDataOnly = false
|
|
}
|
|
}
|
|
|
|
read := 0
|
|
for {
|
|
err := r.readShards(all, valid)
|
|
if err == io.EOF {
|
|
if read == 0 {
|
|
return ErrShardNoData
|
|
}
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
read += shardSize(all)
|
|
all = trimShards(all, shardSize(all))
|
|
|
|
if reconDataOnly {
|
|
err = r.r.ReconstructData(all) // just reconstruct missing data shards
|
|
} else {
|
|
err = r.r.Reconstruct(all) // reconstruct all missing shards
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = r.writeShards(fill, all)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Join the shards and write the data segment to dst.
|
|
//
|
|
// Only the data shards are considered.
|
|
//
|
|
// You must supply the exact output size you want.
|
|
// If there are to few shards given, ErrTooFewShards will be returned.
|
|
// If the total data size is less than outSize, ErrShortData will be returned.
|
|
func (r *rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
|
|
// Do we have enough shards?
|
|
if len(shards) < r.r.dataShards {
|
|
return ErrTooFewShards
|
|
}
|
|
|
|
// Trim off parity shards if any
|
|
shards = shards[:r.r.dataShards]
|
|
for i := range shards {
|
|
if shards[i] == nil {
|
|
return StreamReadError{Err: ErrShardNoData, Stream: i}
|
|
}
|
|
}
|
|
// Join all shards
|
|
src := io.MultiReader(shards...)
|
|
|
|
// Copy data to dst
|
|
n, err := io.CopyN(dst, src, outSize)
|
|
if err == io.EOF {
|
|
return ErrShortData
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if n != outSize {
|
|
return ErrShortData
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Split a an input stream into the number of shards given to the encoder.
|
|
//
|
|
// The data will be split into equally sized shards.
|
|
// If the data size isn't dividable by the number of shards,
|
|
// the last shard will contain extra zeros.
|
|
//
|
|
// You must supply the total size of your input.
|
|
// 'ErrShortData' will be returned if it is unable to retrieve the
|
|
// number of bytes indicated.
|
|
func (r *rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
|
|
if size == 0 {
|
|
return ErrShortData
|
|
}
|
|
if len(dst) != r.r.dataShards {
|
|
return ErrInvShardNum
|
|
}
|
|
|
|
for i := range dst {
|
|
if dst[i] == nil {
|
|
return StreamWriteError{Err: ErrShardNoData, Stream: i}
|
|
}
|
|
}
|
|
|
|
// Calculate number of bytes per shard.
|
|
perShard := (size + int64(r.r.dataShards) - 1) / int64(r.r.dataShards)
|
|
|
|
// Pad data to r.Shards*perShard.
|
|
paddingSize := (int64(r.r.totalShards) * perShard) - size
|
|
data = io.MultiReader(data, io.LimitReader(zeroPaddingReader{}, paddingSize))
|
|
|
|
// Split into equal-length shards and copy.
|
|
for i := range dst {
|
|
n, err := io.CopyN(dst[i], data, perShard)
|
|
if err != io.EOF && err != nil {
|
|
return err
|
|
}
|
|
if n != perShard {
|
|
return ErrShortData
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// zeroPaddingReader is an endless source of zero bytes. Split uses it,
// wrapped in a limiting reader, to pad the input.
type zeroPaddingReader struct{}

// Compile-time check that zeroPaddingReader satisfies io.Reader.
var _ io.Reader = (*zeroPaddingReader)(nil)

// Read fills p entirely with zeros and reports len(p) bytes read.
// It never returns an error.
func (t zeroPaddingReader) Read(p []byte) (n int, err error) {
	for idx := range p {
		p[idx] = 0
	}
	return len(p), nil
}
|