diff --git a/dbutils/insert.go b/dbutils/insert.go index 883f1001..7fae35f0 100644 --- a/dbutils/insert.go +++ b/dbutils/insert.go @@ -134,7 +134,7 @@ func main() { } envelope := protocol.NewEnvelope(&msg, msg.Timestamp, pubsubTopic) - dbKey := persistence.NewDBKey(uint64(msg.Timestamp), pubsubTopic, envelope.Index().Digest) + dbKey := persistence.NewDBKey(uint64(msg.Timestamp), uint64(time.Now().UnixNano()), pubsubTopic, envelope.Index().Digest) _, err = stmt.Exec(dbKey.Bytes(), msg.Timestamp, msg.Timestamp, msg.ContentTopic, pubsubTopic, msg.Payload, msg.Version) if err != nil { diff --git a/waku/persistence/db_key.go b/waku/persistence/db_key.go index 47ad6a49..3eddc6d1 100644 --- a/waku/persistence/db_key.go +++ b/waku/persistence/db_key.go @@ -32,17 +32,19 @@ func (k *DBKey) Bytes() []byte { return k.raw } -func (k *DBKey) Digest() []byte { - return k.raw[TimestampLength+PubsubTopicLength : TimestampLength+PubsubTopicLength+DigestLength] -} - // NewDBKey creates a new DBKey with the given values. -func NewDBKey(timestamp uint64, pubsubTopic string, digest []byte) *DBKey { +func NewDBKey(senderTimestamp uint64, receiverTimestamp uint64, pubsubTopic string, digest []byte) *DBKey { pubSubHash := sha256.Sum256([]byte(pubsubTopic)) var k DBKey k.raw = make([]byte, DBKeyLength) - binary.BigEndian.PutUint64(k.raw, timestamp) + + if senderTimestamp == 0 { + binary.BigEndian.PutUint64(k.raw, receiverTimestamp) + } else { + binary.BigEndian.PutUint64(k.raw, senderTimestamp) + } + copy(k.raw[TimestampLength:], pubSubHash[:]) copy(k.raw[TimestampLength+PubsubTopicLength:], digest) diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 5958af85..7959a0b1 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -224,7 +224,7 @@ func (d *DBStore) Put(env *protocol.Envelope) error { } cursor := env.Index() - dbKey := NewDBKey(uint64(cursor.SenderTime), env.PubsubTopic(), env.Index().Digest) + dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(cursor.ReceiverTime), env.PubsubTopic(), env.Index().Digest) _, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().Timestamp, env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().Version) if err != nil { return err @@ -275,7 +275,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err if query.PagingInfo.Cursor != nil { usesCursor = true var exists bool - cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) + cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = ?)", cursorDBKey.Bytes(), @@ -301,7 +301,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err if query.StartTime != 0 { if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { conditions = append(conditions, "id >= ?") - startTimeDBKey := NewDBKey(uint64(query.StartTime), "", []byte{}) + startTimeDBKey := NewDBKey(uint64(query.StartTime), uint64(query.StartTime), "", []byte{}) parameters = append(parameters, startTimeDBKey.Bytes()) } @@ -310,7 +310,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err if query.EndTime != 0 { if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD { conditions = append(conditions, "id <= ?") - endTimeDBKey := NewDBKey(uint64(query.EndTime), "", []byte{}) + endTimeDBKey := NewDBKey(uint64(query.EndTime), uint64(query.EndTime), "", []byte{}) parameters = append(parameters, endTimeDBKey.Bytes()) } }