diff --git a/cluster.go b/cluster.go index b46143f..e23f0a9 100644 --- a/cluster.go +++ b/cluster.go @@ -4,6 +4,7 @@ import ( "context" "log" "sort" + "sync" ) // ClusterSource represents knowledge source of @@ -49,28 +50,39 @@ func (f *Fetcher) Nodes(ctx context.Context, tag string) ([]*ClusterNode, error) // NodePeers runs `admin_peers` command for each node. func (f *Fetcher) NodePeers(ctx context.Context, nodes []*ClusterNode) ([]*Node, []*Link, error) { - m := make(map[string]*Node) - var links []*Link + var ( + m sync.Map + wg sync.WaitGroup + linksMu sync.Mutex + links []*Link + ) for _, node := range nodes { - // TODO: run concurrently - peers, err := f.rpc.AdminPeers(ctx, node.IP) - if err != nil { - log.Printf("[ERROR] Failed to get peers from %s\n", node.IP) - continue - } + wg.Add(1) + go func(node *ClusterNode) { + defer wg.Done() + peers, err := f.rpc.AdminPeers(ctx, node.IP) + if err != nil { + log.Printf("[ERROR] Failed to get peers from %s\n", node.IP) + return + } - for _, peer := range peers { - m[peer.ID()] = peer + for _, peer := range peers { + m.Store(peer.ID(), peer) - link := NewLink(node.ID, peer.ID()) - links = append(links, link) - } + link := NewLink(node.ID, peer.ID()) + linksMu.Lock() + links = append(links, link) + linksMu.Unlock() + } + }(node) } + wg.Wait() var ret []*Node - for _, node := range m { - ret = append(ret, node) - } + m.Range(func(k, v interface{}) bool { + ret = append(ret, v.(*Node)) + return true + }) sort.Slice(ret, func(i, j int) bool { return ret[i].ID() < ret[j].ID() })