From 12240baf61e251a80e3759dd22ca5847dcb8d807 Mon Sep 17 00:00:00 2001 From: Elad Date: Tue, 30 Apr 2019 13:10:57 +0900 Subject: [PATCH] swarm/chunk: add tags data type * swarm/chunk: add tags backend to chunk package --- swarm/chunk/chunk.go | 16 +++ swarm/chunk/tag.go | 218 ++++++++++++++++++++++++++++++++ swarm/chunk/tag_test.go | 273 ++++++++++++++++++++++++++++++++++++++++ swarm/chunk/tags.go | 80 ++++++++++++ swarm/sctx/sctx.go | 17 +++ 5 files changed, 604 insertions(+) create mode 100644 swarm/chunk/tag.go create mode 100644 swarm/chunk/tag_test.go create mode 100644 swarm/chunk/tags.go diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 9ae59c95f..17f49348b 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -1,3 +1,19 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + package chunk import ( diff --git a/swarm/chunk/tag.go b/swarm/chunk/tag.go new file mode 100644 index 000000000..359ac11ac --- /dev/null +++ b/swarm/chunk/tag.go @@ -0,0 +1,218 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import ( + "encoding/binary" + "errors" + "sync/atomic" + "time" +) + +var ( + errExists = errors.New("already exists") + errNA = errors.New("not available yet") + errNoETA = errors.New("unable to calculate ETA") + errTagNotFound = errors.New("tag not found") +) + +// State is the enum type for chunk states +type State = uint32 + +const ( + SPLIT State = iota // chunk has been processed by filehasher/swarm safe call + STORED // chunk stored locally + SEEN // chunk previously seen + SENT // chunk sent to neighbourhood + SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere +) + +// Tag represents info on the status of new chunks +type Tag struct { + Uid uint32 // a unique identifier for this tag + Name string // a name tag for this tag + Address Address // the associated swarm hash for this tag + total uint32 // total chunks belonging to a tag + split uint32 // number of chunks already processed by splitter for hashing + seen uint32 // number of chunks already seen + stored uint32 // number of chunks already stored locally + sent uint32 // number of chunks sent for push syncing + synced uint32 // number of chunks synced with proof + startedAt time.Time // tag started to calculate ETA +} + +// New creates a new tag, stores it by the name and returns it +// it returns an error if the tag with this name already exists +func NewTag(uid uint32, s string, total uint32) *Tag { + t := &Tag{ + Uid: uid, + Name: s, + startedAt: time.Now(), + total: total, + } + return t +} + +// Inc increments the count for a state +func (t *Tag) Inc(state State) { + var v *uint32 + switch state { + case SPLIT: + v = &t.split + case STORED: + v = &t.stored + case SEEN: + v = &t.seen + case SENT: + v = &t.sent + case SYNCED: + v = &t.synced + } + atomic.AddUint32(v, 1) +} + +// Get returns the count for a state on a tag +func (t *Tag) Get(state State) int { + var v *uint32 + switch state { + case SPLIT: + v = &t.split + case STORED: + v = &t.stored + case SEEN: + v = &t.seen + case SENT: + v = &t.sent + case SYNCED: + v = &t.synced + } + return int(atomic.LoadUint32(v)) +} + +// GetTotal returns the total count +func (t *Tag) Total() int { + return int(atomic.LoadUint32(&t.total)) +} + +// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag +// is meant to be called when splitter finishes for input streams of unknown size +func (t *Tag) DoneSplit(address Address) int { + total := atomic.LoadUint32(&t.split) + atomic.StoreUint32(&t.total, total) + t.Address = address + return int(total) +} + +// Status returns the value of state and the total count +func (t *Tag) Status(state State) (int, int, error) { + count, seen, total := t.Get(state), int(atomic.LoadUint32(&t.seen)), int(atomic.LoadUint32(&t.total)) + if total == 0 { + return count, total, errNA + } + switch state { + case SPLIT, STORED, SEEN: + return count, total, nil + case SENT, SYNCED: + stored := int(atomic.LoadUint32(&t.stored)) + if stored < total { + return count, total - seen, errNA + } + return count, total - seen, nil + } + return count, total, errNA +} + +// ETA returns the time of completion estimated based on time passed and rate of completion +func (t *Tag) ETA(state State) (time.Time, error) { + cnt, total, err := t.Status(state) + if err != nil { + return time.Time{}, err + } + if cnt == 0 || total == 0 { + return time.Time{}, errNoETA + } + diff := time.Since(t.startedAt) + dur := time.Duration(total) * diff / time.Duration(cnt) + return t.startedAt.Add(dur), nil +} + +// MarshalBinary marshals the tag into a byte slice +func (tag *Tag) MarshalBinary() (data []byte, err error) { + buffer := make([]byte, 0) + encodeUint32Append(&buffer, tag.Uid) + encodeUint32Append(&buffer, tag.total) + encodeUint32Append(&buffer, tag.split) + encodeUint32Append(&buffer, tag.seen) + encodeUint32Append(&buffer, tag.stored) + encodeUint32Append(&buffer, tag.sent) + encodeUint32Append(&buffer, tag.synced) + + intBuffer := make([]byte, 8) + + n := binary.PutVarint(intBuffer, tag.startedAt.Unix()) + buffer = append(buffer, intBuffer[:n]...) + + n = binary.PutVarint(intBuffer, int64(len(tag.Address))) + buffer = append(buffer, intBuffer[:n]...) + + buffer = append(buffer, tag.Address[:]...) + + buffer = append(buffer, []byte(tag.Name)...) + + return buffer, nil +} + +// UnmarshalBinary unmarshals a byte slice into a tag +func (tag *Tag) UnmarshalBinary(buffer []byte) error { + if len(buffer) < 13 { + return errors.New("buffer too short") + } + + tag.Uid = decodeUint32Splice(&buffer) + tag.total = decodeUint32Splice(&buffer) + tag.split = decodeUint32Splice(&buffer) + tag.seen = decodeUint32Splice(&buffer) + tag.stored = decodeUint32Splice(&buffer) + tag.sent = decodeUint32Splice(&buffer) + tag.synced = decodeUint32Splice(&buffer) + + t, n := binary.Varint(buffer) + tag.startedAt = time.Unix(t, 0) + buffer = buffer[n:] + + t, n = binary.Varint(buffer) + buffer = buffer[n:] + if t > 0 { + tag.Address = buffer[:t] + } + tag.Name = string(buffer[t:]) + + return nil + +} + +func encodeUint32Append(buffer *[]byte, val uint32) { + intBuffer := make([]byte, 4) + binary.BigEndian.PutUint32(intBuffer, val) + *buffer = append(*buffer, intBuffer...) +} + +func decodeUint32Splice(buffer *[]byte) uint32 { + val := binary.BigEndian.Uint32((*buffer)[:4]) + *buffer = (*buffer)[4:] + return val +} diff --git a/swarm/chunk/tag_test.go b/swarm/chunk/tag_test.go new file mode 100644 index 000000000..b3f3be2ca --- /dev/null +++ b/swarm/chunk/tag_test.go @@ -0,0 +1,273 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import ( + "bytes" + "sync" + "testing" + "time" +) + +var ( + allStates = []State{SPLIT, STORED, SEEN, SENT, SYNCED} +) + +// TestTagSingleIncrements tests if Inc increments the tag state value +func TestTagSingleIncrements(t *testing.T) { + tg := &Tag{total: 10} + + tc := []struct { + state uint32 + inc int + expcount int + exptotal int + }{ + {state: SPLIT, inc: 10, expcount: 10, exptotal: 10}, + {state: STORED, inc: 9, expcount: 9, exptotal: 9}, + {state: SEEN, inc: 1, expcount: 1, exptotal: 10}, + {state: SENT, inc: 9, expcount: 9, exptotal: 9}, + {state: SYNCED, inc: 9, expcount: 9, exptotal: 9}, + } + + for _, tc := range tc { + for i := 0; i < tc.inc; i++ { + tg.Inc(tc.state) + } + } + + for _, tc := range tc { + if tg.Get(tc.state) != tc.expcount { + t.Fatalf("not incremented") + } + } +} + +// TestTagStatus is a unit test to cover Tag.Status method functionality +func TestTagStatus(t *testing.T) { + tg := &Tag{total: 10} + tg.Inc(SEEN) + tg.Inc(SENT) + tg.Inc(SYNCED) + + for i := 0; i < 10; i++ { + tg.Inc(SPLIT) + tg.Inc(STORED) + } + for _, v := range []struct { + state State + expVal int + expTotal int + }{ + {state: STORED, expVal: 10, expTotal: 10}, + {state: SPLIT, expVal: 10, expTotal: 10}, + {state: SEEN, expVal: 1, expTotal: 10}, + {state: SENT, expVal: 1, expTotal: 9}, + {state: SYNCED, expVal: 1, expTotal: 9}, + } { + val, total, err := tg.Status(v.state) + if err != nil { + t.Fatal(err) + } + if val != v.expVal { + t.Fatalf("should be %d, got %d", v.expVal, val) + } + if total != v.expTotal { + t.Fatalf("expected total to be %d, got %d", v.expTotal, total) + } + } +} + +// tests ETA is precise +func TestTagETA(t *testing.T) { + now := time.Now() + maxDiff := 100000 // 100 microsecond + tg := &Tag{total: 10, startedAt: now} + time.Sleep(100 * time.Millisecond) + tg.Inc(SPLIT) + eta, err := tg.ETA(SPLIT) + if err != nil { + t.Fatal(err) + } + diff := time.Until(eta) - 9*time.Since(now) + if int(diff) > maxDiff { + t.Fatalf("ETA is not precise, got diff %v > .1ms", diff) + } +} + +// TestTagConcurrentIncrements tests Inc calls concurrently +func TestTagConcurrentIncrements(t *testing.T) { + tg := &Tag{} + n := 1000 + wg := sync.WaitGroup{} + wg.Add(5 * n) + for _, f := range allStates { + go func(f State) { + for j := 0; j < n; j++ { + go func() { + tg.Inc(f) + wg.Done() + }() + } + }(f) + } + wg.Wait() + for _, f := range allStates { + v := tg.Get(f) + if v != n { + t.Fatalf("expected state %v to be %v, got %v", f, n, v) + } + } +} + +// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently +func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { + ts := NewTags() + n := 100 + wg := sync.WaitGroup{} + wg.Add(10 * 5 * n) + for i := 0; i < 10; i++ { + s := string([]byte{uint8(i)}) + tag, err := ts.New(s, n) + if err != nil { + t.Fatal(err) + } + for _, f := range allStates { + go func(tag *Tag, f State) { + for j := 0; j < n; j++ { + go func() { + tag.Inc(f) + wg.Done() + }() + } + }(tag, f) + } + } + wg.Wait() + i := 0 + ts.Range(func(k, v interface{}) bool { + i++ + uid := k.(uint32) + for _, f := range allStates { + tag, err := ts.Get(uid) + if err != nil { + t.Fatal(err) + } + stateVal := tag.Get(f) + if stateVal != n { + t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v) + } + } + return true + + }) + if i != 10 { + t.Fatal("not enough tagz") + } +} + +// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the +// tag Address (byte slice) contains some arbitrary value +func TestMarshallingWithAddr(t *testing.T) { + tg := NewTag(111, "test/tag", 10) + tg.Address = []byte{0, 1, 2, 3, 4, 5, 6} + + for _, f := range allStates { + tg.Inc(f) + } + + b, err := tg.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + unmarshalledTag := &Tag{} + err = unmarshalledTag.UnmarshalBinary(b) + if err != nil { + t.Fatal(err) + } + + if unmarshalledTag.Uid != tg.Uid { + t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid) + } + + if unmarshalledTag.Name != tg.Name { + t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) + } + + for _, state := range allStates { + uv, tv := unmarshalledTag.Get(state), tg.Get(state) + if uv != tv { + t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv) + } + } + + if unmarshalledTag.Total() != tg.Total() { + t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total()) + } + + if len(unmarshalledTag.Address) != len(tg.Address) { + t.Fatalf("tag addresses length mismatch, want %d, got %d", len(tg.Address), len(unmarshalledTag.Address)) + } + + if !bytes.Equal(unmarshalledTag.Address, tg.Address) { + t.Fatalf("expected tag address to be %v got %v", unmarshalledTag.Address, tg.Address) + } +} + +// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly +// when the tag Address (byte slice) is empty in this case +func TestMarshallingNoAddr(t *testing.T) { + tg := NewTag(111, "test/tag", 10) + for _, f := range allStates { + tg.Inc(f) + } + + b, err := tg.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + unmarshalledTag := &Tag{} + err = unmarshalledTag.UnmarshalBinary(b) + if err != nil { + t.Fatal(err) + } + + if unmarshalledTag.Uid != tg.Uid { + t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid) + } + + if unmarshalledTag.Name != tg.Name { + t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) + } + + for _, state := range allStates { + uv, tv := unmarshalledTag.Get(state), tg.Get(state) + if uv != tv { + t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv) + } + } + + if unmarshalledTag.Total() != tg.Total() { + t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total()) + } + + if len(unmarshalledTag.Address) != len(tg.Address) { + t.Fatalf("expected tag addresses to be equal length") + } +} diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go new file mode 100644 index 000000000..07f9b8cd7 --- /dev/null +++ b/swarm/chunk/tags.go @@ -0,0 +1,80 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/ethereum/go-ethereum/swarm/sctx" +) + +// Tags hold tag information indexed by a unique random uint32 +type Tags struct { + tags *sync.Map + rng *rand.Rand +} + +// NewTags creates a tags object +func NewTags() *Tags { + return &Tags{ + tags: &sync.Map{}, + rng: rand.New(rand.NewSource(time.Now().Unix())), + } +} + +// New creates a new tag, stores it by the name and returns it +// it returns an error if the tag with this name already exists +func (ts *Tags) New(s string, total int) (*Tag, error) { + t := &Tag{ + Uid: ts.rng.Uint32(), + Name: s, + startedAt: time.Now(), + total: uint32(total), + } + if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded { + return nil, errExists + } + return t, nil +} + +// Get returns the undelying tag for the uid or an error if not found +func (ts *Tags) Get(uid uint32) (*Tag, error) { + t, ok := ts.tags.Load(uid) + if !ok { + return nil, errors.New("tag not found") + } + return t.(*Tag), nil +} + +// GetContext gets a tag from the tag uid stored in the context +func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) { + uid := sctx.GetTag(ctx) + t, ok := ts.tags.Load(uid) + if !ok { + return nil, errTagNotFound + } + return t.(*Tag), nil +} + +// Range exposes sync.Map's iterator +func (ts *Tags) Range(fn func(k, v interface{}) bool) { + ts.tags.Range(fn) +} diff --git a/swarm/sctx/sctx.go b/swarm/sctx/sctx.go index fb7d35b00..adc8c7dab 100644 --- a/swarm/sctx/sctx.go +++ b/swarm/sctx/sctx.go @@ -5,12 +5,15 @@ import "context" type ( HTTPRequestIDKey struct{} requestHostKey struct{} + tagKey struct{} ) +// SetHost sets the http request host in the context func SetHost(ctx context.Context, domain string) context.Context { return context.WithValue(ctx, requestHostKey{}, domain) } +// GetHost gets the request host from the context func GetHost(ctx context.Context) string { v, ok := ctx.Value(requestHostKey{}).(string) if ok { @@ -18,3 +21,17 @@ func GetHost(ctx context.Context) string { } return "" } + +// SetTag sets the tag unique identifier in the context +func SetTag(ctx context.Context, tagId uint32) context.Context { + return context.WithValue(ctx, tagKey{}, tagId) +} + +// GetTag gets the tag unique identifier from the context +func GetTag(ctx context.Context) uint32 { + v, ok := ctx.Value(tagKey{}).(uint32) + if ok { + return v + } + return 0 +}