whisper: use syncmap for dynamic configuration options

This commit is contained in:
Bas van Kervel 2017-06-21 11:15:47 +02:00
parent c4cef83e3a
commit 5b14175f89

View File

@ -26,8 +26,6 @@ import (
"sync" "sync"
"time" "time"
"sync/atomic"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -35,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2" "golang.org/x/crypto/pbkdf2"
"golang.org/x/sync/syncmap"
set "gopkg.in/fatih/set.v0" set "gopkg.in/fatih/set.v0"
) )
@ -46,38 +45,12 @@ type Statistics struct {
totalMessagesCleared int totalMessagesCleared int
} }
type settingType byte
type settingsMap map[settingType]interface{}
const ( const (
minPowIdx settingType = iota // Minimal PoW required by the whisper node minPowIdx = iota // Minimal PoW required by the whisper node
maxMsgSizeIdx settingType = iota // Maximal message length allowed by the whisper node maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
OverflowIdx settingType = iota // Indicator of message queue overflow overflowIdx = iota // Indicator of message queue overflow
) )
type settingsVault struct {
vaultMu sync.Mutex
vault atomic.Value
}
func (s *settingsVault) get(idx settingType) interface{} {
m := s.vault.Load().(settingsMap)
return m[idx]
}
func (s *settingsVault) store(idx settingType, val interface{}) {
s.vaultMu.Lock()
defer s.vaultMu.Unlock()
m1 := s.vault.Load().(settingsMap)
m2 := make(settingsMap)
for k, v := range m1 {
m2[k] = v
}
m2[idx] = val
s.vault.Store(m2)
}
// Whisper represents a dark communication interface through the Ethereum // Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer. // network, using its very own P2P communication layer.
type Whisper struct { type Whisper struct {
@ -99,7 +72,7 @@ type Whisper struct {
p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further) p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further)
quit chan struct{} // Channel used for graceful exit quit chan struct{} // Channel used for graceful exit
settings settingsVault // holds configuration settings that can be dynamically changed settings syncmap.Map // holds configuration settings that can be dynamically changed
statsMu sync.Mutex // guard stats statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node stats Statistics // Statistics of whisper node
@ -126,10 +99,9 @@ func New(cfg *Config) *Whisper {
whisper.filters = NewFilters(whisper) whisper.filters = NewFilters(whisper)
whisper.settings.vault.Store(make(settingsMap)) whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
whisper.settings.store(minPowIdx, cfg.MinimumAcceptedPOW) whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
whisper.settings.store(maxMsgSizeIdx, cfg.MaxMessageSize) whisper.settings.Store(overflowIdx, false)
whisper.settings.store(OverflowIdx, false)
// p2p whisper sub protocol handler // p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{ whisper.protocol = p2p.Protocol{
@ -150,17 +122,20 @@ func New(cfg *Config) *Whisper {
} }
func (w *Whisper) MinPow() float64 { func (w *Whisper) MinPow() float64 {
return w.settings.get(minPowIdx).(float64) val, _ := w.settings.Load(minPowIdx)
return val.(float64)
} }
// MaxMessageSize returns the maximum accepted message size. // MaxMessageSize returns the maximum accepted message size.
func (w *Whisper) MaxMessageSize() uint32 { func (w *Whisper) MaxMessageSize() uint32 {
return w.settings.get(maxMsgSizeIdx).(uint32) val, _ := w.settings.Load(maxMsgSizeIdx)
return val.(uint32)
} }
// Overflow returns an indication if the message queue is full. // Overflow returns an indication if the message queue is full.
func (w *Whisper) Overflow() bool { func (w *Whisper) Overflow() bool {
return w.settings.get(OverflowIdx).(bool) val, _ := w.settings.Load(overflowIdx)
return val.(bool)
} }
// APIs returns the RPC descriptors the Whisper implementation offers // APIs returns the RPC descriptors the Whisper implementation offers
@ -196,7 +171,7 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {
if size > MaxMessageSize { if size > MaxMessageSize {
return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize) return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize)
} }
w.settings.store(maxMsgSizeIdx, uint32(size)) w.settings.Store(maxMsgSizeIdx, uint32(size))
return nil return nil
} }
@ -205,7 +180,7 @@ func (w *Whisper) SetMinimumPoW(val float64) error {
if val <= 0.0 { if val <= 0.0 {
return fmt.Errorf("invalid PoW: %f", val) return fmt.Errorf("invalid PoW: %f", val)
} }
w.settings.store(minPowIdx, val) w.settings.Store(minPowIdx, val)
return nil return nil
} }
@ -679,12 +654,12 @@ func (w *Whisper) checkOverflow() {
if queueSize == messageQueueLimit { if queueSize == messageQueueLimit {
if !w.Overflow() { if !w.Overflow() {
w.settings.store(OverflowIdx, true) w.settings.Store(overflowIdx, true)
log.Warn("message queue overflow") log.Warn("message queue overflow")
} }
} else if queueSize <= messageQueueLimit/2 { } else if queueSize <= messageQueueLimit/2 {
if w.Overflow() { if w.Overflow() {
w.settings.store(OverflowIdx, false) w.settings.Store(overflowIdx, false)
log.Warn("message queue overflow fixed (back to normal)") log.Warn("message queue overflow fixed (back to normal)")
} }
} }