Prepare Whisper and Waku for bridge implementation (#1850)

This commit is contained in:
Adam Babik 2020-02-13 15:30:20 +01:00 committed by GitHub
parent fdcefb8dc2
commit 7ba20cb5a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 137 additions and 3 deletions

View File

@ -51,6 +51,10 @@ import (
// TimeSyncError error for clock skew errors.
type TimeSyncError error
type Bridge interface {
Pipe() (<-chan *Envelope, chan<- *Envelope)
}
type settings struct {
MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations
@ -95,6 +99,10 @@ type Waku struct {
timeSource func() time.Time // source of time for waku
bridge Bridge
bridgeWg sync.WaitGroup
cancelBridge chan struct{}
logger *zap.Logger
}
@ -343,6 +351,47 @@ func (w *Waku) RegisterRateLimiter(r *PeerRateLimiter) {
w.rateLimiter = r
}
// RegisterBridge registers a new Bridge that moves envelopes
// between different subprotocols.
// It's important that a bridge is registered before the service
// is started, otherwise, it won't read and propagate envelopes.
func (w *Waku) RegisterBridge(b Bridge) {
if w.cancelBridge != nil {
close(w.cancelBridge)
}
w.bridge = b
w.cancelBridge = make(chan struct{})
w.bridgeWg.Add(1)
go w.readBridgeLoop()
}
func (w *Waku) readBridgeLoop() {
defer w.bridgeWg.Done()
out, _ := w.bridge.Pipe()
for {
select {
case <-w.cancelBridge:
return
case env := <-out:
_, err := w.addAndBridge(env, false, true)
if err != nil {
w.logger.Warn(
"failed to add a bridged envelope",
zap.Binary("ID", env.Hash().Bytes()),
zap.Error(err),
)
} else {
w.logger.Debug("bridged envelope successfully", zap.Binary("ID", env.Hash().Bytes()))
w.envelopeFeed.Send(EnvelopeEvent{
Event: EventEnvelopeReceived,
Topic: env.Topic,
Hash: env.Hash(),
})
}
}
}
}
// SubscribeEnvelopeEvents subscribes to envelopes feed.
// In order to prevent blocking waku producers events must be amply buffered.
func (w *Waku) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription {
@ -829,6 +878,11 @@ func (w *Waku) Start(*p2p.Server) error {
// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
if w.cancelBridge != nil {
close(w.cancelBridge)
w.cancelBridge = nil
w.bridgeWg.Wait()
}
close(w.quit)
return nil
}
@ -1145,11 +1199,15 @@ func (w *Waku) handleBatchAcknowledgeCode(p *Peer, packet p2p.Msg, logger *zap.L
return nil
}
// add inserts a new envelope into the message pool to be distributed within the
func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) {
return w.addAndBridge(envelope, isP2P, false)
}
// addAndBridge inserts a new envelope into the message pool to be distributed within the
// waku network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
// param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) {
func (w *Waku) addAndBridge(envelope *Envelope, isP2P bool, bridged bool) (bool, error) {
now := uint32(w.timeSource().Unix())
sent := envelope.Expiry - envelope.TTL
@ -1232,6 +1290,13 @@ func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) {
Event: EventMailServerEnvelopeArchived,
})
}
// Bridge only envelopes that are not p2p messages.
// In particular, if a node is a lightweight node,
// it should not bridge any envelopes.
if !isP2P && !bridged && w.bridge != nil {
_, in := w.bridge.Pipe()
in <- envelope
}
}
return true, nil
}

View File

@ -43,6 +43,10 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
type Bridge interface {
Pipe() (<-chan *Envelope, chan<- *Envelope)
}
// TimeSyncError error for clock skew errors.
type TimeSyncError error
@ -113,6 +117,10 @@ type Whisper struct {
envelopeFeed event.Feed
timeSource func() time.Time // source of time for whisper
bridge Bridge
bridgeWg sync.WaitGroup
cancelBridge chan struct{}
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
@ -268,6 +276,51 @@ func (whisper *Whisper) RegisterMailServer(server MailServer) {
whisper.mailServer = server
}
// RegisterBridge registers a new Bridge that moves envelopes
// between different subprotocols.
// It's important that a bridge is registered before the service
// is started, otherwise, it won't read and propagate envelopes.
func (whisper *Whisper) RegisterBridge(b Bridge) {
if whisper.cancelBridge != nil {
close(whisper.cancelBridge)
whisper.bridgeWg.Wait()
}
whisper.bridge = b
whisper.cancelBridge = make(chan struct{})
whisper.bridgeWg.Add(1)
go whisper.readBridgeLoop()
}
func (whisper *Whisper) readBridgeLoop() {
defer whisper.bridgeWg.Done()
out, _ := whisper.bridge.Pipe()
for {
select {
case <-whisper.cancelBridge:
return
case env := <-out:
_, err := whisper.addAndBridge(env, false, true)
if err != nil {
log.Warn(
"failed to add a bridged envelope",
"ID", env.Hash().Bytes(),
"err", err,
)
} else {
log.Debug(
"bridged envelope successfully",
"ID", env.Hash().Bytes(),
)
whisper.envelopeFeed.Send(EnvelopeEvent{
Event: EventEnvelopeReceived,
Topic: env.Topic,
Hash: env.Hash(),
})
}
}
}
}
// Protocols returns the whisper sub-protocols ran by this particular client.
func (whisper *Whisper) Protocols() []p2p.Protocol {
return []p2p.Protocol{whisper.protocol}
@ -877,6 +930,11 @@ func (whisper *Whisper) Start(*p2p.Server) error {
// Stop implements node.Service, stopping the background data propagation thread
// of the Whisper protocol.
func (whisper *Whisper) Stop() error {
if whisper.cancelBridge != nil {
close(whisper.cancelBridge)
whisper.cancelBridge = nil
whisper.bridgeWg.Wait()
}
close(whisper.quit)
log.Info("whisper stopped")
return nil
@ -1220,11 +1278,15 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
}
func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
return whisper.addAndBridge(envelope, isP2P, false)
}
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
// param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
func (whisper *Whisper) addAndBridge(envelope *Envelope, isP2P bool, bridged bool) (bool, error) {
now := uint32(whisper.timeSource().Unix())
sent := envelope.Expiry - envelope.TTL
@ -1309,6 +1371,13 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
Event: EventMailServerEnvelopeArchived,
})
}
// Bridge only envelopes that are not p2p messages.
// In particular, if a node is a lightweight node,
// it should not bridge any envelopes.
if !isP2P && !bridged && whisper.bridge != nil {
_, in := whisper.bridge.Pipe()
in <- envelope
}
}
return true, nil
}