Add UnsubscribeFilter

This commit is contained in:
Vitaliy Vlasov 2021-06-14 15:54:50 +03:00
parent ee65ebc32c
commit 0c3f109d9e
3 changed files with 85 additions and 4 deletions

View File

@ -92,6 +92,12 @@ func main() {
go writeLoop(ctx, fullNode) go writeLoop(ctx, fullNode)
go readLoop(fullNode) go readLoop(fullNode)
go func() {
// Unsubscribe filter after 5 seconds
time.Sleep(5 * time.Second)
filterRequest.Subscribe = false
lightNode.UnsubscribeFilter(ctx, filterRequest)
}()
// Wait for a SIGINT or SIGTERM signal // Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)

View File

@ -357,14 +357,13 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, request pb.FilterRequ
// Sanity check for well-formed subscribe FilterRequest // Sanity check for well-formed subscribe FilterRequest
//doAssert(request.subscribe, "invalid subscribe request") //doAssert(request.subscribe, "invalid subscribe request")
log.Info("subscribe content", request) log.Info("SubscribeFilter, request: ", request)
var id string var id string
if node.filter != nil { if node.filter != nil {
id, err := node.filter.Subscribe(ctx, request) id, err := node.filter.Subscribe(ctx, request)
// TODO
if id == "" || err != nil { if id == "" || err != nil {
// Failed to subscribe // Failed to subscribe
log.Error("remote subscription to filter failed", request) log.Error("remote subscription to filter failed", request)
@ -374,7 +373,45 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, request pb.FilterRequ
} }
// Register handler for filter, whether remote subscription succeeded or not // Register handler for filter, whether remote subscription succeeded or not
node.filters[string(id)] = filter.Filter{ContentFilters: request.ContentFilters, Chan: ch} node.filters[id] = filter.Filter{ContentFilters: request.ContentFilters, Chan: ch}
}
func (node *WakuNode) UnsubscribeFilter(ctx context.Context, request pb.FilterRequest) {
log.Info("UnsubscribeFilter, request: ", request)
// Send message to full node in order to unsubscribe
node.filter.Unsubscribe(ctx, request)
// Remove local filter
var idsToRemove []string
for id, f := range node.filters {
// Iterate filter entries to remove matching content topics
// make sure we delete the content filter
// if no more topics are left
for _, cfToDelete := range request.ContentFilters {
for i, cf := range f.ContentFilters {
if cf.ContentTopic == cfToDelete.ContentTopic {
l := len(f.ContentFilters) - 1
f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l]
f.ContentFilters = f.ContentFilters[:l]
break
}
}
if len(f.ContentFilters) == 0 {
idsToRemove = append(idsToRemove, id)
}
}
}
for _, rId := range idsToRemove {
for id, _ := range node.filters {
if id == rId {
delete(node.filters, id)
break
}
}
}
} }
func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) { func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) {

View File

@ -135,7 +135,45 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
wf.subscribers = append(wf.subscribers, subscriber) wf.subscribers = append(wf.subscribers, subscriber)
log.Info("Full node, add a filter subscriber ", subscriber) log.Info("Full node, add a filter subscriber ", subscriber)
} else { } else {
// TODO wf.subscribers.unsubscribeFilters(filterRPCRequest.Request, conn.peerInfo.peerId) peerId := string(s.Conn().RemotePeer())
log.Info("Full node, remove a filter subscriber ", peerId)
contentFilters := filterRPCRequest.Request.ContentFilters
var peerIdsToRemove []string
for _, subscriber := range wf.subscribers {
if subscriber.peer != peerId {
continue
}
// make sure we delete the content filter
// if no more topics are left
for i, contentFilter := range contentFilters {
subCfs := subscriber.filter.ContentFilters
for _, cf := range subCfs {
if cf.ContentTopic == contentFilter.ContentTopic {
l := len(subCfs) - 1
subCfs[l], subCfs[i] = subCfs[i], subCfs[l]
subscriber.filter.ContentFilters = subCfs[:l]
}
}
}
if len(subscriber.filter.ContentFilters) == 0 {
peerIdsToRemove = append(peerIdsToRemove, subscriber.peer)
}
}
// make sure we delete the subscriber
// if no more content filters left
for _, peerId := range peerIdsToRemove {
for i, s := range wf.subscribers {
if s.peer == peerId {
l := len(wf.subscribers) - 1
wf.subscribers[l], wf.subscribers[i] = wf.subscribers[i], wf.subscribers[l]
wf.subscribers = wf.subscribers[:l]
break
}
}
}
} }
} else if filterRPCRequest.Push != nil { } else if filterRPCRequest.Push != nil {
// We're on a light node. // We're on a light node.