diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 535997c2..f35227cc 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -30,6 +30,9 @@ const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4") // MaxPageSize is the maximum number of waku messages to return per page const MaxPageSize = 100 +// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp +const MaxTimeVariance = time.Duration(20) * time.Second + var ( // ErrNoPeersAvailable is returned when there are no store peers in the peer store // that could be used to retrieve message history @@ -44,6 +47,8 @@ var ( // ErrFailedQuery is emitted when the query fails to return results ErrFailedQuery = errors.New("failed to resolve the query") + + ErrFutureMessage = errors.New("message timestamp in the future") ) func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*pb.WakuMessage, *pb.PagingInfo, error) { @@ -199,6 +204,11 @@ func (store *WakuStore) Start(ctx context.Context) { } func (store *WakuStore) storeMessage(env *protocol.Envelope) error { + // Ensure that messages don't "jump" to the front of the queue with future timestamps + if env.Index().SenderTime-env.Index().ReceiverTime > int64(MaxTimeVariance) { + return ErrFutureMessage + } + err := store.msgProvider.Put(env) if err != nil { store.log.Error("storing message", zap.Error(err))