Waku store - initial implementation

This commit is contained in:
Richard Ramos 2021-03-18 12:40:47 -04:00
parent 783ad01d92
commit c4ed58c188
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
9 changed files with 1045 additions and 63 deletions

4
go.mod
View File

@ -5,7 +5,9 @@ go 1.15
replace github.com/libp2p/go-libp2p-pubsub => github.com/status-im/go-libp2p-pubsub v0.4.1-customProtocols
require (
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d
github.com/ethereum/go-ethereum v1.10.1
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.3
github.com/ipfs/go-log/v2 v2.1.1
github.com/libp2p/go-libp2p v0.13.0
@ -14,7 +16,7 @@ require (
github.com/minio/sha256-simd v0.1.1
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multiaddr-net v0.2.0
github.com/status-im/status-go/eth-node v1.1.0 // indirect
github.com/status-im/status-go/eth-node v1.1.0
github.com/status-im/status-go/extkeys v1.1.2 // indirect
google.golang.org/protobuf v1.25.0
)

2
go.sum
View File

@ -118,6 +118,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d h1:bE1UyBQ5aE6FjhNY4lbPtMqh7VDldoVkvZMtFEbd+CE=
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d/go.mod h1:HAe1wsCrwH2uFnFaCC2vlcyEohnxs8KeShAFqGIHvmM=
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -32,6 +32,7 @@ func main() {
}
wakuNode.MountRelay()
wakuNode.MountStore()
sub, err := wakuNode.Subscribe(nil)
if err != nil {
@ -40,7 +41,6 @@ func main() {
// Read loop
go func() {
for {
for value := range sub.C {
payload, err := node.DecodePayload(value, &node.KeyInfo{Kind: node.None})
if err != nil {
@ -51,7 +51,7 @@ func main() {
fmt.Println("Received message:", string(payload))
// sub.Unsubscribe()
}
}
}()
// Write loop

View File

@ -1,46 +0,0 @@
package node
import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peerstore"
)
type Connectedness string
const (
// NotConnected: default state for a new peer. No connection and no further information on connectedness.
NotConnected Connectedness = "NotConnected"
// CannotConnect: attempted to connect to peer, but failed.
CannotConnect Connectedness = "CannotConnect"
// CanConnect: was recently connected to peer and disconnected gracefully.
CanConnect Connectedness = "CanConnect"
// Connected: actively connected to peer.
Connected Connectedness = "Connected"
)
/*
type
ConnectionBook* = object of PeerBook[Connectedness]
*/
type WakuPeerStore struct {
connectionBook peerstore.Peerstore
}
type PeerManager struct {
sw host.Host
peerStore *WakuPeerStore
}
func NewWakuPeerStore() *WakuPeerStore {
p := new(WakuPeerStore)
return p
}
func NewPeerManager(sw host.Host) *PeerManager {
peerStore := NewWakuPeerStore()
p := new(PeerManager)
p.sw = sw
p.peerStore = peerStore
return p
}

View File

@ -14,11 +14,13 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/status-im/go-waku/waku/v2/protocol"
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
)
// Default clientId
@ -53,6 +55,7 @@ type WakuNode struct {
// peerManager *PeerManager
host host.Host
pubsub *pubsub.PubSub
store *store.WakuStore
topics map[Topic]*pubsub.Topic
topicsLock sync.Mutex
@ -156,9 +159,38 @@ func (w *WakuNode) MountRelay() error {
return nil
}
func (w *WakuNode) MountStore() error {
sub, err := w.Subscribe(nil)
if err != nil {
return err
}
// TODO: kill subscription on close
w.store = store.NewWakuStore(w.ctx, w.host, sub.C, new(store.MessageProvider)) // TODO: pass store
return nil
}
func (w *WakuNode) AddStorePeer(address string) error {
if w.store == nil {
return errors.New("WakuStore is not set")
}
storePeer, err := ma.NewMultiaddr(address)
if err != nil {
return err
}
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(storePeer)
if err != nil {
return err
}
return w.store.AddPeer(info.ID, info.Addrs)
}
func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
// Subscribes to a PubSub topic. Triggers handler when receiving messages on
// this topic. TopicHandler is a method that takes a topic and some data.
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
if node.pubsub == nil {
@ -195,10 +227,10 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
return
case <-nextMsgTicker.C:
msg, err := sub.Next(ctx)
if err != nil {
fmt.Println("Error receiving message", err)
return // Should close channel?
close(subscription.quit)
return
}
wakuMessage := &protocol.WakuMessage{}

View File

@ -12,7 +12,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
const WakuRelayCodec = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2")
const WakuRelayProtocol = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2")
type WakuRelay struct {
p *pubsub.PubSub
@ -24,7 +24,7 @@ func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*pub
//opts = append(opts, pubsub.WithMessageIdFn(messageIdFn))
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayCodec}, opts...)
gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayProtocol}, opts...)
if err != nil {
return nil, err

View File

@ -0,0 +1,537 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0-devel
// protoc v3.14.0
// source: waku_store.proto
package protocol
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type PagingInfo_Direction int32
const (
PagingInfo_FORWARD PagingInfo_Direction = 0
PagingInfo_BACKWARD PagingInfo_Direction = 1
)
// Enum value maps for PagingInfo_Direction.
var (
PagingInfo_Direction_name = map[int32]string{
0: "FORWARD",
1: "BACKWARD",
}
PagingInfo_Direction_value = map[string]int32{
"FORWARD": 0,
"BACKWARD": 1,
}
)
func (x PagingInfo_Direction) Enum() *PagingInfo_Direction {
p := new(PagingInfo_Direction)
*p = x
return p
}
func (x PagingInfo_Direction) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (PagingInfo_Direction) Descriptor() protoreflect.EnumDescriptor {
return file_waku_store_proto_enumTypes[0].Descriptor()
}
func (PagingInfo_Direction) Type() protoreflect.EnumType {
return &file_waku_store_proto_enumTypes[0]
}
func (x PagingInfo_Direction) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use PagingInfo_Direction.Descriptor instead.
func (PagingInfo_Direction) EnumDescriptor() ([]byte, []int) {
return file_waku_store_proto_rawDescGZIP(), []int{1, 0}
}
type Index struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"`
ReceivedTime float64 `protobuf:"fixed64,2,opt,name=receivedTime,proto3" json:"receivedTime,omitempty"`
}
func (x *Index) Reset() {
*x = Index{}
if protoimpl.UnsafeEnabled {
mi := &file_waku_store_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Index) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Index) ProtoMessage() {}
func (x *Index) ProtoReflect() protoreflect.Message {
mi := &file_waku_store_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Index.ProtoReflect.Descriptor instead.
func (*Index) Descriptor() ([]byte, []int) {
return file_waku_store_proto_rawDescGZIP(), []int{0}
}
func (x *Index) GetDigest() []byte {
if x != nil {
return x.Digest
}
return nil
}
func (x *Index) GetReceivedTime() float64 {
if x != nil {
return x.ReceivedTime
}
return 0
}
type PagingInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
PageSize int64 `protobuf:"varint,1,opt,name=pageSize,proto3" json:"pageSize,omitempty"`
Cursor *Index `protobuf:"bytes,2,opt,name=cursor,proto3" json:"cursor,omitempty"`
Direction PagingInfo_Direction `protobuf:"varint,3,opt,name=direction,proto3,enum=protocol.PagingInfo_Direction" json:"direction,omitempty"`
}
func (x *PagingInfo) Reset() {
*x = PagingInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_waku_store_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PagingInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PagingInfo) ProtoMessage() {}
func (x *PagingInfo) ProtoReflect() protoreflect.Message {
mi := &file_waku_store_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PagingInfo.ProtoReflect.Descriptor instead.
func (*PagingInfo) Descriptor() ([]byte, []int) {
return file_waku_store_proto_rawDescGZIP(), []int{1}
}
func (x *PagingInfo) GetPageSize() int64 {
if x != nil {
return x.PageSize
}
return 0
}
func (x *PagingInfo) GetCursor() *Index {
if x != nil {
return x.Cursor
}
return nil
}
func (x *PagingInfo) GetDirection() PagingInfo_Direction {
if x != nil {
return x.Direction
}
return PagingInfo_FORWARD
}
type HistoryQuery struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Topics []uint32 `protobuf:"varint,2,rep,packed,name=topics,proto3" json:"topics,omitempty"`
PagingInfo *PagingInfo `protobuf:"bytes,3,opt,name=pagingInfo,proto3,oneof" json:"pagingInfo,omitempty"` // used for pagination
}
func (x *HistoryQuery) Reset() {
*x = HistoryQuery{}
if protoimpl.UnsafeEnabled {
mi := &file_waku_store_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HistoryQuery) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HistoryQuery) ProtoMessage() {}
func (x *HistoryQuery) ProtoReflect() protoreflect.Message {
mi := &file_waku_store_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HistoryQuery.ProtoReflect.Descriptor instead.
func (*HistoryQuery) Descriptor() ([]byte, []int) {
return file_waku_store_proto_rawDescGZIP(), []int{2}
}
func (x *HistoryQuery) GetTopics() []uint32 {
if x != nil {
return x.Topics
}
return nil
}
func (x *HistoryQuery) GetPagingInfo() *PagingInfo {
if x != nil {
return x.PagingInfo
}
return nil
}
type HistoryResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Messages []*WakuMessage `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"`
PagingInfo *PagingInfo `protobuf:"bytes,3,opt,name=pagingInfo,proto3,oneof" json:"pagingInfo,omitempty"` // used for pagination
}
func (x *HistoryResponse) Reset() {
*x = HistoryResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_waku_store_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HistoryResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HistoryResponse) ProtoMessage() {}
func (x *HistoryResponse) ProtoReflect() protoreflect.Message {
mi := &file_waku_store_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HistoryResponse.ProtoReflect.Descriptor instead.
func (*HistoryResponse) Descriptor() ([]byte, []int) {
return file_waku_store_proto_rawDescGZIP(), []int{3}
}
func (x *HistoryResponse) GetMessages() []*WakuMessage {
if x != nil {
return x.Messages
}
return nil
}
func (x *HistoryResponse) GetPagingInfo() *PagingInfo {
if x != nil {
return x.PagingInfo
}
return nil
}
type HistoryRPC struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
Query *HistoryQuery `protobuf:"bytes,2,opt,name=query,proto3" json:"query,omitempty"`
Response *HistoryResponse `protobuf:"bytes,3,opt,name=response,proto3" json:"response,omitempty"`
}
func (x *HistoryRPC) Reset() {
*x = HistoryRPC{}
if protoimpl.UnsafeEnabled {
mi := &file_waku_store_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HistoryRPC) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HistoryRPC) ProtoMessage() {}
func (x *HistoryRPC) ProtoReflect() protoreflect.Message {
mi := &file_waku_store_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HistoryRPC.ProtoReflect.Descriptor instead.
func (*HistoryRPC) Descriptor() ([]byte, []int) {
return file_waku_store_proto_rawDescGZIP(), []int{4}
}
func (x *HistoryRPC) GetRequestId() string {
if x != nil {
return x.RequestId
}
return ""
}
func (x *HistoryRPC) GetQuery() *HistoryQuery {
if x != nil {
return x.Query
}
return nil
}
func (x *HistoryRPC) GetResponse() *HistoryResponse {
if x != nil {
return x.Response
}
return nil
}
var File_waku_store_proto protoreflect.FileDescriptor
var file_waku_store_proto_rawDesc = []byte{
0x0a, 0x10, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x1a, 0x12, 0x77, 0x61,
0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x22, 0x43, 0x0a, 0x05, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x69, 0x67,
0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73,
0x74, 0x12, 0x22, 0x0a, 0x0c, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x54, 0x69, 0x6d,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52, 0x0c, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65,
0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xb7, 0x01, 0x0a, 0x0a, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67,
0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x67, 0x65, 0x53, 0x69, 0x7a, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x61, 0x67, 0x65, 0x53, 0x69, 0x7a, 0x65,
0x12, 0x27, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x49, 0x6e, 0x64, 0x65,
0x78, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x3c, 0x0a, 0x09, 0x64, 0x69, 0x72,
0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e,
0x66, 0x6f, 0x2e, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69,
0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x26, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, 0x63,
0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x4f, 0x52, 0x57, 0x41, 0x52, 0x44, 0x10,
0x00, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x41, 0x43, 0x4b, 0x57, 0x41, 0x52, 0x44, 0x10, 0x01, 0x22,
0x70, 0x0a, 0x0c, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12,
0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0d, 0x52,
0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x70, 0x61, 0x67, 0x69, 0x6e,
0x67, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x66,
0x6f, 0x48, 0x00, 0x52, 0x0a, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x66, 0x6f, 0x88,
0x01, 0x01, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x66,
0x6f, 0x22, 0x8e, 0x01, 0x0a, 0x0f, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
0x6f, 0x6c, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x70, 0x61, 0x67, 0x69,
0x6e, 0x67, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e,
0x66, 0x6f, 0x48, 0x00, 0x52, 0x0a, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x66, 0x6f,
0x88, 0x01, 0x01, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e,
0x66, 0x6f, 0x22, 0x90, 0x01, 0x0a, 0x0a, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x50,
0x43, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64,
0x12, 0x2c, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f,
0x72, 0x79, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x35,
0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x69, 0x73, 0x74,
0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x08, 0x72, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_waku_store_proto_rawDescOnce sync.Once
file_waku_store_proto_rawDescData = file_waku_store_proto_rawDesc
)
func file_waku_store_proto_rawDescGZIP() []byte {
file_waku_store_proto_rawDescOnce.Do(func() {
file_waku_store_proto_rawDescData = protoimpl.X.CompressGZIP(file_waku_store_proto_rawDescData)
})
return file_waku_store_proto_rawDescData
}
var file_waku_store_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_waku_store_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_waku_store_proto_goTypes = []interface{}{
(PagingInfo_Direction)(0), // 0: protocol.PagingInfo.Direction
(*Index)(nil), // 1: protocol.Index
(*PagingInfo)(nil), // 2: protocol.PagingInfo
(*HistoryQuery)(nil), // 3: protocol.HistoryQuery
(*HistoryResponse)(nil), // 4: protocol.HistoryResponse
(*HistoryRPC)(nil), // 5: protocol.HistoryRPC
(*WakuMessage)(nil), // 6: protocol.WakuMessage
}
var file_waku_store_proto_depIdxs = []int32{
1, // 0: protocol.PagingInfo.cursor:type_name -> protocol.Index
0, // 1: protocol.PagingInfo.direction:type_name -> protocol.PagingInfo.Direction
2, // 2: protocol.HistoryQuery.pagingInfo:type_name -> protocol.PagingInfo
6, // 3: protocol.HistoryResponse.messages:type_name -> protocol.WakuMessage
2, // 4: protocol.HistoryResponse.pagingInfo:type_name -> protocol.PagingInfo
3, // 5: protocol.HistoryRPC.query:type_name -> protocol.HistoryQuery
4, // 6: protocol.HistoryRPC.response:type_name -> protocol.HistoryResponse
7, // [7:7] is the sub-list for method output_type
7, // [7:7] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
}
func init() { file_waku_store_proto_init() }
func file_waku_store_proto_init() {
if File_waku_store_proto != nil {
return
}
file_waku_message_proto_init()
if !protoimpl.UnsafeEnabled {
file_waku_store_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Index); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_waku_store_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PagingInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_waku_store_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HistoryQuery); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_waku_store_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HistoryResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_waku_store_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HistoryRPC); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_waku_store_proto_msgTypes[2].OneofWrappers = []interface{}{}
file_waku_store_proto_msgTypes[3].OneofWrappers = []interface{}{}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_waku_store_proto_rawDesc,
NumEnums: 1,
NumMessages: 5,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_waku_store_proto_goTypes,
DependencyIndexes: file_waku_store_proto_depIdxs,
EnumInfos: file_waku_store_proto_enumTypes,
MessageInfos: file_waku_store_proto_msgTypes,
}.Build()
File_waku_store_proto = out.File
file_waku_store_proto_rawDesc = nil
file_waku_store_proto_goTypes = nil
file_waku_store_proto_depIdxs = nil
}

