refactor: use int64 timestamps (#189)

* refactor: use int64 timestamps
* fix: changed PB Timestamp index to 10
This commit is contained in:
Richard Ramos 2022-02-23 11:01:53 -04:00 committed by GitHub
parent e9dafb6038
commit 58f739765e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 250 additions and 192 deletions

View File

@ -84,7 +84,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
contentTopic := protocol.NewContentTopic("basic2", 1, "test", "proto")
var version uint32 = 0
var timestamp float64 = utils.GetUnixEpoch()
var timestamp int64 = utils.GetUnixEpoch()
p := new(node.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)

View File

@ -95,7 +95,7 @@ func (cr *Chat) Publish(ctx context.Context, message string) error {
}
var version uint32
var timestamp float64 = utils.GetUnixEpoch()
var timestamp int64 = utils.GetUnixEpoch()
var keyInfo *node.KeyInfo = &node.KeyInfo{}
if cr.useV1Payload { // Use WakuV1 encryption

View File

@ -143,7 +143,7 @@ func randomHex(n int) (string, error) {
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
var version uint32 = 0
var timestamp float64 = utils.GetUnixEpoch()
var timestamp int64 = utils.GetUnixEpoch()
p := new(node.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)

View File

@ -51,7 +51,7 @@ func TestBasicSendingReceiving(t *testing.T) {
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error {
var contentTopic string = "test"
var version uint32 = 0
var timestamp float64 = utils.GetUnixEpoch()
var timestamp int64 = utils.GetUnixEpoch()
p := new(node.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)

View File

@ -81,7 +81,7 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
)
}
func CreateWakuMessage(contentTopic string, timestamp float64) *pb.WakuMessage {
func CreateWakuMessage(contentTopic string, timestamp int64) *pb.WakuMessage {
return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp}
}

View File

@ -28,7 +28,7 @@ type DBStore struct {
type StoredMessage struct {
ID []byte
PubsubTopic string
ReceiverTime float64
ReceiverTime int64
Message *pb.WakuMessage
}
@ -93,8 +93,8 @@ func NewDBStore(log *zap.SugaredLogger, options ...DBOption) (*DBStore, error) {
func (d *DBStore) createTable() error {
sqlStmt := `CREATE TABLE IF NOT EXISTS message (
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
senderTimestamp REAL NOT NULL,
receiverTimestamp INTEGER NOT NULL,
senderTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
@ -161,8 +161,8 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
for rows.Next() {
var id []byte
var receiverTimestamp float64
var senderTimestamp float64
var receiverTimestamp int64
var senderTimestamp int64
var contentTopic string
var payload []byte
var version uint32

View File

@ -20,7 +20,7 @@ func NewMock() *sql.DB {
return db
}
func createIndex(digest []byte, receiverTime float64) *pb.Index {
func createIndex(digest []byte, receiverTime int64) *pb.Index {
return &pb.Index{
Digest: digest,
ReceiverTime: receiverTime,
@ -57,18 +57,18 @@ func TestStoreRetention(t *testing.T) {
insertTime := time.Now()
_ = store.Put(createIndex([]byte{1}, float64(insertTime.Add(-70*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 1))
_ = store.Put(createIndex([]byte{2}, float64(insertTime.Add(-60*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 2))
_ = store.Put(createIndex([]byte{3}, float64(insertTime.Add(-50*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 3))
_ = store.Put(createIndex([]byte{4}, float64(insertTime.Add(-40*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 4))
_ = store.Put(createIndex([]byte{5}, float64(insertTime.Add(-30*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 5))
_ = store.Put(createIndex([]byte{1}, insertTime.Add(-70*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 1))
_ = store.Put(createIndex([]byte{2}, insertTime.Add(-60*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 2))
_ = store.Put(createIndex([]byte{3}, insertTime.Add(-50*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 3))
_ = store.Put(createIndex([]byte{4}, insertTime.Add(-40*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 4))
_ = store.Put(createIndex([]byte{5}, insertTime.Add(-30*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 5))
dbResults, err := store.GetAll()
require.NoError(t, err)
require.Len(t, dbResults, 5)
_ = store.Put(createIndex([]byte{6}, float64(insertTime.Add(-20*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 6))
_ = store.Put(createIndex([]byte{7}, float64(insertTime.Add(-10*time.Second).Unix())), "test", tests.CreateWakuMessage("test", 7))
_ = store.Put(createIndex([]byte{6}, insertTime.Add(-20*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 6))
_ = store.Put(createIndex([]byte{7}, insertTime.Add(-10*time.Second).UnixNano()), "test", tests.CreateWakuMessage("test", 7))
// This step simulates starting go-waku again from scratch

View File

@ -93,8 +93,10 @@ func (w *WakuNode) connectednessListener() {
select {
case <-w.quit:
return
case <-w.protocolEventSub.Out():
case <-w.identificationEventSub.Out():
case a := <-w.protocolEventSub.Out():
fmt.Println(a)
case b := <-w.identificationEventSub.Out():
fmt.Println(b)
case <-w.connectionNotif.DisconnectChan:
}
w.sendConnStatus()

View File

@ -13,7 +13,7 @@ func createTestMsg(version uint32) *pb.WakuMessage {
message := new(pb.WakuMessage)
message.Payload = []byte{0, 1, 2}
message.Version = version
message.Timestamp = float64(123456)
message.Timestamp = 123456
return message
}
@ -36,7 +36,7 @@ func TestEncodeDecodePayload(t *testing.T) {
message := new(pb.WakuMessage)
message.Payload = encodedPayload
message.Version = version
message.Timestamp = float64(123456)
message.Timestamp = 123456
decodedPayload, err := DecodePayload(message, keyInfo)
require.NoError(t, err)

View File

@ -129,7 +129,7 @@ func Test1100(t *testing.T) {
for i := 1; i <= 1100; i++ {
msg := createTestMsg(0)
msg.Payload = []byte(fmt.Sprint(i))
msg.Timestamp = float64(i)
msg.Timestamp = int64(i)
if err := wakuNode2.Publish(ctx, msg); err != nil {
require.Fail(t, "Could not publish all messages")
}

View File

@ -78,8 +78,8 @@ func TestWakuLightPush(t *testing.T) {
err = clientHost.Peerstore().AddProtocols(host2.ID(), string(LightPushID_v20beta1))
require.NoError(t, err)
msg1 := tests.CreateWakuMessage("test1", float64(0))
msg2 := tests.CreateWakuMessage("test2", float64(1))
msg1 := tests.CreateWakuMessage("test1", 0)
msg2 := tests.CreateWakuMessage("test2", 1)
req := new(pb.PushRequest)
req.Message = msg1
@ -137,6 +137,6 @@ func TestWakuLightPushNoPeers(t *testing.T) {
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil, tests.Logger())
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", float64(0)), testTopic)
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic)
require.Errorf(t, err, "no suitable remote peers")
}

View File

@ -16,12 +16,11 @@ func TestEnvelopeHash(t *testing.T) {
msg := new(WakuMessage)
msg.ContentTopic = "Test"
msg.Payload = []byte("Hello World")
msg.Timestamp = float64(123456789123456789)
msg.Timestamp = 123456789123456789
msg.Version = 1
expected := []byte{77, 197, 250, 41, 30, 163, 192, 239, 48, 104, 58, 175, 36, 81, 96, 58, 118, 107, 73, 4, 153, 182, 33, 199, 144, 156, 110, 226, 93, 85, 160, 180}
expected := []byte{82, 136, 166, 250, 14, 69, 211, 99, 19, 161, 139, 206, 179, 3, 117, 51, 112, 111, 203, 150, 207, 35, 104, 102, 21, 181, 114, 165, 77, 29, 190, 61}
result, err := msg.Hash()
require.NoError(t, err)
require.Equal(t, expected, result)
}

View File

@ -4,7 +4,6 @@
package pb
import (
encoding_binary "encoding/binary"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
io "io"
@ -27,7 +26,7 @@ type WakuMessage struct {
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
ContentTopic string `protobuf:"bytes,2,opt,name=contentTopic,proto3" json:"contentTopic,omitempty"`
Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
Timestamp float64 `protobuf:"fixed64,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Timestamp int64 `protobuf:"zigzag64,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Proof []byte `protobuf:"bytes,21,opt,name=proof,proto3" json:"proof,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@ -88,7 +87,7 @@ func (m *WakuMessage) GetVersion() uint32 {
return 0
}
func (m *WakuMessage) GetTimestamp() float64 {
func (m *WakuMessage) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
@ -117,11 +116,11 @@ var fileDescriptor_6f0a20862b3bf714 = []byte{
0x82, 0x71, 0x85, 0x94, 0xb8, 0x78, 0x92, 0xf3, 0xf3, 0x4a, 0x52, 0xf3, 0x4a, 0x42, 0xf2, 0x0b,
0x32, 0x93, 0x25, 0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x50, 0xc4, 0x40, 0xba, 0xcb, 0x52, 0x8b,
0x8a, 0x33, 0xf3, 0xf3, 0x24, 0x98, 0x15, 0x18, 0x35, 0x78, 0x83, 0x60, 0x5c, 0x21, 0x19, 0x2e,
0xce, 0x92, 0xcc, 0xdc, 0xd4, 0xe2, 0x92, 0xc4, 0xdc, 0x02, 0x09, 0x16, 0x05, 0x46, 0x0d, 0xc6,
0xce, 0x92, 0xcc, 0xdc, 0xd4, 0xe2, 0x92, 0xc4, 0xdc, 0x02, 0x09, 0x2e, 0x05, 0x46, 0x0d, 0xa1,
0x20, 0x84, 0x80, 0x90, 0x08, 0x17, 0x6b, 0x41, 0x51, 0x7e, 0x7e, 0x9a, 0x84, 0x28, 0xd8, 0x4e,
0x08, 0xc7, 0x49, 0xe0, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63,
0x9c, 0xf1, 0x58, 0x8e, 0x21, 0x89, 0x0d, 0xec, 0x70, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff,
0x0a, 0xc6, 0x91, 0x28, 0xce, 0x00, 0x00, 0x00,
0x33, 0x1b, 0x64, 0x81, 0xce, 0x00, 0x00, 0x00,
}
func (m *WakuMessage) Marshal() (dAtA []byte, err error) {
@ -158,10 +157,9 @@ func (m *WakuMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0xaa
}
if m.Timestamp != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Timestamp))))
i = encodeVarintWakuMessage(dAtA, i, uint64((uint64(m.Timestamp)<<1)^uint64((m.Timestamp>>63))))
i--
dAtA[i] = 0x21
dAtA[i] = 0x50
}
if m.Version != 0 {
i = encodeVarintWakuMessage(dAtA, i, uint64(m.Version))
@ -214,7 +212,7 @@ func (m *WakuMessage) Size() (n int) {
n += 1 + sovWakuMessage(uint64(m.Version))
}
if m.Timestamp != 0 {
n += 9
n += 1 + sozWakuMessage(uint64(m.Timestamp))
}
l = len(m.Proof)
if l > 0 {
@ -346,17 +344,27 @@ func (m *WakuMessage) Unmarshal(dAtA []byte) error {
break
}
}
case 4:
if wireType != 1 {
case 10:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.Timestamp = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.Timestamp = int64(v)
case 21:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Proof", wireType)

View File

@ -6,6 +6,6 @@ message WakuMessage {
bytes payload = 1;
string contentTopic = 2;
uint32 version = 3;
double timestamp = 4;
sint64 timestamp = 10;
bytes proof = 21;
}

View File

@ -4,7 +4,6 @@
package pb
import (
encoding_binary "encoding/binary"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
io "io"
@ -75,8 +74,8 @@ func (HistoryResponse_Error) EnumDescriptor() ([]byte, []int) {
type Index struct {
Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"`
ReceiverTime float64 `protobuf:"fixed64,2,opt,name=receiverTime,proto3" json:"receiverTime,omitempty"`
SenderTime float64 `protobuf:"fixed64,3,opt,name=senderTime,proto3" json:"senderTime,omitempty"`
ReceiverTime int64 `protobuf:"zigzag64,2,opt,name=receiverTime,proto3" json:"receiverTime,omitempty"`
SenderTime int64 `protobuf:"zigzag64,3,opt,name=senderTime,proto3" json:"senderTime,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -122,14 +121,14 @@ func (m *Index) GetDigest() []byte {
return nil
}
func (m *Index) GetReceiverTime() float64 {
func (m *Index) GetReceiverTime() int64 {
if m != nil {
return m.ReceiverTime
}
return 0
}
func (m *Index) GetSenderTime() float64 {
func (m *Index) GetSenderTime() int64 {
if m != nil {
return m.SenderTime
}
@ -250,8 +249,8 @@ type HistoryQuery struct {
PubsubTopic string `protobuf:"bytes,2,opt,name=pubsubTopic,proto3" json:"pubsubTopic,omitempty"`
ContentFilters []*ContentFilter `protobuf:"bytes,3,rep,name=contentFilters,proto3" json:"contentFilters,omitempty"`
PagingInfo *PagingInfo `protobuf:"bytes,4,opt,name=pagingInfo,proto3" json:"pagingInfo,omitempty"`
StartTime float64 `protobuf:"fixed64,5,opt,name=startTime,proto3" json:"startTime,omitempty"`
EndTime float64 `protobuf:"fixed64,6,opt,name=endTime,proto3" json:"endTime,omitempty"`
StartTime int64 `protobuf:"zigzag64,5,opt,name=startTime,proto3" json:"startTime,omitempty"`
EndTime int64 `protobuf:"zigzag64,6,opt,name=endTime,proto3" json:"endTime,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -311,14 +310,14 @@ func (m *HistoryQuery) GetPagingInfo() *PagingInfo {
return nil
}
func (m *HistoryQuery) GetStartTime() float64 {
func (m *HistoryQuery) GetStartTime() int64 {
if m != nil {
return m.StartTime
}
return 0
}
func (m *HistoryQuery) GetEndTime() float64 {
func (m *HistoryQuery) GetEndTime() int64 {
if m != nil {
return m.EndTime
}
@ -476,31 +475,31 @@ var fileDescriptor_ca6891f77a46e680 = []byte{
0x25, 0x32, 0x4e, 0x63, 0x2c, 0x24, 0x93, 0x7d, 0xd4, 0xea, 0x35, 0x29, 0xe5, 0x4f, 0x8d, 0xee,
0x06, 0x50, 0x1e, 0x44, 0x21, 0x7d, 0xc4, 0xbb, 0x50, 0x09, 0xc5, 0x94, 0x54, 0xea, 0xb0, 0x16,
0x6b, 0xd7, 0xb9, 0xa9, 0xd0, 0x85, 0xba, 0xa4, 0x80, 0xc4, 0x7b, 0x92, 0x17, 0xe2, 0x9a, 0x9c,
0x42, 0x8b, 0xb5, 0x19, 0xdf, 0xd0, 0xb0, 0x09, 0xa0, 0x28, 0x0a, 0x0d, 0x51, 0xd4, 0xc4, 0x9a,
0xe2, 0x7e, 0x65, 0x00, 0x67, 0xfe, 0x54, 0x44, 0xd3, 0x41, 0xf4, 0x36, 0xc6, 0x7d, 0xa8, 0x26,
0xfe, 0x94, 0xce, 0xc5, 0x27, 0xd2, 0x66, 0x25, 0xbe, 0xac, 0xf1, 0x21, 0x54, 0x82, 0x4c, 0xaa,
0x58, 0x6a, 0xa3, 0x5a, 0xc7, 0xf2, 0x92, 0x89, 0xa7, 0x13, 0x72, 0xd3, 0xc0, 0x27, 0x60, 0x85,
0x42, 0x52, 0x90, 0x8a, 0x38, 0xd2, 0x66, 0x8d, 0x8e, 0x93, 0x53, 0x2b, 0x07, 0xaf, 0xbf, 0xe8,
0xf3, 0x15, 0xea, 0x1e, 0x80, 0xb5, 0xd4, 0xb1, 0x0e, 0xd5, 0xe3, 0x6e, 0xef, 0xf9, 0x65, 0x97,
0xf7, 0xed, 0x1d, 0xac, 0xc1, 0xee, 0xe9, 0x88, 0xeb, 0x82, 0xb9, 0x47, 0xf0, 0x5f, 0x2f, 0x8e,
0x52, 0x8a, 0xd2, 0x53, 0xf1, 0x2e, 0x25, 0x99, 0xaf, 0x20, 0x98, 0x0b, 0x17, 0x71, 0x22, 0x02,
0x9d, 0xd9, 0xe2, 0x1b, 0x9a, 0xfb, 0x93, 0x41, 0xfd, 0x99, 0xc8, 0x37, 0x7e, 0xfb, 0x2a, 0x23,
0x79, 0x8b, 0x2d, 0xa8, 0x25, 0xd9, 0x44, 0x65, 0x93, 0xf9, 0x37, 0x05, 0xfd, 0xcd, 0xba, 0x84,
0x4f, 0xa1, 0x11, 0xac, 0xfb, 0x28, 0xa7, 0xd8, 0x2a, 0xb6, 0x6b, 0x9d, 0xff, 0xf3, 0x61, 0x36,
0x12, 0xf0, 0x2d, 0x10, 0x3d, 0x80, 0x64, 0x39, 0xad, 0x53, 0xd2, 0x9b, 0x6a, 0x6c, 0xee, 0x80,
0xaf, 0x11, 0x78, 0x1f, 0x2c, 0x95, 0xfa, 0x32, 0xd5, 0xf7, 0x29, 0xeb, 0xfb, 0xac, 0x04, 0x74,
0x60, 0x97, 0xa2, 0x50, 0xf7, 0x2a, 0xba, 0xb7, 0x28, 0xdd, 0x6f, 0x0c, 0xf6, 0xcc, 0x54, 0x9c,
0x54, 0x12, 0x47, 0x8a, 0xf0, 0x31, 0x54, 0xcd, 0x13, 0x52, 0x4e, 0x41, 0x07, 0xde, 0xcb, 0x9d,
0x2f, 0xfd, 0xab, 0xec, 0xe5, 0x5c, 0xe7, 0x4b, 0x60, 0x2b, 0x68, 0xf1, 0x9f, 0x41, 0x0f, 0xa1,
0x4c, 0x52, 0xc6, 0x52, 0xcf, 0xd4, 0xe8, 0xdc, 0xcb, 0xd1, 0xad, 0x00, 0xde, 0x49, 0x0e, 0xf0,
0x39, 0xe7, 0x3e, 0x82, 0xb2, 0xae, 0xb1, 0x0a, 0xa5, 0xe1, 0x68, 0x78, 0x62, 0xef, 0x20, 0x42,
0x63, 0x30, 0x7c, 0xd3, 0x7d, 0x31, 0xe8, 0x8f, 0x7b, 0xaf, 0xf9, 0xf9, 0x88, 0xdb, 0xcc, 0xfd,
0xcc, 0x00, 0x16, 0xbf, 0x73, 0xd6, 0xc3, 0x07, 0x00, 0x92, 0x6e, 0x32, 0x52, 0xe9, 0x58, 0x84,
0xe6, 0x9e, 0x96, 0x51, 0x06, 0x21, 0x1e, 0x40, 0xf9, 0x26, 0x3f, 0xa2, 0x79, 0x83, 0xf6, 0x5a,
0x0a, 0x7d, 0x5c, 0x3e, 0x6f, 0xe3, 0x21, 0x54, 0xa5, 0x49, 0x65, 0x66, 0xbb, 0xf3, 0x97, 0xc0,
0x7c, 0x09, 0x1d, 0xdb, 0xdf, 0x67, 0x4d, 0xf6, 0x63, 0xd6, 0x64, 0xbf, 0x66, 0x4d, 0xf6, 0xe5,
0x77, 0x73, 0x67, 0x52, 0xd1, 0x7f, 0xc3, 0xa3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xb7, 0x5c,
0xa1, 0x0e, 0xb2, 0x03, 0x00, 0x00,
0x42, 0x8b, 0xb5, 0x91, 0x6f, 0x68, 0xd8, 0x04, 0x50, 0x14, 0x85, 0x86, 0x28, 0x6a, 0x62, 0x4d,
0x71, 0xbf, 0x32, 0x80, 0x33, 0x7f, 0x2a, 0xa2, 0xe9, 0x20, 0x7a, 0x1b, 0xe3, 0x3e, 0x54, 0x13,
0x7f, 0x4a, 0xe7, 0xe2, 0x13, 0x69, 0xb3, 0x12, 0x5f, 0xd6, 0xf8, 0x10, 0x2a, 0x41, 0x26, 0x55,
0x2c, 0xb5, 0x51, 0xad, 0x63, 0x79, 0xc9, 0xc4, 0xd3, 0x09, 0xb9, 0x69, 0xe0, 0x13, 0xb0, 0x42,
0x21, 0x29, 0x48, 0x45, 0x1c, 0x69, 0xb3, 0x46, 0xc7, 0xc9, 0xa9, 0x95, 0x83, 0xd7, 0x5f, 0xf4,
0xf9, 0x0a, 0x75, 0x0f, 0xc0, 0x5a, 0xea, 0x58, 0x87, 0xea, 0x71, 0xb7, 0xf7, 0xfc, 0xb2, 0xcb,
0xfb, 0xf6, 0x0e, 0xd6, 0x60, 0xf7, 0x74, 0xc4, 0x75, 0xc1, 0xdc, 0x23, 0xf8, 0xaf, 0x17, 0x47,
0x29, 0x45, 0xe9, 0xa9, 0x78, 0x97, 0x92, 0xcc, 0x57, 0x10, 0xcc, 0x85, 0x8b, 0x38, 0x11, 0x81,
0xce, 0x6c, 0xf1, 0x0d, 0xcd, 0xfd, 0xc9, 0xa0, 0xfe, 0x4c, 0xe4, 0x1b, 0xbf, 0x7d, 0x95, 0x91,
0xbc, 0xc5, 0x16, 0xd4, 0x92, 0x6c, 0xa2, 0xb2, 0xc9, 0xfc, 0x9b, 0x82, 0xfe, 0x66, 0x5d, 0xc2,
0xa7, 0xd0, 0x08, 0xd6, 0x7d, 0x94, 0x53, 0x6c, 0x15, 0xdb, 0xb5, 0xce, 0xff, 0xf9, 0x30, 0x1b,
0x09, 0xf8, 0x16, 0x88, 0x1e, 0x40, 0xb2, 0x9c, 0xd6, 0x29, 0xe9, 0x4d, 0x35, 0x36, 0x77, 0xc0,
0xd7, 0x08, 0xbc, 0x0f, 0x96, 0x4a, 0x7d, 0x99, 0xea, 0xfb, 0x94, 0xf5, 0x7d, 0x56, 0x02, 0x3a,
0xb0, 0x4b, 0x51, 0xa8, 0x7b, 0x15, 0xdd, 0x5b, 0x94, 0xee, 0x37, 0x06, 0x7b, 0x66, 0x2a, 0x4e,
0x2a, 0x89, 0x23, 0x45, 0xf8, 0x18, 0xaa, 0xe6, 0x09, 0x29, 0xa7, 0xa0, 0x03, 0xef, 0xe5, 0xce,
0x97, 0xfe, 0x55, 0xf6, 0x72, 0xae, 0xf3, 0x25, 0xb0, 0x15, 0xb4, 0xf8, 0xcf, 0xa0, 0x87, 0x50,
0x26, 0x29, 0x63, 0xa9, 0x67, 0x6a, 0x74, 0xee, 0xe5, 0xe8, 0x56, 0x00, 0xef, 0x24, 0x07, 0xf8,
0x9c, 0x73, 0x1f, 0x41, 0x59, 0xd7, 0x58, 0x85, 0xd2, 0x70, 0x34, 0x3c, 0xb1, 0x77, 0x10, 0xa1,
0x31, 0x18, 0xbe, 0xe9, 0xbe, 0x18, 0xf4, 0xc7, 0xbd, 0xd7, 0xfc, 0x7c, 0xc4, 0x6d, 0xe6, 0x7e,
0x66, 0x00, 0x8b, 0xdf, 0x39, 0xeb, 0xe1, 0x03, 0x00, 0x49, 0x37, 0x19, 0xa9, 0x74, 0x2c, 0x42,
0x73, 0x4f, 0xcb, 0x28, 0x83, 0x10, 0x0f, 0xa0, 0x7c, 0x93, 0x1f, 0xd1, 0xbc, 0x41, 0x7b, 0x2d,
0x85, 0x3e, 0x2e, 0x9f, 0xb7, 0xf1, 0x10, 0xaa, 0xd2, 0xa4, 0x32, 0xb3, 0xdd, 0xf9, 0x4b, 0x60,
0xbe, 0x84, 0x8e, 0xed, 0xef, 0xb3, 0x26, 0xfb, 0x31, 0x6b, 0xb2, 0x5f, 0xb3, 0x26, 0xfb, 0xf2,
0xbb, 0xb9, 0x33, 0xa9, 0xe8, 0xbf, 0xe1, 0xd1, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0x8f,
0x84, 0xdf, 0xb2, 0x03, 0x00, 0x00,
}
func (m *Index) Marshal() (dAtA []byte, err error) {
@ -528,16 +527,14 @@ func (m *Index) MarshalToSizedBuffer(dAtA []byte) (int, error) {
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.SenderTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.SenderTime))))
i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.SenderTime)<<1)^uint64((m.SenderTime>>63))))
i--
dAtA[i] = 0x19
dAtA[i] = 0x18
}
if m.ReceiverTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.ReceiverTime))))
i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.ReceiverTime)<<1)^uint64((m.ReceiverTime>>63))))
i--
dAtA[i] = 0x11
dAtA[i] = 0x10
}
if len(m.Digest) > 0 {
i -= len(m.Digest)
@ -657,16 +654,14 @@ func (m *HistoryQuery) MarshalToSizedBuffer(dAtA []byte) (int, error) {
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.EndTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.EndTime))))
i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.EndTime)<<1)^uint64((m.EndTime>>63))))
i--
dAtA[i] = 0x31
dAtA[i] = 0x30
}
if m.StartTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.StartTime))))
i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.StartTime)<<1)^uint64((m.StartTime>>63))))
i--
dAtA[i] = 0x29
dAtA[i] = 0x28
}
if m.PagingInfo != nil {
{
@ -842,10 +837,10 @@ func (m *Index) Size() (n int) {
n += 1 + l + sovWakuStore(uint64(l))
}
if m.ReceiverTime != 0 {
n += 9
n += 1 + sozWakuStore(uint64(m.ReceiverTime))
}
if m.SenderTime != 0 {
n += 9
n += 1 + sozWakuStore(uint64(m.SenderTime))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
@ -912,10 +907,10 @@ func (m *HistoryQuery) Size() (n int) {
n += 1 + l + sovWakuStore(uint64(l))
}
if m.StartTime != 0 {
n += 9
n += 1 + sozWakuStore(uint64(m.StartTime))
}
if m.EndTime != 0 {
n += 9
n += 1 + sozWakuStore(uint64(m.EndTime))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
@ -1042,27 +1037,47 @@ func (m *Index) Unmarshal(dAtA []byte) error {
}
iNdEx = postIndex
case 2:
if wireType != 1 {
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ReceiverTime", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuStore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.ReceiverTime = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.ReceiverTime = int64(v)
case 3:
if wireType != 1 {
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field SenderTime", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuStore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.SenderTime = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.SenderTime = int64(v)
default:
iNdEx = preIndex
skippy, err := skipWakuStore(dAtA[iNdEx:])
@ -1425,27 +1440,47 @@ func (m *HistoryQuery) Unmarshal(dAtA []byte) error {
}
iNdEx = postIndex
case 5:
if wireType != 1 {
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field StartTime", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuStore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.StartTime = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.StartTime = int64(v)
case 6:
if wireType != 1 {
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndTime", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuStore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.EndTime = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.EndTime = int64(v)
default:
iNdEx = preIndex
skippy, err := skipWakuStore(dAtA[iNdEx:])

View File

@ -6,8 +6,8 @@ import "waku_message.proto";
message Index {
bytes digest = 1;
double receiverTime = 2;
double senderTime = 3;
sint64 receiverTime = 2;
sint64 senderTime = 3;
}
message PagingInfo {
@ -28,8 +28,8 @@ message HistoryQuery {
string pubsubTopic = 2;
repeated ContentFilter contentFilters = 3;
PagingInfo pagingInfo = 4; // used for pagination
double startTime = 5;
double endTime = 6;
sint64 startTime = 5;
sint64 endTime = 6;
}
message HistoryResponse {

View File

@ -28,6 +28,7 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) error {
var k [32]byte
copy(k[:], msg.index.Digest)
if _, ok := self.seen[k]; ok {
return ErrDuplicatedMessage
}

View File

@ -22,11 +22,11 @@ func TestFindLastSeenMessage(t *testing.T) {
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger())
s.storeMessage(msg1)
s.storeMessage(msg3)
s.storeMessage(msg5)
s.storeMessage(msg2)
s.storeMessage(msg4)
_ = s.storeMessage(msg1)
_ = s.storeMessage(msg3)
_ = s.storeMessage(msg5)
_ = s.storeMessage(msg2)
_ = s.storeMessage(msg4)
require.Equal(t, msg5.Message().Timestamp, s.findLastSeen())
}
@ -48,8 +48,9 @@ func TestResume(t *testing.T) {
contentTopic = "2"
}
msg := protocol.NewEnvelope(tests.CreateWakuMessage(contentTopic, float64(time.Duration(i)*time.Second)), "test")
s1.storeMessage(msg)
wakuMessage := tests.CreateWakuMessage(contentTopic, (time.Duration(i) * time.Second).Nanoseconds())
msg := protocol.NewEnvelope(wakuMessage, "test")
_ = s1.storeMessage(msg)
}
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
@ -91,9 +92,9 @@ func TestResumeWithListOfPeers(t *testing.T) {
s1.Start(ctx)
defer s1.Stop()
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0}
s1.storeMessage(protocol.NewEnvelope(msg0, "test"))
_ = s1.storeMessage(protocol.NewEnvelope(msg0, "test"))
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
@ -124,9 +125,9 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
s1.Start(ctx)
defer s1.Stop()
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0}
s1.storeMessage(protocol.NewEnvelope(msg0, "test"))
_ = s1.storeMessage(protocol.NewEnvelope(msg0, "test"))
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)

View File

@ -27,7 +27,7 @@ import (
)
// StoreID_v20beta3 is the current Waku Store protocol identifier
const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3")
const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4")
// MaxPageSize is the maximum number of waku messages to return per page
const MaxPageSize = 100
@ -193,8 +193,8 @@ type MessageProvider interface {
type Query struct {
Topic string
ContentTopics []string
StartTime float64
EndTime float64
StartTime int64
EndTime int64
}
// Result represents a valid response from a store node
@ -296,7 +296,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
for _, storedMessage := range storedMessages {
idx := &pb.Index{
Digest: storedMessage.ID,
ReceiverTime: float64(storedMessage.ReceiverTime),
ReceiverTime: storedMessage.ReceiverTime,
}
_ = store.addToMessageQueue(storedMessage.PubsubTopic, idx, storedMessage.Message)
@ -309,21 +309,21 @@ func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg
return store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic})
}
func (store *WakuStore) storeMessage(env *protocol.Envelope) {
func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
index, err := computeIndex(env)
if err != nil {
store.log.Error("could not calculate message index", err)
return
return err
}
err = store.addToMessageQueue(env.PubsubTopic(), index, env.Message())
if err == ErrDuplicatedMessage {
return
return err
}
if store.msgProvider == nil {
metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length())
return
return err
}
// TODO: Move this to a separate go routine if DB writes becomes a bottleneck
@ -331,16 +331,17 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) {
if err != nil {
store.log.Error("could not store message", err)
metrics.RecordStoreError(store.ctx, "store_failure")
return
return err
}
metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length())
return nil
}
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
defer store.wg.Done()
for envelope := range store.MsgC {
store.storeMessage(envelope)
_ = store.storeMessage(envelope)
}
}
@ -650,6 +651,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId())
if err == nil {
resultChan <- result
return
}
store.log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err))
}()
@ -672,8 +674,8 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
return nil, ErrFailedQuery
}
func (store *WakuStore) findLastSeen() float64 {
var lastSeenTime float64 = 0
func (store *WakuStore) findLastSeen() int64 {
var lastSeenTime int64 = 0
for imsg := range store.messageQueue.Messages() {
if imsg.msg.Timestamp > lastSeenTime {
lastSeenTime = imsg.msg.Timestamp
@ -682,6 +684,13 @@ func (store *WakuStore) findLastSeen() float64 {
return lastSeenTime
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
// Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
// messages are stored in the store node's messages field and in the message db
// the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
@ -698,9 +707,9 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
currentTime := utils.GetUnixEpoch()
lastSeenTime := store.findLastSeen()
var offset float64 = 200000
var offset int64 = int64(20 * time.Nanosecond)
currentTime = currentTime + offset
lastSeenTime = math.Max(lastSeenTime-offset, 0)
lastSeenTime = max(lastSeenTime-offset, 0)
rpc := &pb.HistoryQuery{
PubsubTopic: pubsubTopic,
@ -728,13 +737,16 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
return -1, ErrFailedToResumeHistory
}
msgCount := 0
for _, msg := range messages {
store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic))
if err = store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic)); err == nil {
msgCount++
}
}
store.log.Info("Retrieved messages since the last online time: ", len(messages))
return len(messages), nil
return msgCount, nil
}
// TODO: queryWithAccounting

