162 lines
3.6 KiB
Go
162 lines
3.6 KiB
Go
package rendezvous
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/rand"
|
|
"encoding/gob"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
const (
|
|
RecordsPrefix byte = 1 + iota
|
|
|
|
TopicBodyDelimiter = 0xff
|
|
)
|
|
|
|
type Iterator interface {
|
|
Release()
|
|
Next() bool
|
|
Prev() bool
|
|
Value() []byte
|
|
Key() []byte
|
|
Seek([]byte) bool
|
|
}
|
|
|
|
type DB interface {
|
|
Put([]byte, []byte) error
|
|
Delete([]byte) error
|
|
NewIterator([]byte) Iterator
|
|
}
|
|
|
|
type RegistrationRecord struct {
|
|
PeerEnvelope []byte
|
|
Ns string
|
|
Ttl int
|
|
Deadline time.Time
|
|
}
|
|
|
|
// TopicPart looks for TopicBodyDelimiter and returns topic prefix from the same key.
|
|
// It doesn't allocate memory for topic prefix.
|
|
func TopicPart(key []byte) []byte {
|
|
idx := bytes.IndexByte(key, TopicBodyDelimiter)
|
|
if idx == -1 {
|
|
return nil
|
|
}
|
|
return key[1:idx] // first byte is RecordsPrefix
|
|
}
|
|
|
|
type RecordsKey []byte
|
|
|
|
func NewRecordsKey(ns string, id peer.ID) RecordsKey {
|
|
key := make(RecordsKey, 2+len([]byte(ns))+len(id))
|
|
key[0] = RecordsPrefix
|
|
copy(key[1:], []byte(ns))
|
|
key[1+len([]byte(ns))] = TopicBodyDelimiter
|
|
copy(key[2+len([]byte(ns)):], id)
|
|
return key
|
|
}
|
|
|
|
func (k RecordsKey) SamePrefix(prefix []byte) bool {
|
|
return bytes.Equal(k[:len(prefix)], prefix)
|
|
}
|
|
|
|
func (k RecordsKey) String() string {
|
|
return string(k)
|
|
}
|
|
|
|
// NewStorage creates instance of the storage.
|
|
func NewStorage(db DB) Storage {
|
|
return Storage{
|
|
db: db,
|
|
}
|
|
}
|
|
|
|
// Storage manages records.
|
|
type Storage struct {
|
|
db DB
|
|
}
|
|
|
|
// Add stores record using specified topic.
|
|
func (s Storage) Add(ns string, id peer.ID, envelope []byte, ttl int, deadline time.Time) (string, error) {
|
|
key := NewRecordsKey(ns, id)
|
|
stored := RegistrationRecord{
|
|
PeerEnvelope: envelope,
|
|
Ttl: ttl,
|
|
Ns: ns,
|
|
Deadline: deadline,
|
|
}
|
|
|
|
var data bytes.Buffer
|
|
encoder := gob.NewEncoder(&data)
|
|
|
|
err := encoder.Encode(stored)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return key.String(), s.db.Put(key, data.Bytes())
|
|
}
|
|
|
|
// RemoveBykey removes record from storage.
|
|
func (s *Storage) RemoveByKey(key string) error {
|
|
return s.db.Delete([]byte(key))
|
|
}
|
|
|
|
func (s *Storage) IterateAllKeys(iterator func(key RecordsKey, Deadline time.Time) error) error {
|
|
iter := s.db.NewIterator([]byte{RecordsPrefix})
|
|
defer iter.Release()
|
|
|
|
for iter.Next() {
|
|
var stored RegistrationRecord
|
|
data := bytes.NewBuffer(iter.Value())
|
|
decoder := gob.NewDecoder(data)
|
|
if err := decoder.Decode(&stored); err != nil {
|
|
return err
|
|
}
|
|
if err := iterator(RecordsKey(iter.Key()), stored.Deadline); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetRandom reads random records for specified topic up to specified limit.
|
|
func (s *Storage) GetRandom(ns string, limit int64) (rst []RegistrationRecord, err error) {
|
|
prefixlen := 1 + len([]byte(ns))
|
|
key := make(RecordsKey, prefixlen+32)
|
|
key[0] = RecordsPrefix
|
|
copy(key[1:], []byte(ns))
|
|
key[prefixlen] = TopicBodyDelimiter
|
|
prefixlen++
|
|
|
|
iter := s.db.NewIterator(key[:prefixlen])
|
|
defer iter.Release()
|
|
uids := map[string]struct{}{}
|
|
// it might be too much cause we do crypto/rand.Read. requires profiling
|
|
for i := int64(0); i < limit*limit && len(rst) < int(limit); i++ {
|
|
if _, err := rand.Read(key[prefixlen:]); err != nil {
|
|
return nil, err
|
|
}
|
|
iter.Seek(key)
|
|
for _, f := range []func() bool{iter.Prev, iter.Next} {
|
|
if f() && key.SamePrefix(iter.Key()[:prefixlen]) {
|
|
var stored RegistrationRecord
|
|
data := bytes.NewBuffer(iter.Value())
|
|
decoder := gob.NewDecoder(data)
|
|
if err = decoder.Decode(&stored); err != nil {
|
|
return nil, err
|
|
}
|
|
k := iter.Key()
|
|
if _, exist := uids[string(k)]; exist {
|
|
continue
|
|
}
|
|
uids[string(k)] = struct{}{}
|
|
rst = append(rst, stored)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return rst, nil
|
|
}
|