diff --git a/waku/waku.go b/waku/waku.go index fe68e660d..ad06fc63f 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -51,6 +51,10 @@ import ( // TimeSyncError error for clock skew errors. type TimeSyncError error +type Bridge interface { + Pipe() (<-chan *Envelope, chan<- *Envelope) +} + type settings struct { MaxMsgSize uint32 // Maximal message length allowed by the waku node EnableConfirmations bool // Enable sending message confirmations @@ -95,6 +99,10 @@ type Waku struct { timeSource func() time.Time // source of time for waku + bridge Bridge + bridgeWg sync.WaitGroup + cancelBridge chan struct{} + logger *zap.Logger } @@ -343,6 +351,47 @@ func (w *Waku) RegisterRateLimiter(r *PeerRateLimiter) { w.rateLimiter = r } +// RegisterBridge registers a new Bridge that moves envelopes +// between different subprotocols. +// It's important that a bridge is registered before the service +// is started, otherwise, it won't read and propagate envelopes. +func (w *Waku) RegisterBridge(b Bridge) { + if w.cancelBridge != nil { + close(w.cancelBridge) + } + w.bridge = b + w.cancelBridge = make(chan struct{}) + w.bridgeWg.Add(1) + go w.readBridgeLoop() +} + +func (w *Waku) readBridgeLoop() { + defer w.bridgeWg.Done() + out, _ := w.bridge.Pipe() + for { + select { + case <-w.cancelBridge: + return + case env := <-out: + _, err := w.addAndBridge(env, false, true) + if err != nil { + w.logger.Warn( + "failed to add a bridged envelope", + zap.Binary("ID", env.Hash().Bytes()), + zap.Error(err), + ) + } else { + w.logger.Debug("bridged envelope successfully", zap.Binary("ID", env.Hash().Bytes())) + w.envelopeFeed.Send(EnvelopeEvent{ + Event: EventEnvelopeReceived, + Topic: env.Topic, + Hash: env.Hash(), + }) + } + } + } +} + // SubscribeEnvelopeEvents subscribes to envelopes feed. // In order to prevent blocking waku producers events must be amply buffered. func (w *Waku) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription { @@ -829,6 +878,11 @@ func (w *Waku) Start(*p2p.Server) error { // Stop implements node.Service, stopping the background data propagation thread // of the Waku protocol. func (w *Waku) Stop() error { + if w.cancelBridge != nil { + close(w.cancelBridge) + w.cancelBridge = nil + w.bridgeWg.Wait() + } close(w.quit) return nil } @@ -1145,11 +1199,15 @@ func (w *Waku) handleBatchAcknowledgeCode(p *Peer, packet p2p.Msg, logger *zap.L return nil } -// add inserts a new envelope into the message pool to be distributed within the +func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { + return w.addAndBridge(envelope, isP2P, false) +} + +// addAndBridge inserts a new envelope into the message pool to be distributed within the // waku network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. // param isP2P indicates whether the message is peer-to-peer (should not be forwarded). -func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { +func (w *Waku) addAndBridge(envelope *Envelope, isP2P bool, bridged bool) (bool, error) { now := uint32(w.timeSource().Unix()) sent := envelope.Expiry - envelope.TTL @@ -1232,6 +1290,13 @@ func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { Event: EventMailServerEnvelopeArchived, }) } + // Bridge only envelopes that are not p2p messages. + // In particular, if a node is a lightweight node, + // it should not bridge any envelopes. + if !isP2P && !bridged && w.bridge != nil { + _, in := w.bridge.Pipe() + in <- envelope + } } return true, nil } diff --git a/whisper/whisper.go b/whisper/whisper.go index 8bd55ff81..f0c306fc6 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -43,6 +43,10 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) +type Bridge interface { + Pipe() (<-chan *Envelope, chan<- *Envelope) +} + // TimeSyncError error for clock skew errors. type TimeSyncError error @@ -113,6 +117,10 @@ type Whisper struct { envelopeFeed event.Feed timeSource func() time.Time // source of time for whisper + + bridge Bridge + bridgeWg sync.WaitGroup + cancelBridge chan struct{} } // New creates a Whisper client ready to communicate through the Ethereum P2P network. @@ -268,6 +276,51 @@ func (whisper *Whisper) RegisterMailServer(server MailServer) { whisper.mailServer = server } +// RegisterBridge registers a new Bridge that moves envelopes +// between different subprotocols. +// It's important that a bridge is registered before the service +// is started, otherwise, it won't read and propagate envelopes. +func (whisper *Whisper) RegisterBridge(b Bridge) { + if whisper.cancelBridge != nil { + close(whisper.cancelBridge) + whisper.bridgeWg.Wait() + } + whisper.bridge = b + whisper.cancelBridge = make(chan struct{}) + whisper.bridgeWg.Add(1) + go whisper.readBridgeLoop() +} + +func (whisper *Whisper) readBridgeLoop() { + defer whisper.bridgeWg.Done() + out, _ := whisper.bridge.Pipe() + for { + select { + case <-whisper.cancelBridge: + return + case env := <-out: + _, err := whisper.addAndBridge(env, false, true) + if err != nil { + log.Warn( + "failed to add a bridged envelope", + "ID", env.Hash().Bytes(), + "err", err, + ) + } else { + log.Debug( + "bridged envelope successfully", + "ID", env.Hash().Bytes(), + ) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Event: EventEnvelopeReceived, + Topic: env.Topic, + Hash: env.Hash(), + }) + } + } + } +} + // Protocols returns the whisper sub-protocols ran by this particular client. func (whisper *Whisper) Protocols() []p2p.Protocol { return []p2p.Protocol{whisper.protocol} @@ -877,6 +930,11 @@ func (whisper *Whisper) Start(*p2p.Server) error { // Stop implements node.Service, stopping the background data propagation thread // of the Whisper protocol. func (whisper *Whisper) Stop() error { + if whisper.cancelBridge != nil { + close(whisper.cancelBridge) + whisper.cancelBridge = nil + whisper.bridgeWg.Wait() + } close(whisper.quit) log.Info("whisper stopped") return nil @@ -1220,11 +1278,15 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } } +func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { + return whisper.addAndBridge(envelope, isP2P, false) +} + // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. // param isP2P indicates whether the message is peer-to-peer (should not be forwarded). -func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { +func (whisper *Whisper) addAndBridge(envelope *Envelope, isP2P bool, bridged bool) (bool, error) { now := uint32(whisper.timeSource().Unix()) sent := envelope.Expiry - envelope.TTL @@ -1309,6 +1371,13 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { Event: EventMailServerEnvelopeArchived, }) } + // Bridge only envelopes that are not p2p messages. + // In particular, if a node is a lightweight node, + // it should not bridge any envelopes. + if !isP2P && !bridged && whisper.bridge != nil { + _, in := whisper.bridge.Pipe() + in <- envelope + } } return true, nil }