View File

@ -96,8 +96,8 @@ func createSampleList(s int) []IndexedWakuMessage {
Payload: []byte{byte(i)},
},
index: &pb.Index{
ReceiverTime: float64(i),
SenderTime: float64(i),
ReceiverTime: int64(i),
SenderTime: int64(i),
Digest: []byte{1},
},
})

View File

@ -38,7 +38,7 @@ func TestStorePersistence(t *testing.T) {
Timestamp: utils.GetUnixEpoch(),
}
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
_ = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
s2 := NewWakuStore(nil, nil, dbStore, 0, 0, tests.Logger())
s2.fetchDBRecords(ctx)
@ -46,5 +46,6 @@ func TestStorePersistence(t *testing.T) {
require.Equal(t, msg, s2.messageQueue.messages[0].msg)
// Storing a duplicated message should not crash. It's okay to generate an error log in this case
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
err = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
require.ErrorIs(t, err, ErrDuplicatedMessage)
}

View File

@ -73,11 +73,11 @@ func TestWakuStoreProtocolNext(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
msg1 := tests.CreateWakuMessage(topic1, float64(1))
msg2 := tests.CreateWakuMessage(topic1, float64(2))
msg3 := tests.CreateWakuMessage(topic1, float64(3))
msg4 := tests.CreateWakuMessage(topic1, float64(4))
msg5 := tests.CreateWakuMessage(topic1, float64(5))
msg1 := tests.CreateWakuMessage(topic1, 1)
msg2 := tests.CreateWakuMessage(topic1, 2)
msg3 := tests.CreateWakuMessage(topic1, 3)
msg4 := tests.CreateWakuMessage(topic1, 4)
msg5 := tests.CreateWakuMessage(topic1, 5)
s1.MsgC <- protocol.NewEnvelope(msg1, pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, pubsubTopic1)

View File

@ -18,8 +18,8 @@ func TestStoreQuery(t *testing.T) {
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger())
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{
@ -45,9 +45,9 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger())
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
s.storeMessage(protocol.NewEnvelope(msg3, defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg3, defaultPubSubTopic))
response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{
@ -78,9 +78,9 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger())
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
@ -110,9 +110,9 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger())
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
@ -132,9 +132,9 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger())
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
@ -154,7 +154,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)}
s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1))
}
response := s.FindMessages(&pb.HistoryQuery{
@ -182,7 +182,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1))
}
@ -208,16 +208,16 @@ func TestTemporalHistoryQueries(t *testing.T) {
if i%2 == 0 {
contentTopic = "2"
}
msg := tests.CreateWakuMessage(contentTopic, float64(i))
s.storeMessage(protocol.NewEnvelope(msg, "test"))
msg := tests.CreateWakuMessage(contentTopic, int64(i))
_ = s.storeMessage(protocol.NewEnvelope(msg, "test"))
messages = append(messages, msg)
}
// handle temporal history query with a valid time window
response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: float64(2),
EndTime: float64(5),
StartTime: int64(2),
EndTime: int64(5),
})
require.Len(t, response.Messages, 2)
@ -227,8 +227,8 @@ func TestTemporalHistoryQueries(t *testing.T) {
// handle temporal history query with a zero-size time window
response = s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: float64(2),
EndTime: float64(2),
StartTime: int64(2),
EndTime: int64(2),
})
require.Len(t, response.Messages, 0)
@ -236,8 +236,8 @@ func TestTemporalHistoryQueries(t *testing.T) {
// handle temporal history query with an invalid time window
response = s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: float64(5),
EndTime: float64(2),
StartTime: int64(5),
EndTime: int64(2),
})
// time window is invalid since start time > end time
// perhaps it should return an error?

