fix: nim-waku interop for store

This commit is contained in:
Richard Ramos 2021-04-07 17:16:29 -04:00
parent f83423facd
commit 9c224c1849
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
9 changed files with 1944 additions and 645 deletions

View File

@ -3,6 +3,7 @@ package cmd
import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"fmt"
"net"
@ -38,11 +39,9 @@ func (dbStore *DBStore) Put(message *protocol.WakuMessage) error {
func (dbStore *DBStore) GetAll() ([]*protocol.WakuMessage, error) {
fmt.Println("TODO: Implement MessageProvider.GetAll. Returning a sample message")
exampleMessage := new(protocol.WakuMessage)
var contentTopic uint32 = 1
var version uint32 = 0
exampleMessage.ContentTopic = &contentTopic
exampleMessage.ContentTopic = 1
exampleMessage.Payload = []byte("Hello!")
exampleMessage.Version = &version
exampleMessage.Version = 0
return []*protocol.WakuMessage{exampleMessage}, nil
}
@ -112,7 +111,9 @@ var rootCmd = &cobra.Command{
return
}
response, err := wakuNode.Query(1, true, 10)
var DefaultContentTopic uint32 = binary.LittleEndian.Uint32([]byte("dingpu"))
response, err := wakuNode.Query(DefaultContentTopic, true, 10)
if err != nil {
fmt.Println(err)
return

4
go.mod
View File

@ -7,10 +7,10 @@ require (
github.com/ethereum/go-ethereum v1.9.5
github.com/golang/protobuf v1.4.1
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.1
github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-connmgr v0.2.4
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-msgio v0.0.6
github.com/magiconair/properties v1.8.4 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/multiformats/go-multiaddr v0.3.1
@ -24,6 +24,6 @@ require (
github.com/status-im/go-wakurelay-pubsub v0.4.2
golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e // indirect
golang.org/x/text v0.3.5 // indirect
google.golang.org/protobuf v1.25.0
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
)

View File

@ -87,7 +87,7 @@ func (payload Payload) Encode(version uint32) ([]byte, error) {
}
func DecodePayload(message *protocol.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) {
switch *message.Version {
switch message.Version {
case uint32(0):
return &DecodedPayload{Data: message.Payload}, nil
case uint32(1):

View File

@ -196,7 +196,7 @@ func (w *WakuNode) AddStorePeer(address string) error {
return w.store.AddPeer(info.ID, info.Addrs)
}
func (w *WakuNode) Query(contentTopic uint32, asc bool, pageSize int64) (*protocol.HistoryResponse, error) {
func (w *WakuNode) Query(contentTopic uint32, asc bool, pageSize uint64) (*protocol.HistoryResponse, error) {
if w.store == nil {
return nil, errors.New("WakuStore is not set")
}

View File

@ -1,192 +1,481 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0
// protoc v3.14.0
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: waku_message.proto
package protocol
import (
encoding_binary "encoding/binary"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
io "io"
math "math"
math_bits "math/bits"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type WakuMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3,oneof" json:"payload,omitempty"`
ContentTopic *uint32 `protobuf:"varint,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"`
Version *uint32 `protobuf:"varint,3,opt,name=version,proto3,oneof" json:"version,omitempty"`
Proof []byte `protobuf:"bytes,4,opt,name=proof,proto3,oneof" json:"proof,omitempty"`
Timestamp *float64 `protobuf:"fixed64,5,opt,name=timestamp,proto3,oneof" json:"timestamp,omitempty"`
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
ContentTopic uint32 `protobuf:"varint,2,opt,name=contentTopic,proto3" json:"contentTopic,omitempty"`
Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
Proof []byte `protobuf:"bytes,4,opt,name=proof,proto3" json:"proof,omitempty"`
Timestamp float64 `protobuf:"fixed64,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (x *WakuMessage) Reset() {
*x = WakuMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_waku_message_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *WakuMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*WakuMessage) ProtoMessage() {}
func (x *WakuMessage) ProtoReflect() protoreflect.Message {
mi := &file_waku_message_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use WakuMessage.ProtoReflect.Descriptor instead.
func (m *WakuMessage) Reset() { *m = WakuMessage{} }
func (m *WakuMessage) String() string { return proto.CompactTextString(m) }
func (*WakuMessage) ProtoMessage() {}
func (*WakuMessage) Descriptor() ([]byte, []int) {
return file_waku_message_proto_rawDescGZIP(), []int{0}
return fileDescriptor_6f0a20862b3bf714, []int{0}
}
func (m *WakuMessage) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *WakuMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_WakuMessage.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *WakuMessage) XXX_Merge(src proto.Message) {
xxx_messageInfo_WakuMessage.Merge(m, src)
}
func (m *WakuMessage) XXX_Size() int {
return m.Size()
}
func (m *WakuMessage) XXX_DiscardUnknown() {
xxx_messageInfo_WakuMessage.DiscardUnknown(m)
}
func (x *WakuMessage) GetPayload() []byte {
if x != nil {
return x.Payload
var xxx_messageInfo_WakuMessage proto.InternalMessageInfo
func (m *WakuMessage) GetPayload() []byte {
if m != nil {
return m.Payload
}
return nil
}
func (x *WakuMessage) GetContentTopic() uint32 {
if x != nil && x.ContentTopic != nil {
return *x.ContentTopic
func (m *WakuMessage) GetContentTopic() uint32 {
if m != nil {
return m.ContentTopic
}
return 0
}
func (x *WakuMessage) GetVersion() uint32 {
if x != nil && x.Version != nil {
return *x.Version
func (m *WakuMessage) GetVersion() uint32 {
if m != nil {
return m.Version
}
return 0
}
func (x *WakuMessage) GetProof() []byte {
if x != nil {
return x.Proof
func (m *WakuMessage) GetProof() []byte {
if m != nil {
return m.Proof
}
return nil
}
func (x *WakuMessage) GetTimestamp() float64 {
if x != nil && x.Timestamp != nil {
return *x.Timestamp
func (m *WakuMessage) GetTimestamp() float64 {
if m != nil {
return m.Timestamp
}
return 0
}
var File_waku_message_proto protoreflect.FileDescriptor
func init() {
proto.RegisterType((*WakuMessage)(nil), "protocol.WakuMessage")
}
var file_waku_message_proto_rawDesc = []byte{
0x0a, 0x12, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0xf3,
0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d,
0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48,
0x00, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, 0x01, 0x01, 0x12, 0x27, 0x0a,
0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0d, 0x48, 0x01, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x12, 0x1d, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x02, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x19, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x04,
0x20, 0x01, 0x28, 0x0c, 0x48, 0x03, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x88, 0x01, 0x01,
0x12, 0x21, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20,
0x01, 0x28, 0x01, 0x48, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70,
0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42,
0x0f, 0x0a, 0x0d, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63,
0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06,
0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
func init() { proto.RegisterFile("waku_message.proto", fileDescriptor_6f0a20862b3bf714) }
var fileDescriptor_6f0a20862b3bf714 = []byte{
// 183 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2a, 0x4f, 0xcc, 0x2e,
0x8d, 0xcf, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2,
0x00, 0x53, 0xc9, 0xf9, 0x39, 0x4a, 0x33, 0x19, 0xb9, 0xb8, 0xc3, 0x13, 0xb3, 0x4b, 0x7d, 0x21,
0xf2, 0x42, 0x12, 0x5c, 0xec, 0x05, 0x89, 0x95, 0x39, 0xf9, 0x89, 0x29, 0x12, 0x8c, 0x0a, 0x8c,
0x1a, 0x3c, 0x41, 0x30, 0xae, 0x90, 0x12, 0x17, 0x4f, 0x72, 0x7e, 0x5e, 0x49, 0x6a, 0x5e, 0x49,
0x48, 0x7e, 0x41, 0x66, 0xb2, 0x04, 0x93, 0x02, 0xa3, 0x06, 0x6f, 0x10, 0x8a, 0x18, 0x48, 0x77,
0x59, 0x6a, 0x51, 0x71, 0x66, 0x7e, 0x9e, 0x04, 0x33, 0x58, 0x1a, 0xc6, 0x15, 0x12, 0xe1, 0x62,
0x2d, 0x28, 0xca, 0xcf, 0x4f, 0x93, 0x60, 0x01, 0x9b, 0x0a, 0xe1, 0x08, 0xc9, 0x70, 0x71, 0x96,
0x64, 0xe6, 0xa6, 0x16, 0x97, 0x24, 0xe6, 0x16, 0x48, 0xb0, 0x2a, 0x30, 0x6a, 0x30, 0x06, 0x21,
0x04, 0x9c, 0x04, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6,
0x19, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0xee, 0x36, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x54,
0xc5, 0x11, 0x54, 0xd4, 0x00, 0x00, 0x00,
}
func (m *WakuMessage) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *WakuMessage) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *WakuMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Timestamp != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Timestamp))))
i--
dAtA[i] = 0x29
}
if len(m.Proof) > 0 {
i -= len(m.Proof)
copy(dAtA[i:], m.Proof)
i = encodeVarintWakuMessage(dAtA, i, uint64(len(m.Proof)))
i--
dAtA[i] = 0x22
}
if m.Version != 0 {
i = encodeVarintWakuMessage(dAtA, i, uint64(m.Version))
i--
dAtA[i] = 0x18
}
if m.ContentTopic != 0 {
i = encodeVarintWakuMessage(dAtA, i, uint64(m.ContentTopic))
i--
dAtA[i] = 0x10
}
if len(m.Payload) > 0 {
i -= len(m.Payload)
copy(dAtA[i:], m.Payload)
i = encodeVarintWakuMessage(dAtA, i, uint64(len(m.Payload)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintWakuMessage(dAtA []byte, offset int, v uint64) int {
offset -= sovWakuMessage(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *WakuMessage) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Payload)
if l > 0 {
n += 1 + l + sovWakuMessage(uint64(l))
}
if m.ContentTopic != 0 {
n += 1 + sovWakuMessage(uint64(m.ContentTopic))
}
if m.Version != 0 {
n += 1 + sovWakuMessage(uint64(m.Version))
}
l = len(m.Proof)
if l > 0 {
n += 1 + l + sovWakuMessage(uint64(l))
}
if m.Timestamp != 0 {
n += 9
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovWakuMessage(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozWakuMessage(x uint64) (n int) {
return sovWakuMessage(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *WakuMessage) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: WakuMessage: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: WakuMessage: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Payload", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthWakuMessage
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthWakuMessage
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Payload = append(m.Payload[:0], dAtA[iNdEx:postIndex]...)
if m.Payload == nil {
m.Payload = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ContentTopic", wireType)
}
m.ContentTopic = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.ContentTopic |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType)
}
m.Version = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Version |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Proof", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthWakuMessage
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthWakuMessage
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Proof = append(m.Proof[:0], dAtA[iNdEx:postIndex]...)
if m.Proof == nil {
m.Proof = []byte{}
}
iNdEx = postIndex
case 5:
if wireType != 1 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.Timestamp = float64(math.Float64frombits(v))
default:
iNdEx = preIndex
skippy, err := skipWakuMessage(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuMessage
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipWakuMessage(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthWakuMessage
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupWakuMessage
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthWakuMessage
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
file_waku_message_proto_rawDescOnce sync.Once
file_waku_message_proto_rawDescData = file_waku_message_proto_rawDesc
ErrInvalidLengthWakuMessage = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowWakuMessage = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupWakuMessage = fmt.Errorf("proto: unexpected end of group")
)
func file_waku_message_proto_rawDescGZIP() []byte {
file_waku_message_proto_rawDescOnce.Do(func() {
file_waku_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_waku_message_proto_rawDescData)
})
return file_waku_message_proto_rawDescData
}
var file_waku_message_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_waku_message_proto_goTypes = []interface{}{
(*WakuMessage)(nil), // 0: protocol.WakuMessage
}
var file_waku_message_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_waku_message_proto_init() }
func file_waku_message_proto_init() {
if File_waku_message_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_waku_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*WakuMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_waku_message_proto_msgTypes[0].OneofWrappers = []interface{}{}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_waku_message_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_waku_message_proto_goTypes,
DependencyIndexes: file_waku_message_proto_depIdxs,
MessageInfos: file_waku_message_proto_msgTypes,
}.Build()
File_waku_message_proto = out.File
file_waku_message_proto_rawDesc = nil
file_waku_message_proto_goTypes = nil
file_waku_message_proto_depIdxs = nil
}

View File

@ -3,9 +3,9 @@ syntax = "proto3";
package protocol;
message WakuMessage {
optional bytes payload = 1;
optional uint32 contentTopic = 2;
optional uint32 version = 3;
optional bytes proof = 4;
optional double timestamp = 5;
bytes payload = 1;
uint32 contentTopic = 2;
uint32 version = 3;
bytes proof = 4;
double timestamp = 5;
}

File diff suppressed because it is too large Load Diff

View File

@ -10,27 +10,29 @@ message Index {
}
message PagingInfo {
int64 pageSize = 1;
uint64 pageSize = 1;
Index cursor = 2;
enum Direction {
FORWARD = 0;
BACKWARD = 1;
BACKWARD = 0;
FORWARD = 1;
}
Direction direction = 3;
}
message HistoryQuery {
repeated uint32 topics = 1;
optional PagingInfo pagingInfo = 2; // used for pagination
PagingInfo pagingInfo = 2; // used for pagination
double startTime = 3;
double endTime = 4;
}
message HistoryResponse {
repeated WakuMessage messages = 1;
optional PagingInfo pagingInfo = 2; // used for pagination
PagingInfo pagingInfo = 2; // used for pagination
}
message HistoryRPC {
string request_id = 1;
HistoryQuery query = 2;
HistoryResponse response = 3;
}
}

View File

@ -8,7 +8,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"sort"
"sync"
"time"
@ -20,11 +19,11 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/common"
"github.com/status-im/go-waku/waku/v2/protocol"
"google.golang.org/protobuf/proto"
)
var log = logging.Logger("wakustore")
@ -118,7 +117,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r
for i := s; i <= e; i++ {
resMessages = append(resMessages, msgList[i])
}
resPagingInfo = &protocol.PagingInfo{PageSize: int64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction}
resPagingInfo = &protocol.PagingInfo{PageSize: uint64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction}
return
}
@ -148,7 +147,7 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History
// data holds IndexedWakuMessage whose topics match the query
var data []IndexedWakuMessage
for _, indexedMsg := range w.messages {
if contains(query.Topics, *indexedMsg.msg.ContentTopic) {
if contains(query.Topics, indexedMsg.msg.ContentTopic) {
data = append(data, indexedMsg)
}
}
@ -188,6 +187,10 @@ func NewWakuStore(ctx context.Context, h host.Host, msg chan *common.Envelope, p
}
func (store *WakuStore) Start() {
if store.msgProvider == nil {
return
}
store.h.SetStreamHandler(WakuStoreProtocolId, store.onRequest)
messages, err := store.msgProvider.GetAll()
@ -237,34 +240,22 @@ func (store *WakuStore) onRequest(s network.Stream) {
historyRPCRequest := &protocol.HistoryRPC{}
buf := make([]byte, 64*1024)
_, err := s.Read(buf)
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, 64*1024)
err := reader.ReadMsg(historyRPCRequest)
if err != nil {
s.Reset()
log.Error("error reading request", err)
return
}
proto.Unmarshal(buf, historyRPCRequest)
if err != nil {
log.Error("error decoding request", err)
return
}
log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
historyResponseRPC := &protocol.HistoryRPC{}
historyResponseRPC.RequestId = historyRPCRequest.RequestId
historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query)
message, err := proto.Marshal(historyResponseRPC)
if err != nil {
log.Error("error encoding response", err)
return
}
_, err = s.Write(message)
err = writer.WriteMsg(historyResponseRPC)
if err != nil {
log.Error("error writing response", err)
s.Reset()
@ -274,7 +265,7 @@ func (store *WakuStore) onRequest(s network.Stream) {
}
func computeIndex(msg *protocol.WakuMessage) (*protocol.Index, error) {
data, err := proto.Marshal(msg)
data, err := msg.Marshal()
if err != nil {
return nil, err
}
@ -415,36 +406,27 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon
return nil, err
}
historyRequest := &protocol.HistoryRPC{Query: q, RequestId: GenerateRequestId()}
message, err := proto.Marshal(historyRequest)
if err != nil {
log.Error("could not encode request", err)
return nil, err
}
defer connOpt.Close()
defer connOpt.Reset()
_, err = connOpt.Write(message)
historyRequest := &protocol.HistoryRPC{Query: q, RequestId: GenerateRequestId()}
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, 64*1024)
err = writer.WriteMsg(historyRequest)
if err != nil {
log.Error("could not write request", err)
return nil, err
}
buf, err := ioutil.ReadAll(connOpt)
historyResponseRPC := &protocol.HistoryRPC{}
err = reader.ReadMsg(historyResponseRPC)
if err != nil {
log.Error("could not read response", err)
return nil, err
}
historyResponseRPC := &protocol.HistoryRPC{}
proto.Unmarshal(buf, historyResponseRPC)
if err != nil {
log.Error("could not decode response", err)
return nil, err
}
return historyResponseRPC.Response, nil
}