// Source: go-waku/waku/v2/protocol/waku_store/waku_store.go

package store
import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/cruxic/go-hmac-drbg/hmacdrbg"
	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-msgio/protoio"
	ma "github.com/multiformats/go-multiaddr"
	"github.com/status-im/go-waku/waku/common"
	"github.com/status-im/go-waku/waku/v2/protocol"
)
var log = logging.Logger("wakustore")
const WakuStoreProtocolId = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta1")
2021-03-18 16:40:47 +00:00
const (
	// MaxPageSize is the maximum number of waku messages in each page.
	MaxPageSize = 100

	// ConnectionTimeout is the timeout Query applies when the caller does not
	// specify one.
	ConnectionTimeout = 10 * time.Second

	// DefaultContentTopic is a content topic string; it is not referenced
	// elsewhere in this file.
	DefaultContentTopic = "/waku/2/default-content/proto"
)
var (
	// ErrNoPeersAvailable is returned by Query when no peer was selected for
	// the request.
	ErrNoPeersAvailable = errors.New("no suitable remote peers")

	// ErrInvalidId is returned by Query when the request id is empty.
	ErrInvalidId = errors.New("invalid request id")
)
// minOf returns the smallest of the given integers.
// At least one value must be supplied; calling it with none panics.
func minOf(vars ...int) int {
	smallest := vars[0]
	for _, candidate := range vars[1:] {
		if candidate < smallest {
			smallest = candidate
		}
	}
	return smallest
}
func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *protocol.PagingInfo) {
// takes list, and performs paging based on pinfo
// returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
cursor := pinfo.Cursor
pageSize := pinfo.PageSize
dir := pinfo.Direction
if pageSize == 0 { // pageSize being zero indicates that no pagination is required
return list, pinfo
}
if len(list) == 0 { // no pagination is needed for an empty list
return list, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction}
}
msgList := make([]IndexedWakuMessage, len(list))
_ = copy(msgList, list) // makes a copy of the list
sort.Slice(msgList, func(i, j int) bool { // sorts msgList based on the custom comparison proc indexedWakuMessageComparison
return indexedWakuMessageComparison(msgList[i], msgList[j]) == -1
})
initQuery := false
if cursor == nil {
initQuery = true // an empty cursor means it is an initial query
switch dir {
case protocol.PagingInfo_FORWARD:
cursor = list[0].index // perform paging from the begining of the list
case protocol.PagingInfo_BACKWARD:
cursor = list[len(list)-1].index // perform paging from the end of the list
}
}
foundIndex := findIndex(msgList, cursor)
2021-03-18 16:40:47 +00:00
if foundIndex == -1 { // the cursor is not valid
return nil, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction}
}
var retrievedPageSize, s, e int
var newCursor *protocol.Index // to be returned as part of the new paging info
switch dir {
case protocol.PagingInfo_FORWARD: // forward pagination
remainingMessages := len(msgList) - foundIndex - 1
if initQuery {
remainingMessages = remainingMessages + 1
2021-03-18 16:40:47 +00:00
foundIndex = foundIndex - 1
}
// the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
2021-03-18 16:40:47 +00:00
s = foundIndex + 1 // non inclusive
e = foundIndex + retrievedPageSize
newCursor = msgList[e].index // the new cursor points to the end of the page
case protocol.PagingInfo_BACKWARD: // backward pagination
remainingMessages := foundIndex
if initQuery {
remainingMessages = remainingMessages + 1
2021-03-18 16:40:47 +00:00
foundIndex = foundIndex + 1
}
// the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
2021-03-18 16:40:47 +00:00
s = foundIndex - retrievedPageSize
e = foundIndex - 1
newCursor = msgList[s].index // the new cursor points to the begining of the page
}
// retrieve the messages
for i := s; i <= e; i++ {
resMessages = append(resMessages, msgList[i])
}
2021-04-07 21:16:29 +00:00
resPagingInfo = &protocol.PagingInfo{PageSize: uint64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction}
2021-03-18 16:40:47 +00:00
return
}
// paginateWithoutIndex pages list according to pinfo via paginateWithIndex and
// strips the indices from the result. It returns the page as plain WakuMessage
// pointers together with the paging info for the next paging request.
func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []*protocol.WakuMessage, resPinfo *protocol.PagingInfo) {
	page, nextPaging := paginateWithIndex(list, pinfo)
	for i := range page {
		resMessages = append(resMessages, page[i].msg)
	}
	return resMessages, nextPaging
}
// contains reports whether e is an element of s.
func contains(s []string, e string) bool {
	for i := range s {
		if s[i] == e {
			return true
		}
	}
	return false
}
func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.HistoryResponse {
result := new(protocol.HistoryResponse)
// data holds IndexedWakuMessage whose topics match the query
var data []IndexedWakuMessage
for _, indexedMsg := range w.messages {
// temporal filtering
// check whether the history query contains a time filter
if query.StartTime != 0 && query.EndTime != 0 {
if indexedMsg.msg.Timestamp < query.StartTime || indexedMsg.msg.Timestamp > query.EndTime {
continue
}
}
2021-04-07 21:16:29 +00:00
if contains(query.Topics, indexedMsg.msg.ContentTopic) {
2021-03-18 16:40:47 +00:00
data = append(data, indexedMsg)
}
}
result.Messages, result.PagingInfo = paginateWithoutIndex(data, query.PagingInfo)
return result
}
type MessageProvider interface {
GetAll() ([]*protocol.WakuMessage, error)
Put(cursor *protocol.Index, message *protocol.WakuMessage) error
Stop()
2021-03-18 16:40:47 +00:00
}
// IndexedWakuMessage pairs a stored message with the index used to order it
// and to locate it during pagination.
type IndexedWakuMessage struct {
	msg   *protocol.WakuMessage // the stored message
	index *protocol.Index       // produced by computeIndex for msg
}
type WakuStore struct {
MsgC chan *common.Envelope
2021-03-22 16:45:13 +00:00
messages []IndexedWakuMessage
messagesMutex sync.Mutex
msgProvider MessageProvider
2021-03-18 16:40:47 +00:00
h host.Host
ctx context.Context
}
func NewWakuStore(ctx context.Context, h host.Host, p MessageProvider) *WakuStore {
2021-03-18 16:40:47 +00:00
wakuStore := new(WakuStore)
wakuStore.MsgC = make(chan *common.Envelope)
2021-03-18 16:40:47 +00:00
wakuStore.msgProvider = p
wakuStore.h = h
wakuStore.ctx = ctx
return wakuStore
}
2021-03-18 16:40:47 +00:00
func (store *WakuStore) Start() {
2021-04-07 21:16:29 +00:00
if store.msgProvider == nil {
return
}
store.h.SetStreamHandler(WakuStoreProtocolId, store.onRequest)
2021-03-18 16:40:47 +00:00
2021-03-22 16:45:13 +00:00
messages, err := store.msgProvider.GetAll()
if err != nil {
log.Error("could not load DBProvider messages")
return
}
for _, msg := range messages {
idx, err := computeIndex(msg)
if err != nil {
log.Error("could not calculate message index", err)
continue
}
store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx})
}
go store.storeIncomingMessages()
2021-03-18 16:40:47 +00:00
}
2021-03-22 16:45:13 +00:00
func (store *WakuStore) storeIncomingMessages() {
for envelope := range store.MsgC {
index, err := computeIndex(envelope.Message())
2021-03-18 16:40:47 +00:00
if err != nil {
2021-03-22 16:45:13 +00:00
log.Error("could not calculate message index", err)
2021-03-18 16:40:47 +00:00
continue
}
2021-03-22 16:45:13 +00:00
store.messagesMutex.Lock()
store.messages = append(store.messages, IndexedWakuMessage{msg: envelope.Message(), index: index})
2021-03-22 16:45:13 +00:00
store.messagesMutex.Unlock()
2021-03-18 16:40:47 +00:00
if store.msgProvider == nil {
continue
}
err = store.msgProvider.Put(index, envelope.Message()) // Should the index be stored?
2021-03-22 16:45:13 +00:00
if err != nil {
log.Error("could not store message", err)
continue
}
2021-03-18 16:40:47 +00:00
}
}
func (store *WakuStore) onRequest(s network.Stream) {
defer s.Close()
historyRPCRequest := &protocol.HistoryRPC{}
2021-04-07 21:16:29 +00:00
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, 64*1024)
2021-04-07 21:16:29 +00:00
err := reader.ReadMsg(historyRPCRequest)
2021-03-18 16:40:47 +00:00
if err != nil {
2021-03-22 16:45:13 +00:00
log.Error("error reading request", err)
2021-03-18 16:40:47 +00:00
return
}
2021-03-22 16:45:13 +00:00
log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
2021-03-18 16:40:47 +00:00
historyResponseRPC := &protocol.HistoryRPC{}
historyResponseRPC.RequestId = historyRPCRequest.RequestId
historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query)
2021-04-07 21:16:29 +00:00
err = writer.WriteMsg(historyResponseRPC)
2021-03-18 16:40:47 +00:00
if err != nil {
2021-03-22 16:45:13 +00:00
log.Error("error writing response", err)
2021-03-18 16:40:47 +00:00
s.Reset()
} else {
2021-03-22 16:45:13 +00:00
log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()))
2021-03-18 16:40:47 +00:00
}
}
func computeIndex(msg *protocol.WakuMessage) (*protocol.Index, error) {
2021-04-07 21:16:29 +00:00
data, err := msg.Marshal()
2021-03-18 16:40:47 +00:00
if err != nil {
return nil, err
}
digest := sha256.Sum256(data)
return &protocol.Index{
Digest: digest[:],
2021-04-04 19:33:21 +00:00
ReceivedTime: float64(time.Now().UnixNano()),
2021-03-18 16:40:47 +00:00
}, nil
}
// indexComparison orders two indices, first by ReceivedTime and, when the
// times do not differ, by a bytewise comparison of the digests.
// It returns -1 if x < y, 0 if they are equal and 1 if x > y.
func indexComparison(x, y *protocol.Index) int {
	// TODO: ask waku team why Index ReceivedTime is float?
	switch {
	case x.ReceivedTime > y.ReceivedTime:
		return 1 // timestamp has a higher priority for comparison
	case x.ReceivedTime < y.ReceivedTime:
		return -1
	default:
		return bytes.Compare(x.Digest, y.Digest)
	}
}
// indexedWakuMessageComparison orders two indexed messages by their indices.
// It returns -1 if x < y, 0 if they are equal and 1 if x > y, as decided by
// indexComparison.
func indexedWakuMessageComparison(x, y IndexedWakuMessage) int {
	left, right := x.index, y.index
	return indexComparison(left, right)
}
func findIndex(msgList []IndexedWakuMessage, index *protocol.Index) int {
2021-03-18 16:40:47 +00:00
// returns the position of an IndexedWakuMessage in msgList whose index value matches the given index
// returns -1 if no match is found
for i, indexedWakuMessage := range msgList {
2021-03-18 16:40:47 +00:00
if bytes.Compare(indexedWakuMessage.index.Digest, index.Digest) == 0 && indexedWakuMessage.index.ReceivedTime == index.ReceivedTime {
return i
}
}
return -1
}
// AddPeer records every address in addrs for peer p in the host's peerstore
// with a permanent TTL and marks p as supporting the store protocol.
// It returns the error reported by the peerstore when registering the
// protocol, if any.
func (store *WakuStore) AddPeer(p peer.ID, addrs []ma.Multiaddr) error {
	for i := range addrs {
		store.h.Peerstore().AddAddr(p, addrs[i], peerstore.PermanentAddrTTL)
	}
	return store.h.Peerstore().AddProtocols(p, string(WakuStoreProtocolId))
}
func (store *WakuStore) selectPeer() *peer.ID {
// @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?
// - default store peer?
// Selects the best peer for a given protocol
var peers peer.IDSlice
for _, peer := range store.h.Peerstore().Peers() {
protocols, err := store.h.Peerstore().SupportsProtocols(peer, string(WakuStoreProtocolId))
if err != nil {
2021-03-22 16:45:13 +00:00
log.Error("error obtaining the protocols supported by peers", err)
2021-03-18 16:40:47 +00:00
return nil
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
if len(peers) >= 1 {
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
return &peers[0]
}
return nil
}
// brHmacDrbgPool pools HMAC-DRBG generators used by GenerateRequestId. Each
// new generator is created with a 256 security level and a 48-byte seed read
// from crypto/rand; a failure to read the seed is fatal.
var brHmacDrbgPool = sync.Pool{
	New: func() interface{} {
		entropy := make([]byte, 48)
		if _, err := rand.Read(entropy); err != nil {
			log.Fatal(err)
		}
		return hmacdrbg.NewHmacDrbg(256, entropy, nil)
	},
}
func GenerateRequestId() []byte {
2021-03-18 16:40:47 +00:00
rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg)
defer brHmacDrbgPool.Put(rng)
2021-04-15 17:55:40 +00:00
randData := make([]byte, 32)
2021-03-18 16:40:47 +00:00
if !rng.Generate(randData) {
//Reseed is required every 10,000 calls
seed := make([]byte, 48)
_, err := rand.Read(seed)
if err != nil {
log.Fatal(err)
}
err = rng.Reseed(seed)
if err != nil {
//only happens if seed < security-level
log.Fatal(err)
}
if !rng.Generate(randData) {
2021-03-22 16:45:13 +00:00
log.Error("could not generate random request id")
2021-03-18 16:40:47 +00:00
}
}
return randData
}
type HistoryRequestParameters struct {
2021-04-15 17:55:40 +00:00
selectedPeer peer.ID
requestId []byte
timeout *time.Duration
ctx context.Context
cancelFunc context.CancelFunc
cursor *protocol.Index
pageSize uint64
asc bool
s *WakuStore
}
type HistoryRequestOption func(*HistoryRequestParameters)
2021-04-15 17:55:40 +00:00
func WithPeer(p peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
2021-04-15 17:55:40 +00:00
params.selectedPeer = p
}
}
func WithAutomaticPeerSelection() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
2021-04-15 17:55:40 +00:00
p := params.s.selectPeer()
params.selectedPeer = *p
}
}
// WithRequestId uses the given bytes as the request id of the query.
func WithRequestId(requestId []byte) HistoryRequestOption {
	setID := func(p *HistoryRequestParameters) {
		p.requestId = requestId
	}
	return setID
}
// WithAutomaticRequestId assigns the query a request id obtained from
// GenerateRequestId. The id is generated when the option is applied, not when
// it is created.
func WithAutomaticRequestId() HistoryRequestOption {
	assignID := func(p *HistoryRequestParameters) {
		p.requestId = GenerateRequestId()
	}
	return assignID
}
func WithTimeout(t time.Duration) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.timeout = &t
params.ctx, params.cancelFunc = context.WithTimeout(params.s.ctx, t)
}
2021-03-18 16:40:47 +00:00
}
// WithCursor makes the query continue from the given cursor.
func WithCursor(c *protocol.Index) HistoryRequestOption {
	setCursor := func(p *HistoryRequestParameters) {
		p.cursor = c
	}
	return setCursor
}
// WithPaging sets the paging direction and page size of the query. asc selects
// forward paging when true and backward paging when false.
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
	setPaging := func(p *HistoryRequestParameters) {
		p.asc, p.pageSize = asc, pageSize
	}
	return setPaging
}
// DefaultOptions returns the default set of query options: an automatically
// generated request id, automatic peer selection, the ConnectionTimeout
// timeout, and forward paging with a page size of 0.
func DefaultOptions() []HistoryRequestOption {
	defaults := make([]HistoryRequestOption, 0, 4)
	defaults = append(defaults,
		WithAutomaticRequestId(),
		WithAutomaticPeerSelection(),
		WithTimeout(ConnectionTimeout),
		WithPaging(true, 0),
	)
	return defaults
}
func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOption) (*protocol.HistoryResponse, error) {
params := new(HistoryRequestParameters)
params.s = store
for _, opt := range opts {
opt(params)
}
2021-04-15 17:55:40 +00:00
if params.selectedPeer == "" {
return nil, ErrNoPeersAvailable
}
if len(params.requestId) == 0 {
return nil, ErrInvalidId
}
// Setting default timeout if none is specified
if params.timeout == nil {
timeoutF := WithTimeout(ConnectionTimeout)
timeoutF(params)
}
if *params.timeout == 0 {
params.ctx = store.ctx
} else {
defer params.cancelFunc()
}
if params.cursor != nil {
q.PagingInfo.Cursor = params.cursor
}
if params.asc {
q.PagingInfo.Direction = protocol.PagingInfo_FORWARD
} else {
q.PagingInfo.Direction = protocol.PagingInfo_BACKWARD
2021-03-18 16:40:47 +00:00
}
q.PagingInfo.PageSize = params.pageSize
2021-03-18 16:40:47 +00:00
2021-04-15 17:55:40 +00:00
connOpt, err := store.h.NewStream(params.ctx, params.selectedPeer, WakuStoreProtocolId)
2021-03-18 16:40:47 +00:00
if err != nil {
2021-03-22 16:45:13 +00:00
log.Info("failed to connect to remote peer", err)
2021-03-18 16:40:47 +00:00
return nil, err
}
defer connOpt.Close()
defer connOpt.Reset()
historyRequest := &protocol.HistoryRPC{Query: q, RequestId: hex.EncodeToString(params.requestId)}
2021-04-07 21:16:29 +00:00
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, 64*1024)
2021-03-18 16:40:47 +00:00
2021-04-07 21:16:29 +00:00
err = writer.WriteMsg(historyRequest)
2021-03-18 16:40:47 +00:00
if err != nil {
2021-04-07 21:16:29 +00:00
log.Error("could not write request", err)
2021-03-18 16:40:47 +00:00
return nil, err
}
historyResponseRPC := &protocol.HistoryRPC{}
2021-04-07 21:16:29 +00:00
err = reader.ReadMsg(historyResponseRPC)
2021-03-18 16:40:47 +00:00
if err != nil {
2021-04-07 21:16:29 +00:00
log.Error("could not read response", err)
2021-03-18 16:40:47 +00:00
return nil, err
}
return historyResponseRPC.Response, nil
}
// TODO: queryWithAccounting