{.used.}

import
  std/[options, sets, tables, sequtils],
  chronicles,
  testutils/unittests, stew/shims/net as stewNet,
  json_rpc/[rpcserver, rpcclient],
  eth/[keys, rlp], eth/common/eth_types,
  libp2p/[builders, switch, multiaddress],
  libp2p/protobuf/minprotobuf,
  libp2p/stream/[bufferstream, connection],
  libp2p/crypto/crypto,
  libp2p/protocols/pubsub/pubsub,
  libp2p/protocols/pubsub/rpc/message,
  ../../waku/v2/node/wakunode2,
  ../../waku/v2/node/peer_manager/peer_manager,
  ../../waku/v2/node/storage/peer/waku_peer_storage,
  ../test_helpers

procSuite "Peer Manager":
  asyncTest "Peer dialing works":
    # Two relay nodes are started; the first dials the second and the peer
    # manager of the dialing node must then report the new connection.
    let
      dialerKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      dialer = WakuNode.new(dialerKey, ValidIpAddress.init("0.0.0.0"),
        Port(60000))
      listenerKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      listener = WakuNode.new(listenerKey, ValidIpAddress.init("0.0.0.0"),
        Port(60002))
      listenerInfo = listener.switch.peerInfo

    await allFutures([dialer.start(), listener.start()])

    dialer.mountRelay()
    listener.mountRelay()

    # Dial the listener from the dialer over the relay codec
    let
      dialResult = await dialer.peerManager.dialPeer(
        listenerInfo.toRemotePeerInfo(), WakuRelayCodec)
      stream = dialResult.get()

    check:
      # The returned connection shows activity and belongs to the listener
      stream.activity
      stream.peerId == listenerInfo.peerId
      # The listener is now managed by the dialer's peer manager
      dialer.peerManager.peers().anyIt(it.peerId == listenerInfo.peerId)
      # ... and its connectedness is reported as connected
      dialer.peerManager.connectedness(listenerInfo.peerId) == Connectedness.Connected

    await allFutures([dialer.stop(), listener.stop()])
  
  asyncTest "Dialing fails gracefully":
    # Dialing a node that was never started must produce an empty result.
    let
      dialerKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      dialer = WakuNode.new(dialerKey, ValidIpAddress.init("0.0.0.0"),
        Port(60000))
      offlineKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      offline = WakuNode.new(offlineKey, ValidIpAddress.init("0.0.0.0"),
        Port(60002))
      offlineInfo = offline.switch.peerInfo

    # Only the dialer is started; the target deliberately stays offline
    await dialer.start()

    dialer.mountRelay()
    offline.mountRelay()

    # Attempt the dial, passing 2.seconds as the third argument
    # (presumably the dial timeout - confirm against dialPeer's signature)
    let dialResult = await dialer.peerManager.dialPeer(
      offlineInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)

    # The failed dial must surface as an empty Option
    check dialResult.isNone()

    await dialer.stop()

  asyncTest "Adding, selecting and filtering peers work":
    # Registers one service peer each for the filter, swap and store
    # protocols, then verifies that selecting peers by codec returns exactly
    # the peer that was registered for that codec.
    let
      nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"),
        Port(60000))
      # Create filter peer
      filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
      filterKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
      filterPeer = PeerInfo.new(filterKey, @[filterLoc])
      # Create swap peer
      swapLoc = MultiAddress.init("/ip4/127.0.0.2/tcp/2").tryGet()
      swapKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
      swapPeer = PeerInfo.new(swapKey, @[swapLoc])
      # Create store peer
      storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet()
      storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
      storePeer = PeerInfo.new(storeKey, @[storeLoc])

    await node.start()

    node.mountFilter()
    node.mountSwap()
    node.mountStore(persistMessages = true)

    node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
    node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
    node.wakuStore.setPeer(storePeer.toRemotePeerInfo())

    # Select the managed peers per codec once, so that both the size and the
    # contents of every selection can be asserted.
    let
      filterPeers = node.peerManager.peers(WakuFilterCodec)
      swapPeers = node.peerManager.peers(WakuSwapCodec)
      storePeers = node.peerManager.peers(WakuStoreCodec)

    # Check peers were successfully added to peer manager
    check:
      node.peerManager.peers().len == 3
      # `allIt` is vacuously true on an empty collection, so each selection
      # must first be shown to contain the single peer registered for it.
      filterPeers.len == 1
      filterPeers.allIt(it.peerId == filterPeer.peerId and
                        it.addrs.contains(filterLoc) and
                        it.protos.contains(WakuFilterCodec))
      swapPeers.len == 1
      swapPeers.allIt(it.peerId == swapPeer.peerId and
                      it.addrs.contains(swapLoc) and
                      it.protos.contains(WakuSwapCodec))
      storePeers.len == 1
      storePeers.allIt(it.peerId == storePeer.peerId and
                       it.addrs.contains(storeLoc) and
                       it.protos.contains(WakuStoreCodec))

    await node.stop()
  

  asyncTest "Peer manager keeps track of connections":
    # Walks one remote peer through the connectedness states the peer manager
    # reports: NotConnected -> CannotConnect -> Connected -> CanConnect.
    let
      dialerKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      dialer = WakuNode.new(dialerKey, ValidIpAddress.init("0.0.0.0"),
        Port(60000))
      targetKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      target = WakuNode.new(targetKey, ValidIpAddress.init("0.0.0.0"),
        Port(60002))
      targetInfo = target.switch.peerInfo

    # The target is intentionally left stopped for the first two steps
    await dialer.start()

    dialer.mountRelay()
    target.mountRelay()

    # Step 1: a peer that was only added has no connection history
    dialer.peerManager.addPeer(targetInfo.toRemotePeerInfo(), WakuRelayCodec)
    check:
      dialer.peerManager.connectedness(targetInfo.peerId) == Connectedness.NotConnected

    # Step 2: dialing the stopped target fails and is recorded as such
    discard await dialer.peerManager.dialPeer(
      targetInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
    check:
      dialer.peerManager.connectedness(targetInfo.peerId) == Connectedness.CannotConnect

    # Step 3: once the target runs, the same dial succeeds
    await target.start()
    discard await dialer.peerManager.dialPeer(
      targetInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
    check:
      dialer.peerManager.connectedness(targetInfo.peerId) == Connectedness.Connected

    # Step 4: stopping the dialer drops the connection, but the earlier
    # successful connection is still remembered
    await dialer.stop()
    check:
      dialer.peerManager.connectedness(targetInfo.peerId) == Connectedness.CanConnect

    await target.stop()

  asyncTest "Peer manager can use persistent storage and survive restarts":
    # A node backed by peer storage dials a peer. A second node created on the
    # same storage stands in for a restart: it must already list that peer
    # before reconnecting, and be connected to it after mounting relay.
    let
      sqliteDb = SqliteDatabase.init("1", inMemory = true)[]
      sharedStorage = WakuPeerStorage.new(sqliteDb)[]
      firstKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      firstNode = WakuNode.new(firstKey, ValidIpAddress.init("0.0.0.0"),
        Port(60000), peerStorage = sharedStorage)
      remoteKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      remoteNode = WakuNode.new(remoteKey, ValidIpAddress.init("0.0.0.0"),
        Port(60002))
      remoteInfo = remoteNode.switch.peerInfo

    await firstNode.start()
    await remoteNode.start()

    firstNode.mountRelay()
    remoteNode.mountRelay()

    discard await firstNode.peerManager.dialPeer(
      remoteInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
    check:
      # The remote node is the only managed peer and it is connected
      firstNode.peerManager.peers().len == 1
      firstNode.peerManager.peers().anyIt(it.peerId == remoteInfo.peerId)
      firstNode.peerManager.connectedness(remoteInfo.peerId) == Connectedness.Connected

    # "Restart": build a fresh node on top of the same storage
    let
      restartedKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      restartedNode = WakuNode.new(restartedKey, ValidIpAddress.init("0.0.0.0"),
        Port(60004), peerStorage = sharedStorage)

    await restartedNode.start()
    check:
      # The remote peer is known from storage, but no reconnect happened yet
      restartedNode.peerManager.peers().len == 1
      restartedNode.peerManager.peers().anyIt(it.peerId == remoteInfo.peerId)
      restartedNode.peerManager.connectedness(remoteInfo.peerId) == Connectedness.NotConnected

    # Mounting relay is expected to trigger the reconnect
    restartedNode.mountRelay()

    check:
      # Same single peer, now connected again
      restartedNode.peerManager.peers().len == 1
      restartedNode.peerManager.peers().anyIt(it.peerId == remoteInfo.peerId)
      restartedNode.peerManager.connectedness(remoteInfo.peerId) == Connectedness.Connected

    await allFutures([firstNode.stop(), remoteNode.stop(), restartedNode.stop()])

asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
    # A peer stored under a beta relay codec must still be reconnected to by
    # a node whose own relay codec is the stable one; afterwards the peer is
    # expected to be listed under both codecs.
    # NOTE(review): this test is declared at column 0, i.e. outside the
    # `procSuite "Peer Manager"` body that holds the other tests - confirm
    # whether that is intended.
    let
      betaCodec = "/vac/waku/relay/2.0.0-beta2"
      stableCodec = "/vac/waku/relay/2.0.0"

    let
      sqliteDb = SqliteDatabase.init("2", inMemory = true)[]
      sharedStorage = WakuPeerStorage.new(sqliteDb)[]
      firstKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      firstNode = WakuNode.new(firstKey, ValidIpAddress.init("0.0.0.0"),
        Port(60000), peerStorage = sharedStorage)
      remoteKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      remoteNode = WakuNode.new(remoteKey, ValidIpAddress.init("0.0.0.0"),
        Port(60002))
      remoteInfo = remoteNode.switch.peerInfo

    await firstNode.start()
    await remoteNode.start()

    # Both initial nodes speak the beta codec
    firstNode.mountRelay()
    firstNode.wakuRelay.codec = betaCodec
    remoteNode.mountRelay()
    remoteNode.wakuRelay.codec = betaCodec

    discard await firstNode.peerManager.dialPeer(
      remoteInfo.toRemotePeerInfo(), remoteNode.wakuRelay.codec, 2.seconds)
    check:
      # The remote node is the only managed peer, stored under its codec,
      # and currently connected
      firstNode.peerManager.peers().len == 1
      firstNode.peerManager.peers().anyIt(it.peerId == remoteInfo.peerId)
      firstNode.peerManager.peers().anyIt(it.protos.contains(remoteNode.wakuRelay.codec))
      firstNode.peerManager.connectedness(remoteInfo.peerId) == Connectedness.Connected

    # "Restart": build a fresh node on top of the same storage
    let
      restartedKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
      restartedNode = WakuNode.new(restartedKey, ValidIpAddress.init("0.0.0.0"),
        Port(60004), peerStorage = sharedStorage)

    # The restarted node uses the stable codec instead of the beta one
    restartedNode.mountRelay()
    restartedNode.wakuRelay.codec = stableCodec
    check:
      # The two nodes really do differ in codec
      remoteNode.wakuRelay.codec == betaCodec
      restartedNode.wakuRelay.codec == stableCodec
      # The remote peer is known from storage under the beta codec, but no
      # reconnect has happened yet
      restartedNode.peerManager.peers().len == 1
      restartedNode.peerManager.peers().anyIt(it.peerId == remoteInfo.peerId)
      restartedNode.peerManager.peers().anyIt(it.protos.contains(betaCodec))
      restartedNode.peerManager.connectedness(remoteInfo.peerId) == Connectedness.NotConnected

    # Starting the node is expected to trigger the reconnect
    await restartedNode.start()

    check:
      # Still one peer, now listed under both codecs and connected again
      restartedNode.peerManager.peers().len == 1
      restartedNode.peerManager.peers().anyIt(it.peerId == remoteInfo.peerId)
      restartedNode.peerManager.peers().anyIt(it.protos.contains(betaCodec))
      restartedNode.peerManager.peers().anyIt(it.protos.contains(stableCodec))
      restartedNode.peerManager.connectedness(remoteInfo.peerId) == Connectedness.Connected

    await allFutures([firstNode.stop(), remoteNode.stop(), restartedNode.stop()])