fix: insert receiver timestamp if sender timestamp is 0

This commit is contained in:
Richard Ramos 2022-12-15 20:55:13 -04:00 committed by RichΛrd
parent f1fd8b354e
commit a0c2f96c55
3 changed files with 13 additions and 11 deletions

View File

@ -134,7 +134,7 @@ func main() {
}
envelope := protocol.NewEnvelope(&msg, msg.Timestamp, pubsubTopic)
dbKey := persistence.NewDBKey(uint64(msg.Timestamp), pubsubTopic, envelope.Index().Digest)
dbKey := persistence.NewDBKey(uint64(msg.Timestamp), uint64(time.Now().UnixNano()), pubsubTopic, envelope.Index().Digest)
_, err = stmt.Exec(dbKey.Bytes(), msg.Timestamp, msg.Timestamp, msg.ContentTopic, pubsubTopic, msg.Payload, msg.Version)
if err != nil {

View File

@ -32,17 +32,19 @@ func (k *DBKey) Bytes() []byte {
return k.raw
}
func (k *DBKey) Digest() []byte {
return k.raw[TimestampLength+PubsubTopicLength : TimestampLength+PubsubTopicLength+DigestLength]
}
// NewDBKey creates a new DBKey with the given values.
func NewDBKey(timestamp uint64, pubsubTopic string, digest []byte) *DBKey {
func NewDBKey(senderTimestamp uint64, receiverTimestamp uint64, pubsubTopic string, digest []byte) *DBKey {
pubSubHash := sha256.Sum256([]byte(pubsubTopic))
var k DBKey
k.raw = make([]byte, DBKeyLength)
binary.BigEndian.PutUint64(k.raw, timestamp)
if senderTimestamp == 0 {
binary.BigEndian.PutUint64(k.raw, receiverTimestamp)
} else {
binary.BigEndian.PutUint64(k.raw, senderTimestamp)
}
copy(k.raw[TimestampLength:], pubSubHash[:])
copy(k.raw[TimestampLength+PubsubTopicLength:], digest)

View File

@ -224,7 +224,7 @@ func (d *DBStore) Put(env *protocol.Envelope) error {
}
cursor := env.Index()
dbKey := NewDBKey(uint64(cursor.SenderTime), env.PubsubTopic(), env.Index().Digest)
dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(cursor.ReceiverTime), env.PubsubTopic(), env.Index().Digest)
_, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().Timestamp, env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().Version)
if err != nil {
return err
@ -275,7 +275,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
if query.PagingInfo.Cursor != nil {
usesCursor = true
var exists bool
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = ?)",
cursorDBKey.Bytes(),
@ -301,7 +301,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
if query.StartTime != 0 {
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
conditions = append(conditions, "id >= ?")
startTimeDBKey := NewDBKey(uint64(query.StartTime), "", []byte{})
startTimeDBKey := NewDBKey(uint64(query.StartTime), uint64(query.StartTime), "", []byte{})
parameters = append(parameters, startTimeDBKey.Bytes())
}
@ -310,7 +310,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
if query.EndTime != 0 {
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD {
conditions = append(conditions, "id <= ?")
endTimeDBKey := NewDBKey(uint64(query.EndTime), "", []byte{})
endTimeDBKey := NewDBKey(uint64(query.EndTime), uint64(query.EndTime), "", []byte{})
parameters = append(parameters, endTimeDBKey.Bytes())
}
}