View File

@ -27,8 +27,8 @@ type StorePagingOptions struct {
type StoreMessagesArgs struct {
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []string `json:"contentFilters,omitempty"`
StartTime float64 `json:"startTime,omitempty"`
EndTime float64 `json:"endTime,omitempty"`
StartTime int64 `json:"startTime,omitempty"`
EndTime int64 `json:"endTime,omitempty"`
PagingOptions StorePagingOptions `json:"pagingOptions,omitempty"`
}

View File

@ -2,14 +2,13 @@ package utils
import "time"
// GetUnixEpoch converts a time into a unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpochFrom(now time.Time) float64 {
return float64(now.UnixNano()) / float64(time.Second)
// GetUnixEpoch converts a time into a unix timestamp with nanoseconds
func GetUnixEpochFrom(now time.Time) int64 {
return now.UnixNano()
}
// GetUnixEpoch returns the current time in unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpoch() float64 {
func GetUnixEpoch() int64 {
return GetUnixEpochFrom(time.Now())
}

View File

@ -10,7 +10,7 @@ import (
func TestGetUnixEpochFrom(t *testing.T) {
loc := time.UTC
timestamp := GetUnixEpochFrom(time.Date(2019, 1, 1, 0, 0, 0, 0, loc))
require.Equal(t, float64(1546300800), timestamp)
require.Equal(t, int64(1546300800*time.Second), timestamp)
timestamp = GetUnixEpoch()
require.NotNil(t, timestamp)