Add concurrent RPC calls

This commit is contained in:
Ivan Danyliuk 2018-06-20 19:17:33 +02:00
parent fa519f8d5f
commit 12ef7062f2
No known key found for this signature in database
GPG Key ID: 97ED33CE024E1DBF

View File

@ -4,6 +4,7 @@ import (
"context"
"log"
"sort"
"sync"
)
// ClusterSource represents knowledge source of
@ -49,28 +50,39 @@ func (f *Fetcher) Nodes(ctx context.Context, tag string) ([]*ClusterNode, error)
// NodePeers runs `admin_peers` command for each node.
func (f *Fetcher) NodePeers(ctx context.Context, nodes []*ClusterNode) ([]*Node, []*Link, error) {
m := make(map[string]*Node)
var links []*Link
var (
m sync.Map
wg sync.WaitGroup
linksMu sync.Mutex
links []*Link
)
for _, node := range nodes {
// TODO: run concurrently
peers, err := f.rpc.AdminPeers(ctx, node.IP)
if err != nil {
log.Printf("[ERROR] Failed to get peers from %s\n", node.IP)
continue
}
wg.Add(1)
go func(node *ClusterNode) {
defer wg.Done()
peers, err := f.rpc.AdminPeers(ctx, node.IP)
if err != nil {
log.Printf("[ERROR] Failed to get peers from %s\n", node.IP)
return
}
for _, peer := range peers {
m[peer.ID()] = peer
for _, peer := range peers {
m.Store(peer.ID(), peer)
link := NewLink(node.ID, peer.ID())
links = append(links, link)
}
link := NewLink(node.ID, peer.ID())
linksMu.Lock()
links = append(links, link)
linksMu.Unlock()
}
}(node)
}
wg.Wait()
var ret []*Node
for _, node := range m {
ret = append(ret, node)
}
m.Range(func(k, v interface{}) bool {
ret = append(ret, v.(*Node))
return true
})
sort.Slice(ret, func(i, j int) bool {
return ret[i].ID() < ret[j].ID()
})