diff --git a/examples/basic2/go.sum b/examples/basic2/go.sum index d65fd35f..426e9f9f 100644 --- a/examples/basic2/go.sum +++ b/examples/basic2/go.sum @@ -805,6 +805,8 @@ github.com/status-im/go-wakurelay-pubsub v0.4.2 h1:F4UGcP80H0PGaeJ0mRMzA1Ux3DKYi github.com/status-im/go-wakurelay-pubsub v0.4.2/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f h1:/KXMnxtAe0ZrbErvgZPkKilpLCmd7g1CIKQ4x17Al5I= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4 h1:LaoaRVhtEu5kCzcEVn+l3sprDbTqpzWFsVT2lmAcHcA= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= diff --git a/examples/basic2/main.go b/examples/basic2/main.go index 1b3ed8c9..54b1296a 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -15,6 +15,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" ) var log = logging.Logger("basic2") @@ -68,7 +69,7 @@ func randomHex(n int) (string, error) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { var contentTopic string = "test" var version uint32 = 0 - var timestamp float64 = float64(time.Now().UnixNano()) + var timestamp float64 = utils.GetUnixEpoch() p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index 1a09d5d5..8bcfdf3b 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -13,6 +13,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/filter" wpb "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" + "github.com/status-im/go-waku/waku/v2/utils" "golang.org/x/crypto/pbkdf2" ) @@ -93,7 +94,7 @@ func (cr *Chat) Publish(ctx context.Context, message string) error { } var version uint32 - var timestamp float64 = float64(time.Now().UnixNano()) + var timestamp float64 = utils.GetUnixEpoch() var keyInfo *node.KeyInfo = &node.KeyInfo{} if cr.useV1Payload { // Use WakuV1 encryption diff --git a/examples/chat2/go.sum b/examples/chat2/go.sum index 4f18125b..487aeb32 100644 --- a/examples/chat2/go.sum +++ b/examples/chat2/go.sum @@ -952,6 +952,10 @@ github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f h1 github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4 h1:LaoaRVhtEu5kCzcEVn+l3sprDbTqpzWFsVT2lmAcHcA= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729133637-e321eb74e8b4 h1:LRner8fJfKPX/NCzLlfToqQwNLr6u/EwSAZYFESSHtg= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729133637-e321eb74e8b4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729143434-184b435d0b0c h1:31bXRinMJMnexs0tohXJ96MmVs0uuuNlcaAWfgItwn0= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729143434-184b435d0b0c/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw= github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU= diff --git a/examples/peer_events/build/peer_events b/examples/peer_events/build/peer_events new file mode 100755 index 00000000..26c343a8 Binary files /dev/null and b/examples/peer_events/build/peer_events differ diff --git a/examples/peer_events/go.sum b/examples/peer_events/go.sum index 53bfcedf..4a5c1fb0 100644 --- a/examples/peer_events/go.sum +++ b/examples/peer_events/go.sum @@ -812,6 +812,8 @@ github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210629085338-e9c89c8f00f5 h1 github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210629085338-e9c89c8f00f5/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f h1:/KXMnxtAe0ZrbErvgZPkKilpLCmd7g1CIKQ4x17Al5I= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4 h1:LaoaRVhtEu5kCzcEVn+l3sprDbTqpzWFsVT2lmAcHcA= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/examples/peer_events/main.go b/examples/peer_events/main.go index ee9f8c41..b53ea914 100644 --- a/examples/peer_events/main.go +++ b/examples/peer_events/main.go @@ -15,6 +15,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" + "github.com/status-im/go-waku/waku/v2/utils" ) var log = logging.Logger("peer_events") @@ -168,7 +169,7 @@ func randomHex(n int) (string, error) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { var version uint32 = 0 - var timestamp float64 = float64(time.Now().UnixNano()) + var timestamp float64 = utils.GetUnixEpoch() p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/go.mod b/go.mod index eac605c1..37a8c45d 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/spf13/cobra v1.1.3 github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.7.1 - github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4 + github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729143434-184b435d0b0c go.opencensus.io v0.23.0 gopkg.in/ini.v1 v1.62.0 // indirect ) diff --git a/go.sum b/go.sum index 8dc7454d..1d5c686d 100644 --- a/go.sum +++ b/go.sum @@ -842,6 +842,10 @@ github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721190623-e23032977d88 h1 github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721190623-e23032977d88/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4 h1:LaoaRVhtEu5kCzcEVn+l3sprDbTqpzWFsVT2lmAcHcA= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729133637-e321eb74e8b4 h1:LRner8fJfKPX/NCzLlfToqQwNLr6u/EwSAZYFESSHtg= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729133637-e321eb74e8b4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729143434-184b435d0b0c h1:31bXRinMJMnexs0tohXJ96MmVs0uuuNlcaAWfgItwn0= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729143434-184b435d0b0c/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 3f53fdf9..c8d907ff 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -57,8 +57,8 @@ func NewDBStore(opt DBOption) (*DBStore, error) { func (d *DBStore) createTable() error { sqlStmt := `CREATE TABLE IF NOT EXISTS message ( id BLOB PRIMARY KEY, - receiverTimestamp INTEGER NOT NULL, - senderTimestamp INTEGER NOT NULL, + receiverTimestamp REAL NOT NULL, + senderTimestamp REAL NOT NULL, contentTopic BLOB NOT NULL, pubsubTopic BLOB NOT NULL, payload BLOB, @@ -78,11 +78,11 @@ func (d *DBStore) Stop() { // Inserts a WakuMessage into the DB func (d *DBStore) Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error { - stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?)") + stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)") if err != nil { return err } - _, err = stmt.Exec(cursor.Digest, uint64(cursor.ReceiverTime), uint64(message.Timestamp), message.ContentTopic, pubsubTopic, message.Payload, message.Version) + _, err = stmt.Exec(cursor.Digest, cursor.ReceiverTime, message.Timestamp, message.ContentTopic, pubsubTopic, message.Payload, message.Version) if err != nil { return err } @@ -92,7 +92,7 @@ func (d *DBStore) Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMess // Returns all the stored WakuMessages func (d *DBStore) GetAll() ([]store.StoredMessage, error) { - rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY timestamp ASC") + rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC") if err != nil { return nil, err } @@ -103,8 +103,8 @@ func (d *DBStore) GetAll() ([]store.StoredMessage, error) { for rows.Next() { var id []byte - var receiverTimestamp int64 - var senderTimestamp int64 + var receiverTimestamp float64 + var senderTimestamp float64 var contentTopic string var payload []byte var version uint32 @@ -118,7 +118,7 @@ func (d *DBStore) GetAll() ([]store.StoredMessage, error) { msg := new(pb.WakuMessage) msg.ContentTopic = contentTopic msg.Payload = payload - msg.Timestamp = float64(senderTimestamp) + msg.Timestamp = senderTimestamp msg.Version = version record := store.StoredMessage{ diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index a4d79a0b..bf60ae1d 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -33,7 +33,6 @@ var log = logging.Logger("wakustore") const WakuStoreCodec = "/vac/waku/store/2.0.0-beta3" const WakuStoreProtocolId = libp2pProtocol.ID(WakuStoreCodec) const MaxPageSize = 100 // Maximum number of waku messages in each page -const DefaultContentTopic = "/waku/2/default-content/proto" var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") @@ -186,7 +185,7 @@ func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse { type StoredMessage struct { ID []byte PubsubTopic string - ReceiverTime int64 + ReceiverTime float64 Message *pb.WakuMessage } @@ -203,10 +202,10 @@ type IndexedWakuMessage struct { } type WakuStore struct { - ctx context.Context - MsgC chan *protocol.Envelope - messages []IndexedWakuMessage - messageSet map[[32]byte]struct{} + ctx context.Context + MsgC chan *protocol.Envelope + messages []IndexedWakuMessage + seen map[[32]byte]struct{} messagesMutex sync.Mutex @@ -222,6 +221,7 @@ func NewWakuStore(shouldStoreMessages bool, p MessageProvider) *WakuStore { wakuStore.MsgC = make(chan *protocol.Envelope) wakuStore.msgProvider = p wakuStore.storeMsgs = shouldStoreMessages + wakuStore.seen = make(map[[32]byte]struct{}) return wakuStore } @@ -259,7 +259,7 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e storedMessages, err := store.msgProvider.GetAll() if err != nil { - log.Error("could not load DBProvider messages") + log.Error("could not load DBProvider messages", err) stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1)) return } @@ -284,11 +284,11 @@ func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, var k [32]byte copy(k[:], idx.Digest) - if _, ok := store.messageSet[k]; ok { + if _, ok := store.seen[k]; ok { return } - store.messageSet[k] = struct{}{} + store.seen[k] = struct{}{} store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) } @@ -363,7 +363,7 @@ func computeIndex(msg *pb.WakuMessage) (*pb.Index, error) { digest := sha256.Sum256(data) return &pb.Index{ Digest: digest[:], - ReceiverTime: float64(time.Now().UnixNano()), + ReceiverTime: utils.GetUnixEpoch(), SenderTime: msg.Timestamp, }, nil } diff --git a/waku/v2/utils/time.go b/waku/v2/utils/time.go new file mode 100644 index 00000000..2d7b0aba --- /dev/null +++ b/waku/v2/utils/time.go @@ -0,0 +1,7 @@ +package utils + +import "time" + +func GetUnixEpoch() float64 { + return float64(time.Now().UnixNano()) / float64(time.Second) +}