Dmitry Shulyak 14c513bd5a
Execute writes atomically only after request was processed without errors (#1454)
* Replace request ID when same request is restarted

* Remove unnecessary changes

* Execute all writes atomically only if request was processed succesfully

* Fix linter

* Fix shadowed errors

* Fix spelling

* Do not append same reference to a byte slice
2019-05-06 09:33:19 +03:00

351 lines
10 KiB
Go

package shhext
import (
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/mailserver"
whisper "github.com/status-im/whisper/whisperv6"
)
const (
// WhisperTimeAllowance is needed to ensure that we won't miss envelopes that were
// delivered to mail server after we made a request.
WhisperTimeAllowance = 20 * time.Second
)
// NewHistoryUpdateReactor creates HistoryUpdateReactor instance.
func NewHistoryUpdateReactor() *HistoryUpdateReactor {
return &HistoryUpdateReactor{}
}
// HistoryUpdateReactor responsible for tracking progress for all history requests.
// It listens for 2 events:
// - when envelope from mail server is received we will update appropriate topic on disk
// - when confirmation for request completion is received - we will set last envelope timestamp as the last timestamp
// for all TopicLists in current request.
type HistoryUpdateReactor struct {
mu sync.Mutex
}
// UpdateFinishedRequest removes successfully finished request and updates every topic
// attached to the request.
func (reactor *HistoryUpdateReactor) UpdateFinishedRequest(ctx Context, id common.Hash) error {
reactor.mu.Lock()
defer reactor.mu.Unlock()
req, err := ctx.HistoryStore().GetRequest(id)
if err != nil {
return err
}
for i := range req.Histories() {
th := &req.Histories()[i]
th.RequestID = common.Hash{}
th.Current = th.End
th.End = time.Time{}
if err := th.Save(); err != nil {
return err
}
}
return req.Delete()
}
// UpdateTopicHistory updates Current timestamp for the TopicHistory with a given timestamp.
func (reactor *HistoryUpdateReactor) UpdateTopicHistory(ctx Context, topic whisper.TopicType, timestamp time.Time) error {
reactor.mu.Lock()
defer reactor.mu.Unlock()
histories, err := ctx.HistoryStore().GetHistoriesByTopic(topic)
if err != nil {
return err
}
if len(histories) == 0 {
return fmt.Errorf("no histories for topic 0x%x", topic)
}
for i := range histories {
th := &histories[i]
// this case could happen only iff envelopes were delivered out of order
// last envelope received, request completed, then others envelopes received
// request completed, last envelope received, and then all others envelopes received
if !th.Pending() {
continue
}
if timestamp.Before(th.End) && timestamp.After(th.Current) {
th.Current = timestamp
}
err := th.Save()
if err != nil {
return err
}
}
return nil
}
// TopicRequest defines what user has to provide.
type TopicRequest struct {
Topic whisper.TopicType
Duration time.Duration
}
// CreateRequests receives list of topic with desired timestamps and initiates both pending requests and requests
// that cover new topics.
func (reactor *HistoryUpdateReactor) CreateRequests(ctx Context, topicRequests []TopicRequest) ([]db.HistoryRequest, error) {
reactor.mu.Lock()
defer reactor.mu.Unlock()
seen := map[whisper.TopicType]struct{}{}
for i := range topicRequests {
if _, exist := seen[topicRequests[i].Topic]; exist {
return nil, errors.New("only one duration per topic is allowed")
}
seen[topicRequests[i].Topic] = struct{}{}
}
histories := map[whisper.TopicType]db.TopicHistory{}
for i := range topicRequests {
th, err := ctx.HistoryStore().GetHistory(topicRequests[i].Topic, topicRequests[i].Duration)
if err != nil {
return nil, err
}
histories[th.Topic] = th
}
requests, err := ctx.HistoryStore().GetAllRequests()
if err != nil {
return nil, err
}
filtered := []db.HistoryRequest{}
for i := range requests {
req := requests[i]
for _, th := range histories {
if th.Pending() {
delete(histories, th.Topic)
}
}
if !ctx.RequestRegistry().Has(req.ID) {
filtered = append(filtered, req)
}
}
adjusted, err := adjustRequestedHistories(ctx.HistoryStore(), mapToList(histories))
if err != nil {
return nil, err
}
filtered = append(filtered,
GroupHistoriesByRequestTimespan(ctx.HistoryStore(), adjusted)...)
return RenewRequests(filtered, ctx.Time()), nil
}
// for every history that is not included in any request check if there are other ranges with such topic in db
// if so check if they can be merged
// if not then adjust second part so that End of it will be equal to First of previous
func adjustRequestedHistories(store db.HistoryStore, histories []db.TopicHistory) ([]db.TopicHistory, error) {
adjusted := []db.TopicHistory{}
for i := range histories {
all, err := store.GetHistoriesByTopic(histories[i].Topic)
if err != nil {
return nil, err
}
th, err := adjustRequestedHistory(&histories[i], all...)
if err != nil {
return nil, err
}
if th != nil {
adjusted = append(adjusted, *th)
}
}
return adjusted, nil
}
func adjustRequestedHistory(th *db.TopicHistory, others ...db.TopicHistory) (*db.TopicHistory, error) {
sort.Slice(others, func(i, j int) bool {
return others[i].Duration > others[j].Duration
})
if len(others) == 1 && others[0].Duration == th.Duration {
return th, nil
}
for j := range others {
if others[j].Duration == th.Duration {
// skip instance with same duration
continue
} else if th.Duration > others[j].Duration {
if th.Current.Equal(others[j].First) {
// this condition will be reached when query for new index successfully finished
th.Current = others[j].Current
// FIXME next two db operations must be completed atomically
err := th.Save()
if err != nil {
return nil, err
}
err = others[j].Delete()
if err != nil {
return nil, err
}
} else if (others[j].First != time.Time{}) {
// select First timestamp with lowest value. if there are multiple indexes that cover such ranges:
// 6:00 - 7:00 Duration: 3h
// 7:00 - 8:00 2h
// 8:00 - 9:00 1h
// and client created new index with Duration 4h
// 4h index must have End value set to 6:00
if (others[j].First.Before(th.End) || th.End == time.Time{}) {
th.End = others[j].First
}
} else {
// remove previous if it is covered by new one
// client created multiple indexes without any succsefully executed query
err := others[j].Delete()
if err != nil {
return nil, err
}
}
} else if th.Duration < others[j].Duration {
if !others[j].Pending() {
th = &others[j]
} else {
return nil, nil
}
}
}
return th, nil
}
// RenewRequests re-sets current, first and end timestamps.
// Changes should not be persisted on disk in this method.
func RenewRequests(requests []db.HistoryRequest, now time.Time) []db.HistoryRequest {
zero := time.Time{}
for i := range requests {
req := requests[i]
histories := req.Histories()
for j := range histories {
history := &histories[j]
if history.Current == zero {
history.Current = now.Add(-(history.Duration))
}
if history.First == zero {
history.First = history.Current
}
if history.End == zero {
history.End = now
}
}
}
return requests
}
// CreateTopicOptionsFromRequest transforms histories attached to a single request to a simpler format - TopicOptions.
func CreateTopicOptionsFromRequest(req db.HistoryRequest) TopicOptions {
histories := req.Histories()
rst := make(TopicOptions, len(histories))
for i := range histories {
history := histories[i]
rst[i] = TopicOption{
Topic: history.Topic,
Range: Range{
Start: uint64(history.Current.Add(-(WhisperTimeAllowance)).Unix()),
End: uint64(history.End.Unix()),
},
}
}
return rst
}
func mapToList(topics map[whisper.TopicType]db.TopicHistory) []db.TopicHistory {
rst := make([]db.TopicHistory, 0, len(topics))
for key := range topics {
rst = append(rst, topics[key])
}
return rst
}
// GroupHistoriesByRequestTimespan creates requests from provided histories.
// Multiple histories will be included into the same request only if they share timespan.
func GroupHistoriesByRequestTimespan(store db.HistoryStore, histories []db.TopicHistory) []db.HistoryRequest {
requests := []db.HistoryRequest{}
for _, th := range histories {
var added bool
for i := range requests {
req := &requests[i]
histories := req.Histories()
if histories[0].SameRange(th) {
req.AddHistory(th)
added = true
}
}
if !added {
req := store.NewRequest()
req.AddHistory(th)
requests = append(requests, req)
}
}
return requests
}
// Range of the request.
type Range struct {
Start uint64
End uint64
}
// TopicOption request for a single topic.
type TopicOption struct {
Topic whisper.TopicType
Range Range
}
// TopicOptions is a list of topic-based requsts.
type TopicOptions []TopicOption
// ToBloomFilterOption creates bloom filter request from a list of topics.
func (options TopicOptions) ToBloomFilterOption() BloomFilterOption {
topics := make([]whisper.TopicType, len(options))
var start, end uint64
for i := range options {
opt := options[i]
topics[i] = opt.Topic
if opt.Range.Start > start {
start = opt.Range.Start
}
if opt.Range.End > end {
end = opt.Range.End
}
}
return BloomFilterOption{
Range: Range{Start: start, End: end},
Filter: topicsToBloom(topics...),
}
}
// Topics returns list of whisper TopicType attached to each TopicOption.
func (options TopicOptions) Topics() []whisper.TopicType {
rst := make([]whisper.TopicType, len(options))
for i := range options {
rst[i] = options[i].Topic
}
return rst
}
// BloomFilterOption is a request based on bloom filter.
type BloomFilterOption struct {
Range Range
Filter []byte
}
// ToMessagesRequestPayload creates mailserver.MessagesRequestPayload and encodes it to bytes using rlp.
func (filter BloomFilterOption) ToMessagesRequestPayload() ([]byte, error) {
// TODO fix this conversion.
// we start from time.Duration which is int64, then convert to uint64 for rlp-serilizability
// why uint32 here? max uint32 is smaller than max int64
payload := mailserver.MessagesRequestPayload{
Lower: uint32(filter.Range.Start),
Upper: uint32(filter.Range.End),
Bloom: filter.Filter,
// Client must tell the MailServer if it supports batch responses.
// This can be removed in the future.
Batch: true,
Limit: 10000,
}
return rlp.EncodeToBytes(payload)
}