From c4ed58c188987661533c3fbd3dedf4870fe4afd6 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 18 Mar 2021 12:40:47 -0400 Subject: [PATCH] Waku store - initial implementation --- go.mod | 4 +- go.sum | 2 + main.go | 20 +- waku/v2/node/peer_manager.go | 46 -- waku/v2/node/wakunode2.go | 40 +- waku/v2/protocol/waku_relay.go | 4 +- waku/v2/protocol/waku_store.pb.go | 537 ++++++++++++++++++++++ waku/v2/protocol/waku_store.proto | 36 ++ waku/v2/protocol/waku_store/waku_store.go | 419 +++++++++++++++++ 9 files changed, 1045 insertions(+), 63 deletions(-) delete mode 100644 waku/v2/node/peer_manager.go create mode 100644 waku/v2/protocol/waku_store.pb.go create mode 100644 waku/v2/protocol/waku_store.proto create mode 100644 waku/v2/protocol/waku_store/waku_store.go diff --git a/go.mod b/go.mod index eeddbc6f..36e2155f 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.15 replace github.com/libp2p/go-libp2p-pubsub => github.com/status-im/go-libp2p-pubsub v0.4.1-customProtocols require ( + github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d github.com/ethereum/go-ethereum v1.10.1 + github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.4.3 github.com/ipfs/go-log/v2 v2.1.1 github.com/libp2p/go-libp2p v0.13.0 @@ -14,7 +16,7 @@ require ( github.com/minio/sha256-simd v0.1.1 github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multiaddr-net v0.2.0 - github.com/status-im/status-go/eth-node v1.1.0 // indirect + github.com/status-im/status-go/eth-node v1.1.0 github.com/status-im/status-go/extkeys v1.1.2 // indirect google.golang.org/protobuf v1.25.0 ) diff --git a/go.sum b/go.sum index fabd1d86..f5ed9a47 100644 --- a/go.sum +++ b/go.sum @@ -118,6 +118,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d h1:bE1UyBQ5aE6FjhNY4lbPtMqh7VDldoVkvZMtFEbd+CE= +github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d/go.mod h1:HAe1wsCrwH2uFnFaCC2vlcyEohnxs8KeShAFqGIHvmM= github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/main.go b/main.go index 5be96605..66330741 100644 --- a/main.go +++ b/main.go @@ -32,6 +32,7 @@ func main() { } wakuNode.MountRelay() + wakuNode.MountStore() sub, err := wakuNode.Subscribe(nil) if err != nil { @@ -40,18 +41,17 @@ func main() { // Read loop go func() { - for { - for value := range sub.C { - payload, err := node.DecodePayload(value, &node.KeyInfo{Kind: node.None}) - if err != nil { - fmt.Println(err) - return - } - - fmt.Println("Received message:", string(payload)) - // sub.Unsubscribe() + for value := range sub.C { + payload, err := node.DecodePayload(value, &node.KeyInfo{Kind: node.None}) + if err != nil { + fmt.Println(err) + return } + + fmt.Println("Received message:", string(payload)) + // sub.Unsubscribe() } + }() // Write loop diff --git a/waku/v2/node/peer_manager.go b/waku/v2/node/peer_manager.go deleted file mode 100644 index 8394e63d..00000000 --- a/waku/v2/node/peer_manager.go +++ /dev/null @@ -1,46 +0,0 @@ -package node - -import ( - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peerstore" -) - -type Connectedness string - -const ( - // NotConnected: default state for a new peer. No connection and no further information on connectedness. - NotConnected Connectedness = "NotConnected" - // CannotConnect: attempted to connect to peer, but failed. - CannotConnect Connectedness = "CannotConnect" - // CanConnect: was recently connected to peer and disconnected gracefully. - CanConnect Connectedness = "CanConnect" - // Connected: actively connected to peer. - Connected Connectedness = "Connected" -) - -/* -type - ConnectionBook* = object of PeerBook[Connectedness] -*/ - -type WakuPeerStore struct { - connectionBook peerstore.Peerstore -} - -type PeerManager struct { - sw host.Host - peerStore *WakuPeerStore -} - -func NewWakuPeerStore() *WakuPeerStore { - p := new(WakuPeerStore) - return p -} - -func NewPeerManager(sw host.Host) *PeerManager { - peerStore := NewWakuPeerStore() - p := new(PeerManager) - p.sw = sw - p.peerStore = peerStore - return p -} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index f283dab4..03cd0984 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -14,11 +14,13 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" "github.com/status-im/go-waku/waku/v2/protocol" + store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" ) // Default clientId @@ -53,6 +55,7 @@ type WakuNode struct { // peerManager *PeerManager host host.Host pubsub *pubsub.PubSub + store *store.WakuStore topics map[Topic]*pubsub.Topic topicsLock sync.Mutex @@ -156,9 +159,38 @@ func (w *WakuNode) MountRelay() error { return nil } +func (w *WakuNode) MountStore() error { + sub, err := w.Subscribe(nil) + if err != nil { + return err + } + + // TODO: kill subscription on close + w.store = store.NewWakuStore(w.ctx, w.host, sub.C, new(store.MessageProvider)) // TODO: pass store + return nil +} + +func (w *WakuNode) AddStorePeer(address string) error { + if w.store == nil { + return errors.New("WakuStore is not set") + } + + storePeer, err := ma.NewMultiaddr(address) + if err != nil { + return err + } + + // Extract the peer ID from the multiaddr. + info, err := peer.AddrInfoFromP2pAddr(storePeer) + if err != nil { + return err + } + + return w.store.AddPeer(info.ID, info.Addrs) +} + func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { - // Subscribes to a PubSub topic. Triggers handler when receiving messages on - // this topic. TopicHandler is a method that takes a topic and some data. + // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. if node.pubsub == nil { @@ -195,10 +227,10 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { return case <-nextMsgTicker.C: msg, err := sub.Next(ctx) - if err != nil { fmt.Println("Error receiving message", err) - return // Should close channel? + close(subscription.quit) + return } wakuMessage := &protocol.WakuMessage{} diff --git a/waku/v2/protocol/waku_relay.go b/waku/v2/protocol/waku_relay.go index 0f9258f1..cd0898f6 100644 --- a/waku/v2/protocol/waku_relay.go +++ b/waku/v2/protocol/waku_relay.go @@ -12,7 +12,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" ) -const WakuRelayCodec = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2") +const WakuRelayProtocol = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2") type WakuRelay struct { p *pubsub.PubSub @@ -24,7 +24,7 @@ func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*pub //opts = append(opts, pubsub.WithMessageIdFn(messageIdFn)) opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayCodec}, opts...) + gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayProtocol}, opts...) if err != nil { return nil, err diff --git a/waku/v2/protocol/waku_store.pb.go b/waku/v2/protocol/waku_store.pb.go new file mode 100644 index 00000000..8d2493a9 --- /dev/null +++ b/waku/v2/protocol/waku_store.pb.go @@ -0,0 +1,537 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0-devel +// protoc v3.14.0 +// source: waku_store.proto + +package protocol + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PagingInfo_Direction int32 + +const ( + PagingInfo_FORWARD PagingInfo_Direction = 0 + PagingInfo_BACKWARD PagingInfo_Direction = 1 +) + +// Enum value maps for PagingInfo_Direction. +var ( + PagingInfo_Direction_name = map[int32]string{ + 0: "FORWARD", + 1: "BACKWARD", + } + PagingInfo_Direction_value = map[string]int32{ + "FORWARD": 0, + "BACKWARD": 1, + } +) + +func (x PagingInfo_Direction) Enum() *PagingInfo_Direction { + p := new(PagingInfo_Direction) + *p = x + return p +} + +func (x PagingInfo_Direction) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (PagingInfo_Direction) Descriptor() protoreflect.EnumDescriptor { + return file_waku_store_proto_enumTypes[0].Descriptor() +} + +func (PagingInfo_Direction) Type() protoreflect.EnumType { + return &file_waku_store_proto_enumTypes[0] +} + +func (x PagingInfo_Direction) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use PagingInfo_Direction.Descriptor instead. +func (PagingInfo_Direction) EnumDescriptor() ([]byte, []int) { + return file_waku_store_proto_rawDescGZIP(), []int{1, 0} +} + +type Index struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"` + ReceivedTime float64 `protobuf:"fixed64,2,opt,name=receivedTime,proto3" json:"receivedTime,omitempty"` +} + +func (x *Index) Reset() { + *x = Index{} + if protoimpl.UnsafeEnabled { + mi := &file_waku_store_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Index) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Index) ProtoMessage() {} + +func (x *Index) ProtoReflect() protoreflect.Message { + mi := &file_waku_store_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Index.ProtoReflect.Descriptor instead. +func (*Index) Descriptor() ([]byte, []int) { + return file_waku_store_proto_rawDescGZIP(), []int{0} +} + +func (x *Index) GetDigest() []byte { + if x != nil { + return x.Digest + } + return nil +} + +func (x *Index) GetReceivedTime() float64 { + if x != nil { + return x.ReceivedTime + } + return 0 +} + +type PagingInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PageSize int64 `protobuf:"varint,1,opt,name=pageSize,proto3" json:"pageSize,omitempty"` + Cursor *Index `protobuf:"bytes,2,opt,name=cursor,proto3" json:"cursor,omitempty"` + Direction PagingInfo_Direction `protobuf:"varint,3,opt,name=direction,proto3,enum=protocol.PagingInfo_Direction" json:"direction,omitempty"` +} + +func (x *PagingInfo) Reset() { + *x = PagingInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_waku_store_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PagingInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PagingInfo) ProtoMessage() {} + +func (x *PagingInfo) ProtoReflect() protoreflect.Message { + mi := &file_waku_store_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PagingInfo.ProtoReflect.Descriptor instead. +func (*PagingInfo) Descriptor() ([]byte, []int) { + return file_waku_store_proto_rawDescGZIP(), []int{1} +} + +func (x *PagingInfo) GetPageSize() int64 { + if x != nil { + return x.PageSize + } + return 0 +} + +func (x *PagingInfo) GetCursor() *Index { + if x != nil { + return x.Cursor + } + return nil +} + +func (x *PagingInfo) GetDirection() PagingInfo_Direction { + if x != nil { + return x.Direction + } + return PagingInfo_FORWARD +} + +type HistoryQuery struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Topics []uint32 `protobuf:"varint,2,rep,packed,name=topics,proto3" json:"topics,omitempty"` + PagingInfo *PagingInfo `protobuf:"bytes,3,opt,name=pagingInfo,proto3,oneof" json:"pagingInfo,omitempty"` // used for pagination +} + +func (x *HistoryQuery) Reset() { + *x = HistoryQuery{} + if protoimpl.UnsafeEnabled { + mi := &file_waku_store_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HistoryQuery) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HistoryQuery) ProtoMessage() {} + +func (x *HistoryQuery) ProtoReflect() protoreflect.Message { + mi := &file_waku_store_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HistoryQuery.ProtoReflect.Descriptor instead. +func (*HistoryQuery) Descriptor() ([]byte, []int) { + return file_waku_store_proto_rawDescGZIP(), []int{2} +} + +func (x *HistoryQuery) GetTopics() []uint32 { + if x != nil { + return x.Topics + } + return nil +} + +func (x *HistoryQuery) GetPagingInfo() *PagingInfo { + if x != nil { + return x.PagingInfo + } + return nil +} + +type HistoryResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Messages []*WakuMessage `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"` + PagingInfo *PagingInfo `protobuf:"bytes,3,opt,name=pagingInfo,proto3,oneof" json:"pagingInfo,omitempty"` // used for pagination +} + +func (x *HistoryResponse) Reset() { + *x = HistoryResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_waku_store_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HistoryResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HistoryResponse) ProtoMessage() {} + +func (x *HistoryResponse) ProtoReflect() protoreflect.Message { + mi := &file_waku_store_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HistoryResponse.ProtoReflect.Descriptor instead. +func (*HistoryResponse) Descriptor() ([]byte, []int) { + return file_waku_store_proto_rawDescGZIP(), []int{3} +} + +func (x *HistoryResponse) GetMessages() []*WakuMessage { + if x != nil { + return x.Messages + } + return nil +} + +func (x *HistoryResponse) GetPagingInfo() *PagingInfo { + if x != nil { + return x.PagingInfo + } + return nil +} + +type HistoryRPC struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Query *HistoryQuery `protobuf:"bytes,2,opt,name=query,proto3" json:"query,omitempty"` + Response *HistoryResponse `protobuf:"bytes,3,opt,name=response,proto3" json:"response,omitempty"` +} + +func (x *HistoryRPC) Reset() { + *x = HistoryRPC{} + if protoimpl.UnsafeEnabled { + mi := &file_waku_store_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HistoryRPC) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HistoryRPC) ProtoMessage() {} + +func (x *HistoryRPC) ProtoReflect() protoreflect.Message { + mi := &file_waku_store_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HistoryRPC.ProtoReflect.Descriptor instead. +func (*HistoryRPC) Descriptor() ([]byte, []int) { + return file_waku_store_proto_rawDescGZIP(), []int{4} +} + +func (x *HistoryRPC) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *HistoryRPC) GetQuery() *HistoryQuery { + if x != nil { + return x.Query + } + return nil +} + +func (x *HistoryRPC) GetResponse() *HistoryResponse { + if x != nil { + return x.Response + } + return nil +} + +var File_waku_store_proto protoreflect.FileDescriptor + +var file_waku_store_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x1a, 0x12, 0x77, 0x61, + 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x22, 0x43, 0x0a, 0x05, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x69, 0x67, + 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73, + 0x74, 0x12, 0x22, 0x0a, 0x0c, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x54, 0x69, 0x6d, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52, 0x0c, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xb7, 0x01, 0x0a, 0x0a, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x67, 0x65, 0x53, 0x69, 0x7a, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x61, 0x67, 0x65, 0x53, 0x69, 0x7a, 0x65, + 0x12, 0x27, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x49, 0x6e, 0x64, 0x65, + 0x78, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x3c, 0x0a, 0x09, 0x64, 0x69, 0x72, + 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, + 0x66, 0x6f, 0x2e, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69, + 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x26, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x4f, 0x52, 0x57, 0x41, 0x52, 0x44, 0x10, + 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x41, 0x43, 0x4b, 0x57, 0x41, 0x52, 0x44, 0x10, 0x01, 0x22, + 0x70, 0x0a, 0x0c, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, + 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0d, 0x52, + 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x70, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x66, + 0x6f, 0x48, 0x00, 0x52, 0x0a, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x66, 0x6f, 0x88, + 0x01, 0x01, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x66, + 0x6f, 0x22, 0x8e, 0x01, 0x0a, 0x0f, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, + 0x6f, 0x6c, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x70, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, + 0x66, 0x6f, 0x48, 0x00, 0x52, 0x0a, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x66, 0x6f, + 0x88, 0x01, 0x01, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x49, 0x6e, + 0x66, 0x6f, 0x22, 0x90, 0x01, 0x0a, 0x0a, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x50, + 0x43, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, + 0x12, 0x2c, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f, + 0x72, 0x79, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x35, + 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x69, 0x73, 0x74, + 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x08, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_waku_store_proto_rawDescOnce sync.Once + file_waku_store_proto_rawDescData = file_waku_store_proto_rawDesc +) + +func file_waku_store_proto_rawDescGZIP() []byte { + file_waku_store_proto_rawDescOnce.Do(func() { + file_waku_store_proto_rawDescData = protoimpl.X.CompressGZIP(file_waku_store_proto_rawDescData) + }) + return file_waku_store_proto_rawDescData +} + +var file_waku_store_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_waku_store_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_waku_store_proto_goTypes = []interface{}{ + (PagingInfo_Direction)(0), // 0: protocol.PagingInfo.Direction + (*Index)(nil), // 1: protocol.Index + (*PagingInfo)(nil), // 2: protocol.PagingInfo + (*HistoryQuery)(nil), // 3: protocol.HistoryQuery + (*HistoryResponse)(nil), // 4: protocol.HistoryResponse + (*HistoryRPC)(nil), // 5: protocol.HistoryRPC + (*WakuMessage)(nil), // 6: protocol.WakuMessage +} +var file_waku_store_proto_depIdxs = []int32{ + 1, // 0: protocol.PagingInfo.cursor:type_name -> protocol.Index + 0, // 1: protocol.PagingInfo.direction:type_name -> protocol.PagingInfo.Direction + 2, // 2: protocol.HistoryQuery.pagingInfo:type_name -> protocol.PagingInfo + 6, // 3: protocol.HistoryResponse.messages:type_name -> protocol.WakuMessage + 2, // 4: protocol.HistoryResponse.pagingInfo:type_name -> protocol.PagingInfo + 3, // 5: protocol.HistoryRPC.query:type_name -> protocol.HistoryQuery + 4, // 6: protocol.HistoryRPC.response:type_name -> protocol.HistoryResponse + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name +} + +func init() { file_waku_store_proto_init() } +func file_waku_store_proto_init() { + if File_waku_store_proto != nil { + return + } + file_waku_message_proto_init() + if !protoimpl.UnsafeEnabled { + file_waku_store_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Index); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_waku_store_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PagingInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_waku_store_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HistoryQuery); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_waku_store_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HistoryResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_waku_store_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HistoryRPC); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_waku_store_proto_msgTypes[2].OneofWrappers = []interface{}{} + file_waku_store_proto_msgTypes[3].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_waku_store_proto_rawDesc, + NumEnums: 1, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_waku_store_proto_goTypes, + DependencyIndexes: file_waku_store_proto_depIdxs, + EnumInfos: file_waku_store_proto_enumTypes, + MessageInfos: file_waku_store_proto_msgTypes, + }.Build() + File_waku_store_proto = out.File + file_waku_store_proto_rawDesc = nil + file_waku_store_proto_goTypes = nil + file_waku_store_proto_depIdxs = nil +} diff --git a/waku/v2/protocol/waku_store.proto b/waku/v2/protocol/waku_store.proto new file mode 100644 index 00000000..96d299c0 --- /dev/null +++ b/waku/v2/protocol/waku_store.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package protocol; + +import "waku_message.proto"; + +message Index { + bytes digest = 1; + double receivedTime = 2; +} + +message PagingInfo { + int64 pageSize = 1; + Index cursor = 2; + enum Direction { + FORWARD = 0; + BACKWARD = 1; + } + Direction direction = 3; +} + +message HistoryQuery { + repeated uint32 topics = 2; + optional PagingInfo pagingInfo = 3; // used for pagination +} + +message HistoryResponse { + repeated WakuMessage messages = 2; + optional PagingInfo pagingInfo = 3; // used for pagination +} + +message HistoryRPC { + string request_id = 1; + HistoryQuery query = 2; + HistoryResponse response = 3; +} \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/waku_store/waku_store.go new file mode 100644 index 00000000..9d5ea42f --- /dev/null +++ b/waku/v2/protocol/waku_store/waku_store.go @@ -0,0 +1,419 @@ +package store + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "errors" + "io/ioutil" + "log" + "sort" + "sync" + "time" + + "github.com/cruxic/go-hmac-drbg/hmacdrbg" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" + ma "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/waku/v2/protocol" + "google.golang.org/protobuf/proto" +) + +const WakuStoreProtocolId = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta2") +const MaxPageSize = 100 // Maximum number of waku messages in each page +const ConnectionTimeout = 10 * time.Second + +func minOf(vars ...int) int { + min := vars[0] + + for _, i := range vars { + if min > i { + min = i + } + } + + return min +} + +func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *protocol.PagingInfo) { + // takes list, and performs paging based on pinfo + // returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request + cursor := pinfo.Cursor + pageSize := pinfo.PageSize + dir := pinfo.Direction + + if pageSize == 0 { // pageSize being zero indicates that no pagination is required + return list, pinfo + } + + if len(list) == 0 { // no pagination is needed for an empty list + return list, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction} + } + + msgList := make([]IndexedWakuMessage, len(list)) + _ = copy(msgList, list) // makes a copy of the list + + sort.Slice(msgList, func(i, j int) bool { // sorts msgList based on the custom comparison proc indexedWakuMessageComparison + return indexedWakuMessageComparison(msgList[i], msgList[j]) == -1 + }) + + initQuery := false + if cursor == nil { + initQuery = true // an empty cursor means it is an initial query + switch dir { + case protocol.PagingInfo_FORWARD: + cursor = list[0].index // perform paging from the begining of the list + case protocol.PagingInfo_BACKWARD: + cursor = list[len(list)-1].index // perform paging from the end of the list + } + } + + foundIndex := findIndex(&msgList, cursor) + if foundIndex == -1 { // the cursor is not valid + return nil, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction} + } + + var retrievedPageSize, s, e int + var newCursor *protocol.Index // to be returned as part of the new paging info + switch dir { + case protocol.PagingInfo_FORWARD: // forward pagination + remainingMessages := len(msgList) - foundIndex - 1 + // the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex + retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages) + if initQuery { + foundIndex = foundIndex - 1 + } + s = foundIndex + 1 // non inclusive + e = foundIndex + retrievedPageSize + newCursor = msgList[e].index // the new cursor points to the end of the page + case protocol.PagingInfo_BACKWARD: // backward pagination + remainingMessages := foundIndex + // the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0 + retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages) + if initQuery { + foundIndex = foundIndex + 1 + } + s = foundIndex - retrievedPageSize + e = foundIndex - 1 + newCursor = msgList[s].index // the new cursor points to the begining of the page + } + + // retrieve the messages + for i := s; i <= e; i++ { + resMessages = append(resMessages, msgList[i]) + } + resPagingInfo = &protocol.PagingInfo{PageSize: int64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction} + + return +} + +func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []*protocol.WakuMessage, resPinfo *protocol.PagingInfo) { + // takes list, and performs paging based on pinfo + // returns the page i.e, a sequence of WakuMessage and the new paging info to be used for the next paging request + indexedData, updatedPagingInfo := paginateWithIndex(list, pinfo) + for _, indexedMsg := range indexedData { + resMessages = append(resMessages, indexedMsg.msg) + } + resPinfo = updatedPagingInfo + return +} + +func contains(s []uint32, e uint32) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} + +func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.HistoryResponse { + result := new(protocol.HistoryResponse) + // data holds IndexedWakuMessage whose topics match the query + var data []IndexedWakuMessage + for _, indexedMsg := range w.messages { + if contains(query.Topics, *indexedMsg.msg.ContentTopic) { + data = append(data, indexedMsg) + } + } + + result.Messages, result.PagingInfo = paginateWithoutIndex(data, query.PagingInfo) + return result +} + +type MessageProvider interface { + GetAll() ([]*protocol.WakuMessage, error) +} + +type IndexedWakuMessage struct { + msg *protocol.WakuMessage + index *protocol.Index +} + +type WakuStore struct { + msg chan *protocol.WakuMessage + messages []IndexedWakuMessage + msgProvider *MessageProvider + h host.Host + ctx context.Context +} + +func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessage, p *MessageProvider) *WakuStore { + wakuStore := new(WakuStore) + wakuStore.msg = msg + wakuStore.msgProvider = p + wakuStore.h = h + wakuStore.ctx = ctx + + h.SetStreamHandler(WakuStoreProtocolId, wakuStore.onRequest) + + go wakuStore.processMessages() + + // TODO: Load all messages + // proc onData(timestamp: uint64, msg: WakuMessage) = + // ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex())) + // let res = ws.store.getAll(onData) + + return wakuStore +} + +func (store *WakuStore) processMessages() { + for message := range store.msg { + + index, err := computeIndex(message) + if err != nil { + log.Println(err) + continue + } + + store.messages = append(store.messages, IndexedWakuMessage{msg: message, index: index}) + + if store.msgProvider == nil { + continue + } + + // let res = store.msgProvider.put(index, msg) // TODO: store in the DB + } +} + +func (store *WakuStore) onRequest(s network.Stream) { + defer s.Close() + + historyRPCRequest := &protocol.HistoryRPC{} + buf, err := ioutil.ReadAll(s) + if err != nil { + s.Reset() + log.Println(err) + return + } + + proto.Unmarshal(buf, historyRPCRequest) + if err != nil { + log.Println(err) + return + } + + log.Printf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()) + + historyResponseRPC := &protocol.HistoryRPC{} + historyResponseRPC.RequestId = historyRPCRequest.RequestId + historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query) + + // TODO: implement waku swap + + message, err := proto.Marshal(historyResponseRPC) + if err != nil { + log.Println(err) + return + } + + _, err = s.Write(message) + if err != nil { + log.Println(err) + s.Reset() + } else { + log.Printf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()) + } +} + +func computeIndex(msg *protocol.WakuMessage) (*protocol.Index, error) { + data, err := proto.Marshal(msg) + if err != nil { + return nil, err + } + digest := sha256.Sum256(data) + return &protocol.Index{ + Digest: digest[:], + ReceivedTime: float64(time.Now().Unix()) / 1000000000, + }, nil +} + +func indexComparison(x, y *protocol.Index) int { + // compares x and y + // returns 0 if they are equal + // returns -1 if x < y + // returns 1 if x > y + + var timecmp int = 0 // TODO: ask waku team why Index ReceivedTime is is float? + if x.ReceivedTime > y.ReceivedTime { + timecmp = 1 + } else if x.ReceivedTime < y.ReceivedTime { + timecmp = -1 + } + + digestcm := bytes.Compare(x.Digest, y.Digest) + if timecmp != 0 { + return timecmp // timestamp has a higher priority for comparison + } + + return digestcm +} + +func indexedWakuMessageComparison(x, y IndexedWakuMessage) int { + // compares x and y + // returns 0 if they are equal + // returns -1 if x < y + // returns 1 if x > y + return indexComparison(x.index, y.index) +} + +func findIndex(msgList *[]IndexedWakuMessage, index *protocol.Index) int { + // returns the position of an IndexedWakuMessage in msgList whose index value matches the given index + // returns -1 if no match is found + for i, indexedWakuMessage := range *msgList { + if bytes.Compare(indexedWakuMessage.index.Digest, index.Digest) == 0 && indexedWakuMessage.index.ReceivedTime == index.ReceivedTime { + return i + } + } + return -1 +} + +func (store *WakuStore) AddPeer(p peer.ID, addrs []ma.Multiaddr) error { + for _, addr := range addrs { + store.h.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL) + } + err := store.h.Peerstore().AddProtocols(p, string(WakuStoreProtocolId)) + if err != nil { + return err + } + return nil +} + +func (store *WakuStore) selectPeer() *peer.ID { + // @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. + // Ideally depending on the query and our set of peers we take a subset of ideal peers. + // This will require us to check for various factors such as: + // - which topics they track + // - latency? + // - default store peer? + + // Selects the best peer for a given protocol + var peers peer.IDSlice + for _, peer := range store.h.Peerstore().Peers() { + protocols, err := store.h.Peerstore().SupportsProtocols(peer, string(WakuStoreProtocolId)) + if err != nil { + log.Println("error obtaining the protocols supported by peers", err) + return nil + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + + if len(peers) >= 1 { + // TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned + return &peers[0] + } + + return nil +} + +var brHmacDrbgPool = sync.Pool{New: func() interface{} { + seed := make([]byte, 48) + _, err := rand.Read(seed) + if err != nil { + log.Fatal(err) + } + return hmacdrbg.NewHmacDrbg(256, seed, nil) +}} + +func GenerateRequestId() string { + rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg) + defer brHmacDrbgPool.Put(rng) + + randData := make([]byte, 10) + if !rng.Generate(randData) { + //Reseed is required every 10,000 calls + seed := make([]byte, 48) + _, err := rand.Read(seed) + if err != nil { + log.Fatal(err) + } + err = rng.Reseed(seed) + if err != nil { + //only happens if seed < security-level + log.Fatal(err) + } + + if !rng.Generate(randData) { + log.Fatal("could not generate random request id") + } + } + return hex.EncodeToString(randData) +} + +func (store *WakuStore) query(q *protocol.HistoryQuery) (*protocol.HistoryResponse, error) { + peer := store.selectPeer() + if peer == nil { + return nil, errors.New("no suitable remote peers") + } + + ctx, cancel := context.WithTimeout(store.ctx, ConnectionTimeout) + defer cancel() + + connOpt, err := store.h.NewStream(ctx, *peer, WakuStoreProtocolId) + if err != nil { + log.Println("failed to connect to remote peer", err) + return nil, err + } + + historyRequest := &protocol.HistoryRPC{Query: q, RequestId: GenerateRequestId()} + + message, err := proto.Marshal(historyRequest) + if err != nil { + log.Println(err) + return nil, err + } + + defer connOpt.Close() + defer connOpt.Reset() + + _, err = connOpt.Write(message) + if err != nil { + log.Println(err) + return nil, err + } + + buf, err := ioutil.ReadAll(connOpt) + if err != nil { + log.Println("failed to read response", err) + return nil, err + } + + historyResponseRPC := &protocol.HistoryRPC{} + proto.Unmarshal(buf, historyResponseRPC) + if err != nil { + log.Println("failed to decode response", err) + return nil, err + } + + return historyResponseRPC.Response, nil +} + +// TODO: queryWithAccounting