diff --git a/examples/basic2/main.go b/examples/basic2/main.go index 813c8ee1..bd8ad718 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -84,7 +84,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { contentTopic := protocol.NewContentTopic("basic2", 1, "test", "proto") var version uint32 = 0 - var timestamp float64 = utils.GetUnixEpoch() + var timestamp int64 = utils.GetUnixEpoch() p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index b5203db2..bff01372 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -95,7 +95,7 @@ func (cr *Chat) Publish(ctx context.Context, message string) error { } var version uint32 - var timestamp float64 = utils.GetUnixEpoch() + var timestamp int64 = utils.GetUnixEpoch() var keyInfo *node.KeyInfo = &node.KeyInfo{} if cr.useV1Payload { // Use WakuV1 encryption diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 69afa73e..941e4a57 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -143,7 +143,7 @@ func randomHex(n int) (string, error) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { var version uint32 = 0 - var timestamp float64 = utils.GetUnixEpoch() + var timestamp int64 = utils.GetUnixEpoch() p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/tests/connection_test.go b/tests/connection_test.go index 325ba10e..63029506 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -51,7 +51,7 @@ func TestBasicSendingReceiving(t *testing.T) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error { var contentTopic string = "test" var version uint32 = 0 - var timestamp float64 = utils.GetUnixEpoch() + var timestamp int64 = utils.GetUnixEpoch() p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/tests/utils.go b/tests/utils.go index 030692ac..496ab9df 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -81,7 +81,7 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e ) } -func CreateWakuMessage(contentTopic string, timestamp float64) *pb.WakuMessage { +func CreateWakuMessage(contentTopic string, timestamp int64) *pb.WakuMessage { return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp} } diff --git a/waku/persistence/store.go b/waku/persistence/store.go index affc3947..78a40114 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -28,7 +28,7 @@ type DBStore struct { type StoredMessage struct { ID []byte PubsubTopic string - ReceiverTime float64 + ReceiverTime int64 Message *pb.WakuMessage } @@ -93,8 +93,8 @@ func NewDBStore(log *zap.SugaredLogger, options ...DBOption) (*DBStore, error) { func (d *DBStore) createTable() error { sqlStmt := `CREATE TABLE IF NOT EXISTS message ( id BLOB PRIMARY KEY, - receiverTimestamp REAL NOT NULL, - senderTimestamp REAL NOT NULL, + receiverTimestamp INTEGER NOT NULL, + senderTimestamp INTEGER NOT NULL, contentTopic BLOB NOT NULL, pubsubTopic BLOB NOT NULL, payload BLOB, @@ -161,8 +161,8 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { for rows.Next() { var id []byte - var receiverTimestamp float64 - var senderTimestamp float64 + var receiverTimestamp int64 + var senderTimestamp int64 var contentTopic string var payload []byte var version uint32 diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index cbe20d8e..d32fe6e3 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -20,7 +20,7 @@ func NewMock() *sql.DB { return db } -func createIndex(digest []byte, receiverTime float64) *pb.Index { +func createIndex(digest []byte, receiverTime int64) *pb.Index { return &pb.Index{ Digest: digest, ReceiverTime: receiverTime, @@ -57,18 +57,18 @@ func TestStoreRetention(t *testing.T) { insertTime := time.Now() - _ = store.Put(createIndex([]byte{1}, float64(insertTime.Add(-70*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 1)) - _ = store.Put(createIndex([]byte{2}, float64(insertTime.Add(-60*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 2)) - _ = store.Put(createIndex([]byte{3}, float64(insertTime.Add(-50*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 3)) - _ = store.Put(createIndex([]byte{4}, float64(insertTime.Add(-40*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 4)) - _ = store.Put(createIndex([]byte{5}, float64(insertTime.Add(-30*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 5)) + _ = store.Put(createIndex([]byte{1}, insertTime.Add(-70*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 1)) + _ = store.Put(createIndex([]byte{2}, insertTime.Add(-60*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 2)) + _ = store.Put(createIndex([]byte{3}, insertTime.Add(-50*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 3)) + _ = store.Put(createIndex([]byte{4}, insertTime.Add(-40*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 4)) + _ = store.Put(createIndex([]byte{5}, insertTime.Add(-30*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 5)) dbResults, err := store.GetAll() require.NoError(t, err) require.Len(t, dbResults, 5) - _ = store.Put(createIndex([]byte{6}, float64(insertTime.Add(-20*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 6)) - _ = store.Put(createIndex([]byte{7}, float64(insertTime.Add(-10*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 7)) + _ = store.Put(createIndex([]byte{6}, insertTime.Add(-20*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 6)) + _ = store.Put(createIndex([]byte{7}, insertTime.Add(-10*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 7)) // This step simulates starting go-waku again from scratch diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index 62e5499f..d7a27cc6 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -93,8 +93,10 @@ func (w *WakuNode) connectednessListener() { select { case <-w.quit: return - case <-w.protocolEventSub.Out(): - case <-w.identificationEventSub.Out(): + case a := <-w.protocolEventSub.Out(): + fmt.Println(a) + case b := <-w.identificationEventSub.Out(): + fmt.Println(b) case <-w.connectionNotif.DisconnectChan: } w.sendConnStatus() diff --git a/waku/v2/node/waku_payload_encoding_test.go b/waku/v2/node/waku_payload_encoding_test.go index 8eabfd37..3adc9e71 100644 --- a/waku/v2/node/waku_payload_encoding_test.go +++ b/waku/v2/node/waku_payload_encoding_test.go @@ -13,7 +13,7 @@ func createTestMsg(version uint32) *pb.WakuMessage { message := new(pb.WakuMessage) message.Payload = []byte{0, 1, 2} message.Version = version - message.Timestamp = float64(123456) + message.Timestamp = 123456 return message } @@ -36,7 +36,7 @@ func TestEncodeDecodePayload(t *testing.T) { message := new(pb.WakuMessage) message.Payload = encodedPayload message.Version = version - message.Timestamp = float64(123456) + message.Timestamp = 123456 decodedPayload, err := DecodePayload(message, keyInfo) require.NoError(t, err) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index e3d44da9..ec7c2d80 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -129,7 +129,7 @@ func Test1100(t *testing.T) { for i := 1; i <= 1100; i++ { msg := createTestMsg(0) msg.Payload = []byte(fmt.Sprint(i)) - msg.Timestamp = float64(i) + msg.Timestamp = int64(i) if err := wakuNode2.Publish(ctx, msg); err != nil { require.Fail(t, "Could not publish all messages") } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 795affd8..e191606b 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -78,8 +78,8 @@ func TestWakuLightPush(t *testing.T) { err = clientHost.Peerstore().AddProtocols(host2.ID(), string(LightPushID_v20beta1)) require.NoError(t, err) - msg1 := tests.CreateWakuMessage("test1", float64(0)) - msg2 := tests.CreateWakuMessage("test2", float64(1)) + msg1 := tests.CreateWakuMessage("test1", 0) + msg2 := tests.CreateWakuMessage("test2", 1) req := new(pb.PushRequest) req.Message = msg1 @@ -137,6 +137,6 @@ func TestWakuLightPushNoPeers(t *testing.T) { require.NoError(t, err) client := NewWakuLightPush(ctx, clientHost, nil, tests.Logger()) - _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", float64(0)), testTopic) + _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic) require.Errorf(t, err, "no suitable remote peers") } diff --git a/waku/v2/protocol/pb/utils_test.go b/waku/v2/protocol/pb/utils_test.go index 7f42bb4b..d2d84fe1 100644 --- a/waku/v2/protocol/pb/utils_test.go +++ b/waku/v2/protocol/pb/utils_test.go @@ -16,12 +16,11 @@ func TestEnvelopeHash(t *testing.T) { msg := new(WakuMessage) msg.ContentTopic = "Test" msg.Payload = []byte("Hello World") - msg.Timestamp = float64(123456789123456789) + msg.Timestamp = 123456789123456789 msg.Version = 1 - expected := []byte{77, 197, 250, 41, 30, 163, 192, 239, 48, 104, 58, 175, 36, 81, 96, 58, 118, 107, 73, 4, 153, 182, 33, 199, 144, 156, 110, 226, 93, 85, 160, 180} + expected := []byte{82, 136, 166, 250, 14, 69, 211, 99, 19, 161, 139, 206, 179, 3, 117, 51, 112, 111, 203, 150, 207, 35, 104, 102, 21, 181, 114, 165, 77, 29, 190, 61} result, err := msg.Hash() - require.NoError(t, err) require.Equal(t, expected, result) } diff --git a/waku/v2/protocol/pb/waku_message.pb.go b/waku/v2/protocol/pb/waku_message.pb.go index 8930d373..f0da3d1d 100644 --- a/waku/v2/protocol/pb/waku_message.pb.go +++ b/waku/v2/protocol/pb/waku_message.pb.go @@ -4,7 +4,6 @@ package pb import ( - encoding_binary "encoding/binary" fmt "fmt" proto "github.com/golang/protobuf/proto" io "io" @@ -27,7 +26,7 @@ type WakuMessage struct { Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` ContentTopic string `protobuf:"bytes,2,opt,name=contentTopic,proto3" json:"contentTopic,omitempty"` Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` - Timestamp float64 `protobuf:"fixed64,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Timestamp int64 `protobuf:"zigzag64,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Proof []byte `protobuf:"bytes,21,opt,name=proof,proto3" json:"proof,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -88,7 +87,7 @@ func (m *WakuMessage) GetVersion() uint32 { return 0 } -func (m *WakuMessage) GetTimestamp() float64 { +func (m *WakuMessage) GetTimestamp() int64 { if m != nil { return m.Timestamp } @@ -117,11 +116,11 @@ var fileDescriptor_6f0a20862b3bf714 = []byte{ 0x82, 0x71, 0x85, 0x94, 0xb8, 0x78, 0x92, 0xf3, 0xf3, 0x4a, 0x52, 0xf3, 0x4a, 0x42, 0xf2, 0x0b, 0x32, 0x93, 0x25, 0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x50, 0xc4, 0x40, 0xba, 0xcb, 0x52, 0x8b, 0x8a, 0x33, 0xf3, 0xf3, 0x24, 0x98, 0x15, 0x18, 0x35, 0x78, 0x83, 0x60, 0x5c, 0x21, 0x19, 0x2e, - 0xce, 0x92, 0xcc, 0xdc, 0xd4, 0xe2, 0x92, 0xc4, 0xdc, 0x02, 0x09, 0x16, 0x05, 0x46, 0x0d, 0xc6, + 0xce, 0x92, 0xcc, 0xdc, 0xd4, 0xe2, 0x92, 0xc4, 0xdc, 0x02, 0x09, 0x2e, 0x05, 0x46, 0x0d, 0xa1, 0x20, 0x84, 0x80, 0x90, 0x08, 0x17, 0x6b, 0x41, 0x51, 0x7e, 0x7e, 0x9a, 0x84, 0x28, 0xd8, 0x4e, 0x08, 0xc7, 0x49, 0xe0, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf1, 0x58, 0x8e, 0x21, 0x89, 0x0d, 0xec, 0x70, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, - 0x0a, 0xc6, 0x91, 0x28, 0xce, 0x00, 0x00, 0x00, + 0x33, 0x1b, 0x64, 0x81, 0xce, 0x00, 0x00, 0x00, } func (m *WakuMessage) Marshal() (dAtA []byte, err error) { @@ -158,10 +157,9 @@ func (m *WakuMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0xaa } if m.Timestamp != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Timestamp)))) + i = encodeVarintWakuMessage(dAtA, i, uint64((uint64(m.Timestamp)<<1)^uint64((m.Timestamp>>63)))) i-- - dAtA[i] = 0x21 + dAtA[i] = 0x50 } if m.Version != 0 { i = encodeVarintWakuMessage(dAtA, i, uint64(m.Version)) @@ -214,7 +212,7 @@ func (m *WakuMessage) Size() (n int) { n += 1 + sovWakuMessage(uint64(m.Version)) } if m.Timestamp != 0 { - n += 9 + n += 1 + sozWakuMessage(uint64(m.Timestamp)) } l = len(m.Proof) if l > 0 { @@ -346,17 +344,27 @@ func (m *WakuMessage) Unmarshal(dAtA []byte) error { break } } - case 4: - if wireType != 1 { + case 10: + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Timestamp = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.Timestamp = int64(v) case 21: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Proof", wireType) diff --git a/waku/v2/protocol/pb/waku_message.proto b/waku/v2/protocol/pb/waku_message.proto index 8b689cba..dc79c976 100644 --- a/waku/v2/protocol/pb/waku_message.proto +++ b/waku/v2/protocol/pb/waku_message.proto @@ -6,6 +6,6 @@ message WakuMessage { bytes payload = 1; string contentTopic = 2; uint32 version = 3; - double timestamp = 4; + sint64 timestamp = 10; bytes proof = 21; } \ No newline at end of file diff --git a/waku/v2/protocol/pb/waku_store.pb.go b/waku/v2/protocol/pb/waku_store.pb.go index 144b92ec..bc12c210 100644 --- a/waku/v2/protocol/pb/waku_store.pb.go +++ b/waku/v2/protocol/pb/waku_store.pb.go @@ -4,7 +4,6 @@ package pb import ( - encoding_binary "encoding/binary" fmt "fmt" proto "github.com/golang/protobuf/proto" io "io" @@ -75,8 +74,8 @@ func (HistoryResponse_Error) EnumDescriptor() ([]byte, []int) { type Index struct { Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"` - ReceiverTime float64 `protobuf:"fixed64,2,opt,name=receiverTime,proto3" json:"receiverTime,omitempty"` - SenderTime float64 `protobuf:"fixed64,3,opt,name=senderTime,proto3" json:"senderTime,omitempty"` + ReceiverTime int64 `protobuf:"zigzag64,2,opt,name=receiverTime,proto3" json:"receiverTime,omitempty"` + SenderTime int64 `protobuf:"zigzag64,3,opt,name=senderTime,proto3" json:"senderTime,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -122,14 +121,14 @@ func (m *Index) GetDigest() []byte { return nil } -func (m *Index) GetReceiverTime() float64 { +func (m *Index) GetReceiverTime() int64 { if m != nil { return m.ReceiverTime } return 0 } -func (m *Index) GetSenderTime() float64 { +func (m *Index) GetSenderTime() int64 { if m != nil { return m.SenderTime } @@ -250,8 +249,8 @@ type HistoryQuery struct { PubsubTopic string `protobuf:"bytes,2,opt,name=pubsubTopic,proto3" json:"pubsubTopic,omitempty"` ContentFilters []*ContentFilter `protobuf:"bytes,3,rep,name=contentFilters,proto3" json:"contentFilters,omitempty"` PagingInfo *PagingInfo `protobuf:"bytes,4,opt,name=pagingInfo,proto3" json:"pagingInfo,omitempty"` - StartTime float64 `protobuf:"fixed64,5,opt,name=startTime,proto3" json:"startTime,omitempty"` - EndTime float64 `protobuf:"fixed64,6,opt,name=endTime,proto3" json:"endTime,omitempty"` + StartTime int64 `protobuf:"zigzag64,5,opt,name=startTime,proto3" json:"startTime,omitempty"` + EndTime int64 `protobuf:"zigzag64,6,opt,name=endTime,proto3" json:"endTime,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -311,14 +310,14 @@ func (m *HistoryQuery) GetPagingInfo() *PagingInfo { return nil } -func (m *HistoryQuery) GetStartTime() float64 { +func (m *HistoryQuery) GetStartTime() int64 { if m != nil { return m.StartTime } return 0 } -func (m *HistoryQuery) GetEndTime() float64 { +func (m *HistoryQuery) GetEndTime() int64 { if m != nil { return m.EndTime } @@ -476,31 +475,31 @@ var fileDescriptor_ca6891f77a46e680 = []byte{ 0x25, 0x32, 0x4e, 0x63, 0x2c, 0x24, 0x93, 0x7d, 0xd4, 0xea, 0x35, 0x29, 0xe5, 0x4f, 0x8d, 0xee, 0x06, 0x50, 0x1e, 0x44, 0x21, 0x7d, 0xc4, 0xbb, 0x50, 0x09, 0xc5, 0x94, 0x54, 0xea, 0xb0, 0x16, 0x6b, 0xd7, 0xb9, 0xa9, 0xd0, 0x85, 0xba, 0xa4, 0x80, 0xc4, 0x7b, 0x92, 0x17, 0xe2, 0x9a, 0x9c, - 0x42, 0x8b, 0xb5, 0x19, 0xdf, 0xd0, 0xb0, 0x09, 0xa0, 0x28, 0x0a, 0x0d, 0x51, 0xd4, 0xc4, 0x9a, - 0xe2, 0x7e, 0x65, 0x00, 0x67, 0xfe, 0x54, 0x44, 0xd3, 0x41, 0xf4, 0x36, 0xc6, 0x7d, 0xa8, 0x26, - 0xfe, 0x94, 0xce, 0xc5, 0x27, 0xd2, 0x66, 0x25, 0xbe, 0xac, 0xf1, 0x21, 0x54, 0x82, 0x4c, 0xaa, - 0x58, 0x6a, 0xa3, 0x5a, 0xc7, 0xf2, 0x92, 0x89, 0xa7, 0x13, 0x72, 0xd3, 0xc0, 0x27, 0x60, 0x85, - 0x42, 0x52, 0x90, 0x8a, 0x38, 0xd2, 0x66, 0x8d, 0x8e, 0x93, 0x53, 0x2b, 0x07, 0xaf, 0xbf, 0xe8, - 0xf3, 0x15, 0xea, 0x1e, 0x80, 0xb5, 0xd4, 0xb1, 0x0e, 0xd5, 0xe3, 0x6e, 0xef, 0xf9, 0x65, 0x97, - 0xf7, 0xed, 0x1d, 0xac, 0xc1, 0xee, 0xe9, 0x88, 0xeb, 0x82, 0xb9, 0x47, 0xf0, 0x5f, 0x2f, 0x8e, - 0x52, 0x8a, 0xd2, 0x53, 0xf1, 0x2e, 0x25, 0x99, 0xaf, 0x20, 0x98, 0x0b, 0x17, 0x71, 0x22, 0x02, - 0x9d, 0xd9, 0xe2, 0x1b, 0x9a, 0xfb, 0x93, 0x41, 0xfd, 0x99, 0xc8, 0x37, 0x7e, 0xfb, 0x2a, 0x23, - 0x79, 0x8b, 0x2d, 0xa8, 0x25, 0xd9, 0x44, 0x65, 0x93, 0xf9, 0x37, 0x05, 0xfd, 0xcd, 0xba, 0x84, - 0x4f, 0xa1, 0x11, 0xac, 0xfb, 0x28, 0xa7, 0xd8, 0x2a, 0xb6, 0x6b, 0x9d, 0xff, 0xf3, 0x61, 0x36, - 0x12, 0xf0, 0x2d, 0x10, 0x3d, 0x80, 0x64, 0x39, 0xad, 0x53, 0xd2, 0x9b, 0x6a, 0x6c, 0xee, 0x80, - 0xaf, 0x11, 0x78, 0x1f, 0x2c, 0x95, 0xfa, 0x32, 0xd5, 0xf7, 0x29, 0xeb, 0xfb, 0xac, 0x04, 0x74, - 0x60, 0x97, 0xa2, 0x50, 0xf7, 0x2a, 0xba, 0xb7, 0x28, 0xdd, 0x6f, 0x0c, 0xf6, 0xcc, 0x54, 0x9c, - 0x54, 0x12, 0x47, 0x8a, 0xf0, 0x31, 0x54, 0xcd, 0x13, 0x52, 0x4e, 0x41, 0x07, 0xde, 0xcb, 0x9d, - 0x2f, 0xfd, 0xab, 0xec, 0xe5, 0x5c, 0xe7, 0x4b, 0x60, 0x2b, 0x68, 0xf1, 0x9f, 0x41, 0x0f, 0xa1, - 0x4c, 0x52, 0xc6, 0x52, 0xcf, 0xd4, 0xe8, 0xdc, 0xcb, 0xd1, 0xad, 0x00, 0xde, 0x49, 0x0e, 0xf0, - 0x39, 0xe7, 0x3e, 0x82, 0xb2, 0xae, 0xb1, 0x0a, 0xa5, 0xe1, 0x68, 0x78, 0x62, 0xef, 0x20, 0x42, - 0x63, 0x30, 0x7c, 0xd3, 0x7d, 0x31, 0xe8, 0x8f, 0x7b, 0xaf, 0xf9, 0xf9, 0x88, 0xdb, 0xcc, 0xfd, - 0xcc, 0x00, 0x16, 0xbf, 0x73, 0xd6, 0xc3, 0x07, 0x00, 0x92, 0x6e, 0x32, 0x52, 0xe9, 0x58, 0x84, - 0xe6, 0x9e, 0x96, 0x51, 0x06, 0x21, 0x1e, 0x40, 0xf9, 0x26, 0x3f, 0xa2, 0x79, 0x83, 0xf6, 0x5a, - 0x0a, 0x7d, 0x5c, 0x3e, 0x6f, 0xe3, 0x21, 0x54, 0xa5, 0x49, 0x65, 0x66, 0xbb, 0xf3, 0x97, 0xc0, - 0x7c, 0x09, 0x1d, 0xdb, 0xdf, 0x67, 0x4d, 0xf6, 0x63, 0xd6, 0x64, 0xbf, 0x66, 0x4d, 0xf6, 0xe5, - 0x77, 0x73, 0x67, 0x52, 0xd1, 0x7f, 0xc3, 0xa3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xb7, 0x5c, - 0xa1, 0x0e, 0xb2, 0x03, 0x00, 0x00, + 0x42, 0x8b, 0xb5, 0x91, 0x6f, 0x68, 0xd8, 0x04, 0x50, 0x14, 0x85, 0x86, 0x28, 0x6a, 0x62, 0x4d, + 0x71, 0xbf, 0x32, 0x80, 0x33, 0x7f, 0x2a, 0xa2, 0xe9, 0x20, 0x7a, 0x1b, 0xe3, 0x3e, 0x54, 0x13, + 0x7f, 0x4a, 0xe7, 0xe2, 0x13, 0x69, 0xb3, 0x12, 0x5f, 0xd6, 0xf8, 0x10, 0x2a, 0x41, 0x26, 0x55, + 0x2c, 0xb5, 0x51, 0xad, 0x63, 0x79, 0xc9, 0xc4, 0xd3, 0x09, 0xb9, 0x69, 0xe0, 0x13, 0xb0, 0x42, + 0x21, 0x29, 0x48, 0x45, 0x1c, 0x69, 0xb3, 0x46, 0xc7, 0xc9, 0xa9, 0x95, 0x83, 0xd7, 0x5f, 0xf4, + 0xf9, 0x0a, 0x75, 0x0f, 0xc0, 0x5a, 0xea, 0x58, 0x87, 0xea, 0x71, 0xb7, 0xf7, 0xfc, 0xb2, 0xcb, + 0xfb, 0xf6, 0x0e, 0xd6, 0x60, 0xf7, 0x74, 0xc4, 0x75, 0xc1, 0xdc, 0x23, 0xf8, 0xaf, 0x17, 0x47, + 0x29, 0x45, 0xe9, 0xa9, 0x78, 0x97, 0x92, 0xcc, 0x57, 0x10, 0xcc, 0x85, 0x8b, 0x38, 0x11, 0x81, + 0xce, 0x6c, 0xf1, 0x0d, 0xcd, 0xfd, 0xc9, 0xa0, 0xfe, 0x4c, 0xe4, 0x1b, 0xbf, 0x7d, 0x95, 0x91, + 0xbc, 0xc5, 0x16, 0xd4, 0x92, 0x6c, 0xa2, 0xb2, 0xc9, 0xfc, 0x9b, 0x82, 0xfe, 0x66, 0x5d, 0xc2, + 0xa7, 0xd0, 0x08, 0xd6, 0x7d, 0x94, 0x53, 0x6c, 0x15, 0xdb, 0xb5, 0xce, 0xff, 0xf9, 0x30, 0x1b, + 0x09, 0xf8, 0x16, 0x88, 0x1e, 0x40, 0xb2, 0x9c, 0xd6, 0x29, 0xe9, 0x4d, 0x35, 0x36, 0x77, 0xc0, + 0xd7, 0x08, 0xbc, 0x0f, 0x96, 0x4a, 0x7d, 0x99, 0xea, 0xfb, 0x94, 0xf5, 0x7d, 0x56, 0x02, 0x3a, + 0xb0, 0x4b, 0x51, 0xa8, 0x7b, 0x15, 0xdd, 0x5b, 0x94, 0xee, 0x37, 0x06, 0x7b, 0x66, 0x2a, 0x4e, + 0x2a, 0x89, 0x23, 0x45, 0xf8, 0x18, 0xaa, 0xe6, 0x09, 0x29, 0xa7, 0xa0, 0x03, 0xef, 0xe5, 0xce, + 0x97, 0xfe, 0x55, 0xf6, 0x72, 0xae, 0xf3, 0x25, 0xb0, 0x15, 0xb4, 0xf8, 0xcf, 0xa0, 0x87, 0x50, + 0x26, 0x29, 0x63, 0xa9, 0x67, 0x6a, 0x74, 0xee, 0xe5, 0xe8, 0x56, 0x00, 0xef, 0x24, 0x07, 0xf8, + 0x9c, 0x73, 0x1f, 0x41, 0x59, 0xd7, 0x58, 0x85, 0xd2, 0x70, 0x34, 0x3c, 0xb1, 0x77, 0x10, 0xa1, + 0x31, 0x18, 0xbe, 0xe9, 0xbe, 0x18, 0xf4, 0xc7, 0xbd, 0xd7, 0xfc, 0x7c, 0xc4, 0x6d, 0xe6, 0x7e, + 0x66, 0x00, 0x8b, 0xdf, 0x39, 0xeb, 0xe1, 0x03, 0x00, 0x49, 0x37, 0x19, 0xa9, 0x74, 0x2c, 0x42, + 0x73, 0x4f, 0xcb, 0x28, 0x83, 0x10, 0x0f, 0xa0, 0x7c, 0x93, 0x1f, 0xd1, 0xbc, 0x41, 0x7b, 0x2d, + 0x85, 0x3e, 0x2e, 0x9f, 0xb7, 0xf1, 0x10, 0xaa, 0xd2, 0xa4, 0x32, 0xb3, 0xdd, 0xf9, 0x4b, 0x60, + 0xbe, 0x84, 0x8e, 0xed, 0xef, 0xb3, 0x26, 0xfb, 0x31, 0x6b, 0xb2, 0x5f, 0xb3, 0x26, 0xfb, 0xf2, + 0xbb, 0xb9, 0x33, 0xa9, 0xe8, 0xbf, 0xe1, 0xd1, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0x8f, + 0x84, 0xdf, 0xb2, 0x03, 0x00, 0x00, } func (m *Index) Marshal() (dAtA []byte, err error) { @@ -528,16 +527,14 @@ func (m *Index) MarshalToSizedBuffer(dAtA []byte) (int, error) { copy(dAtA[i:], m.XXX_unrecognized) } if m.SenderTime != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.SenderTime)))) + i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.SenderTime)<<1)^uint64((m.SenderTime>>63)))) i-- - dAtA[i] = 0x19 + dAtA[i] = 0x18 } if m.ReceiverTime != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.ReceiverTime)))) + i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.ReceiverTime)<<1)^uint64((m.ReceiverTime>>63)))) i-- - dAtA[i] = 0x11 + dAtA[i] = 0x10 } if len(m.Digest) > 0 { i -= len(m.Digest) @@ -657,16 +654,14 @@ func (m *HistoryQuery) MarshalToSizedBuffer(dAtA []byte) (int, error) { copy(dAtA[i:], m.XXX_unrecognized) } if m.EndTime != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.EndTime)))) + i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.EndTime)<<1)^uint64((m.EndTime>>63)))) i-- - dAtA[i] = 0x31 + dAtA[i] = 0x30 } if m.StartTime != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.StartTime)))) + i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.StartTime)<<1)^uint64((m.StartTime>>63)))) i-- - dAtA[i] = 0x29 + dAtA[i] = 0x28 } if m.PagingInfo != nil { { @@ -842,10 +837,10 @@ func (m *Index) Size() (n int) { n += 1 + l + sovWakuStore(uint64(l)) } if m.ReceiverTime != 0 { - n += 9 + n += 1 + sozWakuStore(uint64(m.ReceiverTime)) } if m.SenderTime != 0 { - n += 9 + n += 1 + sozWakuStore(uint64(m.SenderTime)) } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) @@ -912,10 +907,10 @@ func (m *HistoryQuery) Size() (n int) { n += 1 + l + sovWakuStore(uint64(l)) } if m.StartTime != 0 { - n += 9 + n += 1 + sozWakuStore(uint64(m.StartTime)) } if m.EndTime != 0 { - n += 9 + n += 1 + sozWakuStore(uint64(m.EndTime)) } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) @@ -1042,27 +1037,47 @@ func (m *Index) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 2: - if wireType != 1 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ReceiverTime", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuStore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.ReceiverTime = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.ReceiverTime = int64(v) case 3: - if wireType != 1 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field SenderTime", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuStore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.SenderTime = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.SenderTime = int64(v) default: iNdEx = preIndex skippy, err := skipWakuStore(dAtA[iNdEx:]) @@ -1425,27 +1440,47 @@ func (m *HistoryQuery) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 5: - if wireType != 1 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field StartTime", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuStore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.StartTime = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.StartTime = int64(v) case 6: - if wireType != 1 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field EndTime", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuStore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.EndTime = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.EndTime = int64(v) default: iNdEx = preIndex skippy, err := skipWakuStore(dAtA[iNdEx:]) diff --git a/waku/v2/protocol/pb/waku_store.proto b/waku/v2/protocol/pb/waku_store.proto index 7e4c879d..4d940cd8 100644 --- a/waku/v2/protocol/pb/waku_store.proto +++ b/waku/v2/protocol/pb/waku_store.proto @@ -6,8 +6,8 @@ import "waku_message.proto"; message Index { bytes digest = 1; - double receiverTime = 2; - double senderTime = 3; + sint64 receiverTime = 2; + sint64 senderTime = 3; } message PagingInfo { @@ -28,8 +28,8 @@ message HistoryQuery { string pubsubTopic = 2; repeated ContentFilter contentFilters = 3; PagingInfo pagingInfo = 4; // used for pagination - double startTime = 5; - double endTime = 6; + sint64 startTime = 5; + sint64 endTime = 6; } message HistoryResponse { diff --git a/waku/v2/protocol/store/message_queue.go b/waku/v2/protocol/store/message_queue.go index 71333111..b029c2a0 100644 --- a/waku/v2/protocol/store/message_queue.go +++ b/waku/v2/protocol/store/message_queue.go @@ -28,6 +28,7 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) error { var k [32]byte copy(k[:], msg.index.Digest) + if _, ok := self.seen[k]; ok { return ErrDuplicatedMessage } diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index fd0f01a2..c79caa77 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -22,11 +22,11 @@ func TestFindLastSeenMessage(t *testing.T) { msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) - s.storeMessage(msg1) - s.storeMessage(msg3) - s.storeMessage(msg5) - s.storeMessage(msg2) - s.storeMessage(msg4) + _ = s.storeMessage(msg1) + _ = s.storeMessage(msg3) + _ = s.storeMessage(msg5) + _ = s.storeMessage(msg2) + _ = s.storeMessage(msg4) require.Equal(t, msg5.Message().Timestamp, s.findLastSeen()) } @@ -48,8 +48,9 @@ func TestResume(t *testing.T) { contentTopic = "2" } - msg := protocol.NewEnvelope(tests.CreateWakuMessage(contentTopic, float64(time.Duration(i)*time.Second)), "test") - s1.storeMessage(msg) + wakuMessage := tests.CreateWakuMessage(contentTopic, (time.Duration(i) * time.Second).Nanoseconds()) + msg := protocol.NewEnvelope(wakuMessage, "test") + _ = s1.storeMessage(msg) } host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) @@ -91,9 +92,9 @@ func TestResumeWithListOfPeers(t *testing.T) { s1.Start(ctx) defer s1.Stop() - msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)} + msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0} - s1.storeMessage(protocol.NewEnvelope(msg0, "test")) + _ = s1.storeMessage(protocol.NewEnvelope(msg0, "test")) host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) @@ -124,9 +125,9 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { s1.Start(ctx) defer s1.Stop() - msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)} + msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0} - s1.storeMessage(protocol.NewEnvelope(msg0, "test")) + _ = s1.storeMessage(protocol.NewEnvelope(msg0, "test")) host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index e4807cbc..452bef0e 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -27,7 +27,7 @@ import ( ) // StoreID_v20beta3 is the current Waku Store protocol identifier -const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3") +const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4") // MaxPageSize is the maximum number of waku messages to return per page const MaxPageSize = 100 @@ -193,8 +193,8 @@ type MessageProvider interface { type Query struct { Topic string ContentTopics []string - StartTime float64 - EndTime float64 + StartTime int64 + EndTime int64 } // Result represents a valid response from a store node @@ -296,7 +296,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { for _, storedMessage := range storedMessages { idx := &pb.Index{ Digest: storedMessage.ID, - ReceiverTime: float64(storedMessage.ReceiverTime), + ReceiverTime: storedMessage.ReceiverTime, } _ = store.addToMessageQueue(storedMessage.PubsubTopic, idx, storedMessage.Message) @@ -309,21 +309,21 @@ func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg return store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) } -func (store *WakuStore) storeMessage(env *protocol.Envelope) { +func (store *WakuStore) storeMessage(env *protocol.Envelope) error { index, err := computeIndex(env) if err != nil { store.log.Error("could not calculate message index", err) - return + return err } err = store.addToMessageQueue(env.PubsubTopic(), index, env.Message()) if err == ErrDuplicatedMessage { - return + return err } if store.msgProvider == nil { metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) - return + return err } // TODO: Move this to a separate go routine if DB writes becomes a bottleneck @@ -331,16 +331,17 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) { if err != nil { store.log.Error("could not store message", err) metrics.RecordStoreError(store.ctx, "store_failure") - return + return err } metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) + return nil } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { defer store.wg.Done() for envelope := range store.MsgC { - store.storeMessage(envelope) + _ = store.storeMessage(envelope) } } @@ -650,6 +651,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId()) if err == nil { resultChan <- result + return } store.log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err)) }() @@ -672,8 +674,8 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c return nil, ErrFailedQuery } -func (store *WakuStore) findLastSeen() float64 { - var lastSeenTime float64 = 0 +func (store *WakuStore) findLastSeen() int64 { + var lastSeenTime int64 = 0 for imsg := range store.messageQueue.Messages() { if imsg.msg.Timestamp > lastSeenTime { lastSeenTime = imsg.msg.Timestamp @@ -682,6 +684,13 @@ func (store *WakuStore) findLastSeen() float64 { return lastSeenTime } +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} + // Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online // messages are stored in the store node's messages field and in the message db // the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message @@ -698,9 +707,9 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList currentTime := utils.GetUnixEpoch() lastSeenTime := store.findLastSeen() - var offset float64 = 200000 + var offset int64 = int64(20 * time.Nanosecond) currentTime = currentTime + offset - lastSeenTime = math.Max(lastSeenTime-offset, 0) + lastSeenTime = max(lastSeenTime-offset, 0) rpc := &pb.HistoryQuery{ PubsubTopic: pubsubTopic, @@ -728,13 +737,16 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList return -1, ErrFailedToResumeHistory } + msgCount := 0 for _, msg := range messages { - store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic)) + if err = store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic)); err == nil { + msgCount++ + } } store.log.Info("Retrieved messages since the last online time: ", len(messages)) - return len(messages), nil + return msgCount, nil } // TODO: queryWithAccounting diff --git a/waku/v2/protocol/store/waku_store_pagination_test.go b/waku/v2/protocol/store/waku_store_pagination_test.go index e07a94bb..52cb9c8a 100644 --- a/waku/v2/protocol/store/waku_store_pagination_test.go +++ b/waku/v2/protocol/store/waku_store_pagination_test.go @@ -96,8 +96,8 @@ func createSampleList(s int) []IndexedWakuMessage { Payload: []byte{byte(i)}, }, index: &pb.Index{ - ReceiverTime: float64(i), - SenderTime: float64(i), + ReceiverTime: int64(i), + SenderTime: int64(i), Digest: []byte{1}, }, }) diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index f4628b2b..770098de 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -38,7 +38,7 @@ func TestStorePersistence(t *testing.T) { Timestamp: utils.GetUnixEpoch(), } - s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) + _ = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) s2 := NewWakuStore(nil, nil, dbStore, 0, 0, tests.Logger()) s2.fetchDBRecords(ctx) @@ -46,5 +46,6 @@ func TestStorePersistence(t *testing.T) { require.Equal(t, msg, s2.messageQueue.messages[0].msg) // Storing a duplicated message should not crash. It's okay to generate an error log in this case - s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) + err = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) + require.ErrorIs(t, err, ErrDuplicatedMessage) } diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 15f53c4f..e8150cea 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -73,11 +73,11 @@ func TestWakuStoreProtocolNext(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - msg1 := tests.CreateWakuMessage(topic1, float64(1)) - msg2 := tests.CreateWakuMessage(topic1, float64(2)) - msg3 := tests.CreateWakuMessage(topic1, float64(3)) - msg4 := tests.CreateWakuMessage(topic1, float64(4)) - msg5 := tests.CreateWakuMessage(topic1, float64(5)) + msg1 := tests.CreateWakuMessage(topic1, 1) + msg2 := tests.CreateWakuMessage(topic1, 2) + msg3 := tests.CreateWakuMessage(topic1, 3) + msg4 := tests.CreateWakuMessage(topic1, 4) + msg5 := tests.CreateWakuMessage(topic1, 5) s1.MsgC <- protocol.NewEnvelope(msg1, pubsubTopic1) s1.MsgC <- protocol.NewEnvelope(msg2, pubsubTopic1) diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index e47b4bea..fde0616d 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -18,8 +18,8 @@ func TestStoreQuery(t *testing.T) { msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) - s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) - s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) + _ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) + _ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) response := s.FindMessages(&pb.HistoryQuery{ ContentFilters: []*pb.ContentFilter{ @@ -45,9 +45,9 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) - s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) - s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) - s.storeMessage(protocol.NewEnvelope(msg3, defaultPubSubTopic)) + _ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) + _ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) + _ = s.storeMessage(protocol.NewEnvelope(msg3, defaultPubSubTopic)) response := s.FindMessages(&pb.HistoryQuery{ ContentFilters: []*pb.ContentFilter{ @@ -78,9 +78,9 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) - s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) - s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) - s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) + _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) + _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) + _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) response := s.FindMessages(&pb.HistoryQuery{ PubsubTopic: pubsubTopic1, @@ -110,9 +110,9 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) - s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) - s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) - s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) + _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) + _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) + _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) response := s.FindMessages(&pb.HistoryQuery{ PubsubTopic: pubsubTopic1, @@ -132,9 +132,9 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) - s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) - s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) - s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) + _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) + _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) + _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) response := s.FindMessages(&pb.HistoryQuery{ PubsubTopic: pubsubTopic1, @@ -154,7 +154,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} - s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1)) + _ = s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1)) } response := s.FindMessages(&pb.HistoryQuery{ @@ -182,7 +182,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { Version: 0, Timestamp: utils.GetUnixEpoch(), } - s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1)) + _ = s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1)) } @@ -208,16 +208,16 @@ func TestTemporalHistoryQueries(t *testing.T) { if i%2 == 0 { contentTopic = "2" } - msg := tests.CreateWakuMessage(contentTopic, float64(i)) - s.storeMessage(protocol.NewEnvelope(msg, "test")) + msg := tests.CreateWakuMessage(contentTopic, int64(i)) + _ = s.storeMessage(protocol.NewEnvelope(msg, "test")) messages = append(messages, msg) } // handle temporal history query with a valid time window response := s.FindMessages(&pb.HistoryQuery{ ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, - StartTime: float64(2), - EndTime: float64(5), + StartTime: int64(2), + EndTime: int64(5), }) require.Len(t, response.Messages, 2) @@ -227,8 +227,8 @@ func TestTemporalHistoryQueries(t *testing.T) { // handle temporal history query with a zero-size time window response = s.FindMessages(&pb.HistoryQuery{ ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, - StartTime: float64(2), - EndTime: float64(2), + StartTime: int64(2), + EndTime: int64(2), }) require.Len(t, response.Messages, 0) @@ -236,8 +236,8 @@ func TestTemporalHistoryQueries(t *testing.T) { // handle temporal history query with an invalid time window response = s.FindMessages(&pb.HistoryQuery{ ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, - StartTime: float64(5), - EndTime: float64(2), + StartTime: int64(5), + EndTime: int64(2), }) // time window is invalid since start time > end time // perhaps it should return an error? diff --git a/waku/v2/rpc/store.go b/waku/v2/rpc/store.go index 7ce5465f..dae2dc78 100644 --- a/waku/v2/rpc/store.go +++ b/waku/v2/rpc/store.go @@ -27,8 +27,8 @@ type StorePagingOptions struct { type StoreMessagesArgs struct { Topic string `json:"pubsubTopic,omitempty"` ContentFilters []string `json:"contentFilters,omitempty"` - StartTime float64 `json:"startTime,omitempty"` - EndTime float64 `json:"endTime,omitempty"` + StartTime int64 `json:"startTime,omitempty"` + EndTime int64 `json:"endTime,omitempty"` PagingOptions StorePagingOptions `json:"pagingOptions,omitempty"` } diff --git a/waku/v2/utils/time.go b/waku/v2/utils/time.go index 1b28756e..1634b3ed 100644 --- a/waku/v2/utils/time.go +++ b/waku/v2/utils/time.go @@ -2,14 +2,13 @@ package utils import "time" -// GetUnixEpoch converts a time into a unix timestamp with the integer part -// representing seconds and the decimal part representing subseconds -func GetUnixEpochFrom(now time.Time) float64 { - return float64(now.UnixNano()) / float64(time.Second) +// GetUnixEpoch converts a time into a unix timestamp with nanoseconds +func GetUnixEpochFrom(now time.Time) int64 { + return now.UnixNano() } // GetUnixEpoch returns the current time in unix timestamp with the integer part // representing seconds and the decimal part representing subseconds -func GetUnixEpoch() float64 { +func GetUnixEpoch() int64 { return GetUnixEpochFrom(time.Now()) } diff --git a/waku/v2/utils/time_test.go b/waku/v2/utils/time_test.go index 26bd7549..d2e9fef6 100644 --- a/waku/v2/utils/time_test.go +++ b/waku/v2/utils/time_test.go @@ -10,7 +10,7 @@ import ( func TestGetUnixEpochFrom(t *testing.T) { loc := time.UTC timestamp := GetUnixEpochFrom(time.Date(2019, 1, 1, 0, 0, 0, 0, loc)) - require.Equal(t, float64(1546300800), timestamp) + require.Equal(t, int64(1546300800*time.Second), timestamp) timestamp = GetUnixEpoch() require.NotNil(t, timestamp)