go-waku-rendezvous/storage.go

162 lines
3.6 KiB
Go

package rendezvous
import (
"bytes"
"crypto/rand"
"encoding/gob"
"time"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
RecordsPrefix byte = 1 + iota
TopicBodyDelimiter = 0xff
)
type Iterator interface {
Release()
Next() bool
Prev() bool
Value() []byte
Key() []byte
Seek([]byte) bool
}
type DB interface {
Put([]byte, []byte) error
Delete([]byte) error
NewIterator([]byte) Iterator
}
type RegistrationRecord struct {
PeerEnvelope []byte
Ns string
Ttl int
Deadline time.Time
}
// TopicPart looks for TopicBodyDelimiter and returns topic prefix from the same key.
// It doesn't allocate memory for topic prefix.
func TopicPart(key []byte) []byte {
idx := bytes.IndexByte(key, TopicBodyDelimiter)
if idx == -1 {
return nil
}
return key[1:idx] // first byte is RecordsPrefix
}
type RecordsKey []byte
func NewRecordsKey(ns string, id peer.ID) RecordsKey {
key := make(RecordsKey, 2+len([]byte(ns))+len(id))
key[0] = RecordsPrefix
copy(key[1:], []byte(ns))
key[1+len([]byte(ns))] = TopicBodyDelimiter
copy(key[2+len([]byte(ns)):], id)
return key
}
func (k RecordsKey) SamePrefix(prefix []byte) bool {
return bytes.Equal(k[:len(prefix)], prefix)
}
func (k RecordsKey) String() string {
return string(k)
}
// NewStorage creates instance of the storage.
func NewStorage(db DB) Storage {
return Storage{
db: db,
}
}
// Storage manages records.
type Storage struct {
db DB
}
// Add stores record using specified topic.
func (s Storage) Add(ns string, id peer.ID, envelope []byte, ttl int, deadline time.Time) (string, error) {
key := NewRecordsKey(ns, id)
stored := RegistrationRecord{
PeerEnvelope: envelope,
Ttl: ttl,
Ns: ns,
Deadline: deadline,
}
var data bytes.Buffer
encoder := gob.NewEncoder(&data)
err := encoder.Encode(stored)
if err != nil {
return "", err
}
return key.String(), s.db.Put(key, data.Bytes())
}
// RemoveBykey removes record from storage.
func (s *Storage) RemoveByKey(key string) error {
return s.db.Delete([]byte(key))
}
func (s *Storage) IterateAllKeys(iterator func(key RecordsKey, Deadline time.Time) error) error {
iter := s.db.NewIterator([]byte{RecordsPrefix})
defer iter.Release()
for iter.Next() {
var stored RegistrationRecord
data := bytes.NewBuffer(iter.Value())
decoder := gob.NewDecoder(data)
if err := decoder.Decode(&stored); err != nil {
return err
}
if err := iterator(RecordsKey(iter.Key()), stored.Deadline); err != nil {
return err
}
}
return nil
}
// GetRandom reads random records for specified topic up to specified limit.
func (s *Storage) GetRandom(ns string, limit int64) (rst []RegistrationRecord, err error) {
prefixlen := 1 + len([]byte(ns))
key := make(RecordsKey, prefixlen+32)
key[0] = RecordsPrefix
copy(key[1:], []byte(ns))
key[prefixlen] = TopicBodyDelimiter
prefixlen++
iter := s.db.NewIterator(key[:prefixlen])
defer iter.Release()
uids := map[string]struct{}{}
// it might be too much cause we do crypto/rand.Read. requires profiling
for i := int64(0); i < limit*limit && len(rst) < int(limit); i++ {
if _, err := rand.Read(key[prefixlen:]); err != nil {
return nil, err
}
iter.Seek(key)
for _, f := range []func() bool{iter.Prev, iter.Next} {
if f() && key.SamePrefix(iter.Key()[:prefixlen]) {
var stored RegistrationRecord
data := bytes.NewBuffer(iter.Value())
decoder := gob.NewDecoder(data)
if err = decoder.Decode(&stored); err != nil {
return nil, err
}
k := iter.Key()
if _, exist := uids[string(k)]; exist {
continue
}
uids[string(k)] = struct{}{}
rst = append(rst, stored)
break
}
}
}
return rst, nil
}