mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-14 03:43:07 +00:00
connect to the store peer that is added via API
This commit is contained in:
parent
5c37e53205
commit
7d12cd0fb9
@ -713,7 +713,7 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro
|
||||
// AddPeer is used to add a peer and the protocols it support to the node peerstore
|
||||
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
|
||||
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
|
||||
return w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
|
||||
return w.peermanager.AddPeer(address, origin, pubSubTopics, false, protocols...)
|
||||
}
|
||||
|
||||
// AddDiscoveredPeer to add a discovered peer to the node peerStore
|
||||
|
||||
@ -420,7 +420,7 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
|
||||
}
|
||||
|
||||
// AddPeer adds peer to the peerStore and also to service slots
|
||||
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
|
||||
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, connectNow bool, protocols ...protocol.ID) (peer.ID, error) {
|
||||
//Assuming all addresses have peerId
|
||||
info, err := peer.AddrInfoFromP2pAddr(address)
|
||||
if err != nil {
|
||||
@ -438,6 +438,14 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo
|
||||
return "", err
|
||||
}
|
||||
|
||||
if connectNow {
|
||||
go pm.peerConnector.PushToChan(service.PeerData{
|
||||
Origin: origin,
|
||||
AddrInfo: *info,
|
||||
PubSubTopics: pubSubTopics,
|
||||
})
|
||||
}
|
||||
|
||||
return info.ID, nil
|
||||
}
|
||||
|
||||
|
||||
@ -71,7 +71,7 @@ func TestServiceSlots(t *testing.T) {
|
||||
|
||||
// add h2 peer to peer manager
|
||||
t.Log(h2.ID())
|
||||
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
|
||||
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol))
|
||||
require.NoError(t, err)
|
||||
|
||||
///////////////
|
||||
@ -84,7 +84,7 @@ func TestServiceSlots(t *testing.T) {
|
||||
require.Equal(t, peerID, h2.ID())
|
||||
|
||||
// add h3 peer to peer manager
|
||||
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
|
||||
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol))
|
||||
require.NoError(t, err)
|
||||
|
||||
// check that returned peer is h2 or h3 peer
|
||||
@ -108,7 +108,7 @@ func TestServiceSlots(t *testing.T) {
|
||||
require.Error(t, err, ErrNoPeersAvailable)
|
||||
|
||||
// add h4 peer for protocol1
|
||||
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
|
||||
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol1))
|
||||
require.NoError(t, err)
|
||||
|
||||
//Test peer selection for protocol1
|
||||
@ -134,10 +134,10 @@ func TestPeerSelection(t *testing.T) {
|
||||
defer h3.Close()
|
||||
|
||||
protocol := libp2pProtocol.ID("test/protocol")
|
||||
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
|
||||
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, false, libp2pProtocol.ID(protocol))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
|
||||
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, false, libp2pProtocol.ID(protocol))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
|
||||
@ -176,7 +176,7 @@ func TestDefaultProtocol(t *testing.T) {
|
||||
defer h5.Close()
|
||||
|
||||
//Test peer selection for relay protocol from peer store
|
||||
_, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200)
|
||||
_, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, false, relay.WakuRelayID_v200)
|
||||
require.NoError(t, err)
|
||||
|
||||
// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
|
||||
@ -197,7 +197,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer h6.Close()
|
||||
|
||||
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
|
||||
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, false, protocol2)
|
||||
require.NoError(t, err)
|
||||
|
||||
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
|
||||
|
||||
@ -281,12 +281,11 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
||||
|
||||
//Add Peer to peerstore.
|
||||
if store.pm != nil && params.peerAddr != nil {
|
||||
peerId, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4)
|
||||
peerId, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, true, StoreID_v20beta4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
params.selectedPeer = peerId
|
||||
|
||||
}
|
||||
if store.pm != nil && params.selectedPeer == "" {
|
||||
var err error
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user