mirror of https://github.com/status-im/go-waku.git
fix: remove channel for disconnections
Since a subscriber might not be connected always it makes no sense to automatically unsubscribe on disconnet
This commit is contained in:
parent
70e301f348
commit
761ae88bbd
|
@ -83,9 +83,7 @@ func (w *WakuNode) connectednessListener() {
|
||||||
return
|
return
|
||||||
case <-w.protocolEventSub.Out():
|
case <-w.protocolEventSub.Out():
|
||||||
case <-w.identificationEventSub.Out():
|
case <-w.identificationEventSub.Out():
|
||||||
case p := <-w.connectionNotif.DisconnectChan:
|
case <-w.connectionNotif.DisconnectChan:
|
||||||
// Notify filter of disconnection
|
|
||||||
w.filter.DisconnectChan <- p
|
|
||||||
}
|
}
|
||||||
w.sendConnStatus()
|
w.sendConnStatus()
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,8 +46,6 @@ type (
|
||||||
subscribers []Subscriber
|
subscribers []Subscriber
|
||||||
pushHandler MessagePushHandler
|
pushHandler MessagePushHandler
|
||||||
MsgC chan *protocol.Envelope
|
MsgC chan *protocol.Envelope
|
||||||
|
|
||||||
DisconnectChan chan peer.ID
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -158,24 +156,6 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilter) peerListener() {
|
|
||||||
for peerID := range wf.DisconnectChan {
|
|
||||||
log.Info("filter Notification received ", peerID)
|
|
||||||
i := 0
|
|
||||||
// Delete subscribers matching deleted peer
|
|
||||||
for _, s := range wf.subscribers {
|
|
||||||
if s.peer != peerID {
|
|
||||||
wf.subscribers[i] = s
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("filter, deleted subscribers: ", len(wf.subscribers)-i)
|
|
||||||
wf.subscribers = wf.subscribers[:i]
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandler) *WakuFilter {
|
func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandler) *WakuFilter {
|
||||||
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -187,11 +167,9 @@ func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandl
|
||||||
wf.MsgC = make(chan *protocol.Envelope)
|
wf.MsgC = make(chan *protocol.Envelope)
|
||||||
wf.h = host
|
wf.h = host
|
||||||
wf.pushHandler = handler
|
wf.pushHandler = handler
|
||||||
wf.DisconnectChan = make(chan peer.ID)
|
|
||||||
|
|
||||||
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
|
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
|
||||||
go wf.FilterListener()
|
go wf.FilterListener()
|
||||||
go wf.peerListener()
|
|
||||||
|
|
||||||
return wf
|
return wf
|
||||||
}
|
}
|
||||||
|
@ -220,7 +198,7 @@ func (wf *WakuFilter) FilterListener() {
|
||||||
log.Info("pushing a message to light node: ", pushRPC)
|
log.Info("pushing a message to light node: ", pushRPC)
|
||||||
|
|
||||||
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1)
|
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1)
|
||||||
|
// TODO: keep track of errors to automatically unsubscribe a peer?
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// @TODO more sophisticated error handling here
|
// @TODO more sophisticated error handling here
|
||||||
log.Error("failed to open peer stream")
|
log.Error("failed to open peer stream")
|
||||||
|
|
Loading…
Reference in New Issue