View File

@ -0,0 +1,36 @@
syntax = "proto3";
package protocol;
import "waku_message.proto";
message Index {
bytes digest = 1;
double receivedTime = 2;
}
message PagingInfo {
int64 pageSize = 1;
Index cursor = 2;
enum Direction {
FORWARD = 0;
BACKWARD = 1;
}
Direction direction = 3;
}
message HistoryQuery {
repeated uint32 topics = 2;
optional PagingInfo pagingInfo = 3; // used for pagination
}
message HistoryResponse {
repeated WakuMessage messages = 2;
optional PagingInfo pagingInfo = 3; // used for pagination
}
message HistoryRPC {
string request_id = 1;
HistoryQuery query = 2;
HistoryResponse response = 3;
}

View File

@ -0,0 +1,419 @@
package store
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"errors"
"io/ioutil"
"log"
"sort"
"sync"
"time"
"github.com/cruxic/go-hmac-drbg/hmacdrbg"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol"
"google.golang.org/protobuf/proto"
)
const WakuStoreProtocolId = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta2")
const MaxPageSize = 100 // Maximum number of waku messages in each page
const ConnectionTimeout = 10 * time.Second
func minOf(vars ...int) int {
min := vars[0]
for _, i := range vars {
if min > i {
min = i
}
}
return min
}
func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *protocol.PagingInfo) {
// takes list, and performs paging based on pinfo
// returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
cursor := pinfo.Cursor
pageSize := pinfo.PageSize
dir := pinfo.Direction
if pageSize == 0 { // pageSize being zero indicates that no pagination is required
return list, pinfo
}
if len(list) == 0 { // no pagination is needed for an empty list
return list, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction}
}
msgList := make([]IndexedWakuMessage, len(list))
_ = copy(msgList, list) // makes a copy of the list
sort.Slice(msgList, func(i, j int) bool { // sorts msgList based on the custom comparison proc indexedWakuMessageComparison
return indexedWakuMessageComparison(msgList[i], msgList[j]) == -1
})
initQuery := false
if cursor == nil {
initQuery = true // an empty cursor means it is an initial query
switch dir {
case protocol.PagingInfo_FORWARD:
cursor = list[0].index // perform paging from the begining of the list
case protocol.PagingInfo_BACKWARD:
cursor = list[len(list)-1].index // perform paging from the end of the list
}
}
foundIndex := findIndex(&msgList, cursor)
if foundIndex == -1 { // the cursor is not valid
return nil, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction}
}
var retrievedPageSize, s, e int
var newCursor *protocol.Index // to be returned as part of the new paging info
switch dir {
case protocol.PagingInfo_FORWARD: // forward pagination
remainingMessages := len(msgList) - foundIndex - 1
// the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
if initQuery {
foundIndex = foundIndex - 1
}
s = foundIndex + 1 // non inclusive
e = foundIndex + retrievedPageSize
newCursor = msgList[e].index // the new cursor points to the end of the page
case protocol.PagingInfo_BACKWARD: // backward pagination
remainingMessages := foundIndex
// the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
if initQuery {
foundIndex = foundIndex + 1
}
s = foundIndex - retrievedPageSize
e = foundIndex - 1
newCursor = msgList[s].index // the new cursor points to the begining of the page
}
// retrieve the messages
for i := s; i <= e; i++ {
resMessages = append(resMessages, msgList[i])
}
resPagingInfo = &protocol.PagingInfo{PageSize: int64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction}
return
}
func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []*protocol.WakuMessage, resPinfo *protocol.PagingInfo) {
// takes list, and performs paging based on pinfo
// returns the page i.e, a sequence of WakuMessage and the new paging info to be used for the next paging request
indexedData, updatedPagingInfo := paginateWithIndex(list, pinfo)
for _, indexedMsg := range indexedData {
resMessages = append(resMessages, indexedMsg.msg)
}
resPinfo = updatedPagingInfo
return
}
func contains(s []uint32, e uint32) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.HistoryResponse {
result := new(protocol.HistoryResponse)
// data holds IndexedWakuMessage whose topics match the query
var data []IndexedWakuMessage
for _, indexedMsg := range w.messages {
if contains(query.Topics, *indexedMsg.msg.ContentTopic) {
data = append(data, indexedMsg)
}
}
result.Messages, result.PagingInfo = paginateWithoutIndex(data, query.PagingInfo)
return result
}
type MessageProvider interface {
GetAll() ([]*protocol.WakuMessage, error)
}
type IndexedWakuMessage struct {
msg *protocol.WakuMessage
index *protocol.Index
}
type WakuStore struct {
msg chan *protocol.WakuMessage
messages []IndexedWakuMessage
msgProvider *MessageProvider
h host.Host
ctx context.Context
}
func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessage, p *MessageProvider) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msg = msg
wakuStore.msgProvider = p
wakuStore.h = h
wakuStore.ctx = ctx
h.SetStreamHandler(WakuStoreProtocolId, wakuStore.onRequest)
go wakuStore.processMessages()
// TODO: Load all messages
// proc onData(timestamp: uint64, msg: WakuMessage) =
// ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex()))
// let res = ws.store.getAll(onData)
return wakuStore
}
func (store *WakuStore) processMessages() {
for message := range store.msg {
index, err := computeIndex(message)
if err != nil {
log.Println(err)
continue
}
store.messages = append(store.messages, IndexedWakuMessage{msg: message, index: index})
if store.msgProvider == nil {
continue
}
// let res = store.msgProvider.put(index, msg) // TODO: store in the DB
}
}
func (store *WakuStore) onRequest(s network.Stream) {
defer s.Close()
historyRPCRequest := &protocol.HistoryRPC{}
buf, err := ioutil.ReadAll(s)
if err != nil {
s.Reset()
log.Println(err)
return
}
proto.Unmarshal(buf, historyRPCRequest)
if err != nil {
log.Println(err)
return
}
log.Printf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())
historyResponseRPC := &protocol.HistoryRPC{}
historyResponseRPC.RequestId = historyRPCRequest.RequestId
historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query)
// TODO: implement waku swap
message, err := proto.Marshal(historyResponseRPC)
if err != nil {
log.Println(err)
return
}
_, err = s.Write(message)
if err != nil {
log.Println(err)
s.Reset()
} else {
log.Printf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())
}
}
func computeIndex(msg *protocol.WakuMessage) (*protocol.Index, error) {
data, err := proto.Marshal(msg)
if err != nil {
return nil, err
}
digest := sha256.Sum256(data)
return &protocol.Index{
Digest: digest[:],
ReceivedTime: float64(time.Now().Unix()) / 1000000000,
}, nil
}
func indexComparison(x, y *protocol.Index) int {
// compares x and y
// returns 0 if they are equal
// returns -1 if x < y
// returns 1 if x > y
var timecmp int = 0 // TODO: ask waku team why Index ReceivedTime is is float?
if x.ReceivedTime > y.ReceivedTime {
timecmp = 1
} else if x.ReceivedTime < y.ReceivedTime {
timecmp = -1
}
digestcm := bytes.Compare(x.Digest, y.Digest)
if timecmp != 0 {
return timecmp // timestamp has a higher priority for comparison
}
return digestcm
}
func indexedWakuMessageComparison(x, y IndexedWakuMessage) int {
// compares x and y
// returns 0 if they are equal
// returns -1 if x < y
// returns 1 if x > y
return indexComparison(x.index, y.index)
}
func findIndex(msgList *[]IndexedWakuMessage, index *protocol.Index) int {
// returns the position of an IndexedWakuMessage in msgList whose index value matches the given index
// returns -1 if no match is found
for i, indexedWakuMessage := range *msgList {
if bytes.Compare(indexedWakuMessage.index.Digest, index.Digest) == 0 && indexedWakuMessage.index.ReceivedTime == index.ReceivedTime {
return i
}
}
return -1
}
func (store *WakuStore) AddPeer(p peer.ID, addrs []ma.Multiaddr) error {
for _, addr := range addrs {
store.h.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL)
}
err := store.h.Peerstore().AddProtocols(p, string(WakuStoreProtocolId))
if err != nil {
return err
}
return nil
}
func (store *WakuStore) selectPeer() *peer.ID {
// @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?
// - default store peer?
// Selects the best peer for a given protocol
var peers peer.IDSlice
for _, peer := range store.h.Peerstore().Peers() {
protocols, err := store.h.Peerstore().SupportsProtocols(peer, string(WakuStoreProtocolId))
if err != nil {
log.Println("error obtaining the protocols supported by peers", err)
return nil
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
if len(peers) >= 1 {
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
return &peers[0]
}
return nil
}
var brHmacDrbgPool = sync.Pool{New: func() interface{} {
seed := make([]byte, 48)
_, err := rand.Read(seed)
if err != nil {
log.Fatal(err)
}
return hmacdrbg.NewHmacDrbg(256, seed, nil)
}}
func GenerateRequestId() string {
rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg)
defer brHmacDrbgPool.Put(rng)
randData := make([]byte, 10)
if !rng.Generate(randData) {
//Reseed is required every 10,000 calls
seed := make([]byte, 48)
_, err := rand.Read(seed)
if err != nil {
log.Fatal(err)
}
err = rng.Reseed(seed)
if err != nil {
//only happens if seed < security-level
log.Fatal(err)
}
if !rng.Generate(randData) {
log.Fatal("could not generate random request id")
}
}
return hex.EncodeToString(randData)
}
func (store *WakuStore) query(q *protocol.HistoryQuery) (*protocol.HistoryResponse, error) {
peer := store.selectPeer()
if peer == nil {
return nil, errors.New("no suitable remote peers")
}
ctx, cancel := context.WithTimeout(store.ctx, ConnectionTimeout)
defer cancel()
connOpt, err := store.h.NewStream(ctx, *peer, WakuStoreProtocolId)
if err != nil {
log.Println("failed to connect to remote peer", err)
return nil, err
}
historyRequest := &protocol.HistoryRPC{Query: q, RequestId: GenerateRequestId()}
message, err := proto.Marshal(historyRequest)
if err != nil {
log.Println(err)
return nil, err
}
defer connOpt.Close()
defer connOpt.Reset()
_, err = connOpt.Write(message)
if err != nil {
log.Println(err)
return nil, err
}
buf, err := ioutil.ReadAll(connOpt)
if err != nil {
log.Println("failed to read response", err)
return nil, err
}
historyResponseRPC := &protocol.HistoryRPC{}
proto.Unmarshal(buf, historyResponseRPC)
if err != nil {
log.Println("failed to decode response", err)
return nil, err
}
return historyResponseRPC.Response, nil
}
// TODO: queryWithAccounting