From 5950ed20eb3f2704e71b2a7b08692d7684102f51 Mon Sep 17 00:00:00 2001 From: Marto Date: Wed, 21 Feb 2024 19:20:33 +0200 Subject: [PATCH] Fix BLS setup --- src/raft/consensus_state_machine.nim | 39 +++---- tests/test_bls_cluester.nim | 150 +++++++++++++++++++++------ 2 files changed, 138 insertions(+), 51 deletions(-) diff --git a/src/raft/consensus_state_machine.nim b/src/raft/consensus_state_machine.nim index 1ac3adb..2156bee 100644 --- a/src/raft/consensus_state_machine.nim +++ b/src/raft/consensus_state_machine.nim @@ -365,11 +365,30 @@ func tick*(sm: var RaftStateMachine, now: times.DateTime) = elif sm.state.isFollower and sm.timeNow - sm.lastElectionTime > sm.randomizedElectionTime: sm.debug "Become candidate" sm.becomeCandidate() - + +func commit(sm: var RaftStateMachine) = + if not sm.state.isLeader: + return + var newIndex = sm.commitIndex + var nextIndex = sm.commitIndex + 1 + while nextIndex < sm.log.lastIndex: + var replicationCnt = 1 + for p in sm.leader.tracker.progress: + if p.matchIndex > newIndex: + replicationCnt += 1 + sm.debug "replication count" & $replicationCnt + if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1): + sm.commitIndex = nextIndex; + nextIndex += 1 + newIndex += 1 + else: + break + func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = # Should initiate replication if we have new entries if sm.state.isLeader: sm.replicate() + sm.commit() sm.output.term = sm.term if sm.observedState.commitIndex < sm.commitIndex: for i in (sm.observedState.commitIndex + 1)..<(sm.commitIndex + 1): @@ -383,23 +402,6 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = sm.output = RaftStateMachineOutput() return output -func commit(sm: var RaftStateMachine) = - if not sm.state.isLeader: - return - var newIndex = sm.commitIndex - var nextIndex = sm.commitIndex + 1 - while nextIndex < sm.log.lastIndex: - var replicationCnt = 0 - for p in sm.leader.tracker.progress: - if p.matchIndex > newIndex: - replicationCnt += 1 - if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1): - sm.commitIndex = nextIndex; - nextIndex += 1 - newIndex += 1 - else: - break - func appendEntryReply*(sm: var RaftStateMachine, fromId: RaftNodeId, reply: RaftRpcAppendReply) = if not sm.state.isLeader: sm.debug "You can't append append reply to the follower" @@ -415,7 +417,6 @@ func appendEntryReply*(sm: var RaftStateMachine, fromId: RaftNodeId, reply: Raft sm.debug "Accpeted" & $fromId & " " & $lastIndex follower.get().accepted(lastIndex) # TODO: add leader stepping down logic here - sm.commit() if not sm.state.isLeader: return of RaftRpcCode.Rejected: diff --git a/tests/test_bls_cluester.nim b/tests/test_bls_cluester.nim index 06f879f..5421a58 100644 --- a/tests/test_bls_cluester.nim +++ b/tests/test_bls_cluester.nim @@ -22,35 +22,43 @@ import unittest2 type UserStateMachine = object + Message* = object + fieldInt: int + + Hash = int + UserState* = object lastCommitedMsg: Message + SignedLogEntry = object + hash: Hash + signature: SignedShare + BLSTestNode* = ref object stm: RaftStateMachine keyShare: SecretShare us: UserState blockCommunication: bool debugLogs: seq[DebugLogEntry] + messageSignatures: Table[Hash, seq[SignedShare]] + signEntries: seq[SignedLogEntry] + clusterPublicKey: PublicKey BLSTestCluster* = object nodes*: Table[RaftnodeId, BLSTestNode] - Message* = object - fieldInt: int - fieldBool: bool - SecretShare = object secret: SecretKey id: ID - SignsShare = object + SignedShare = object sign: Signature pubkey: PublicKey id: ID SignedRpcMessage* = object raftMsg: RaftRpcMessage - signature: SignsShare + signEntries: seq[SignedLogEntry] var secretKey = "1b500388741efd98239a9b3a689721a89a92e8b209aabb10fb7dc3f844976dc2" @@ -64,6 +72,12 @@ var test_ids_1 = @[ RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")), ] +proc signs(shares: openArray[SignedShare]): seq[Signature] = + shares.mapIt(it.sign) + +proc ids(shares: openArray[SignedShare]): seq[ID] = + shares.mapIt(it.id) + func createConfigFromIds*(ids: seq[RaftnodeId]): RaftConfig = var config = RaftConfig() for id in ids: @@ -78,6 +92,9 @@ proc toCommand(msg: Message): Command = var msgJson = $(msg.toJson) return Command(data: msgJson.toBytes) +proc toMessage(cmd: Command): Message = + return to(parseJson(cmd.data.toString), Message) + proc toBytes(msg: Message): seq[byte] = var msgJson = $(msg.toJson) return msgJson.toBytes @@ -86,33 +103,72 @@ proc toBytes(msg: RaftRpcMessage): seq[byte] = var msgJson = $(msg.toJson) return msgJson.toBytes +proc cmpLogs*(x, y: DebugLogEntry): int = + cmp(x.time, y.time) + +func `$`*(de: DebugLogEntry): string = + return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg + + proc pollMessages(node: BLSTestNode): seq[SignedRpcMessage] = var output = node.stm.poll() - node.debugLogs.add(output.debugLogs) + var debugLogs = output.debugLogs var msgs: seq[SignedRpcMessage] var pk: PublicKey discard pk.publicFromSecret(node.keyShare.secret) - for msg in output.messages: - msgs.add(SignedRpcMessage( - raftMsg: msg, - signature: SignsShare( - sign: node.keyShare.secret.sign(msg.toBytes), - pubkey: pk, - id: node.keyShare.id, - ) - )) + for msg in output.messages: + if msg.kind == RaftRpcMessageType.AppendReply: + msgs.add(SignedRpcMessage( + raftMsg: msg, + signEntries: node.signEntries + )) + node.signEntries = @[] + else: + msgs.add(SignedRpcMessage( + raftMsg: msg, + signEntries: @[] + )) + if node.stm.state.isLeader: + for commitedMsg in output.committed: + if commitedMsg.kind != rletCommand: + continue + var orgMsg = commitedMsg.command.toMessage + var shares = node.messageSignatures[orgMsg.fieldInt] + echo "Try to recover message" & $orgMsg.toBytes + var recoveredSignature = recover(shares.signs, shares.ids).expect("valid shares") + if not node.clusterPublicKey.verify(orgMsg.toBytes, recoveredSignature): + node.us.lastCommitedMsg = orgMsg + echo "State succesfuly changed" + else: + echo "Failed to reconstruct signature" + debugLogs.sort(cmpLogs) + for msg in debugLogs: + if msg.level <= DebugLogLevel.Debug: + echo $msg return msgs proc acceptMessage(node: BLSTestNode, msg: SignedRpcMessage, now: times.DateTime) = + if msg.raftMsg.kind == RaftRpcMessageType.AppendReply and node.stm.state.isFollower: + var pk: PublicKey + discard pk.publicFromSecret(node.keyShare.secret) + for entry in msg.raftMsg.appendRequest.entries: + var orgMsg = entry.command.toMessage + echo "Sign message" & $orgMsg.toBytes + var share = SignedLogEntry( + hash: orgMsg.fieldInt, + signature: SignedShare( + sign: node.keyShare.secret.sign(orgMsg.toBytes), + pubkey: pk, + id: node.keyShare.id, + ) + ) + node.signEntries.add(share) node.stm.advance(msg.raftMsg, now) proc tick(node: BLSTestNode, now: times.DateTime) = node.stm.tick(now) -proc fromCommand(cmd: Command): Message = - return to(parseJson(cmd.data.toString), Message) - proc keyGen(seed: uint64): tuple[pubkey: PublicKey, seckey: SecretKey] = var ikm: array[32, byte] ikm[0 ..< 8] = seed.toBytesLE @@ -145,7 +201,6 @@ proc createBLSCluster(ids: seq[RaftnodeId], now: times.DateTime) : BLSTestCluste var blsShares = generateSecretShares(sk, 2, 3) - var config = createConfigFromIds(ids) var cluster = BLSTestCluster() cluster.nodes = initTable[RaftnodeId, BLSTestNode]() @@ -157,18 +212,13 @@ proc createBLSCluster(ids: seq[RaftnodeId], now: times.DateTime) : BLSTestCluste stm: initRaftStateMachine(id, 0, log, 0, config, now, initRand(i + 42)), keyShare: blsShares[i], blockCommunication: false, + clusterPublicKey: pk, ) return cluster -proc cmpLogs*(x, y: DebugLogEntry): int = - cmp(x.time, y.time) - -func `$`*(de: DebugLogEntry): string = - return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg proc advance*(tc: var BLSTestCluster, now: times.DateTime, logLevel: DebugLogLevel = DebugLogLevel.Error) = - var debugLogs : seq[DebugLogEntry] for id, node in tc.nodes: node.tick(now) var msgs = node.pollMessages() @@ -186,18 +236,54 @@ func getLeader*(tc: BLSTestCluster): Option[BLSTestNode] = proc submitMessage(tc: var BLSTestCluster, msg: Message): bool = var leader = tc.getLeader() if leader.isSome(): + var pk: PublicKey + discard pk.publicFromSecret(leader.get.keyShare.secret) + echo "Leader Sign message" & $msg.toBytes + var share = SignedShare( + sign: leader.get.keyShare.secret.sign(msg.toBytes), + pubkey: pk, + id: leader.get.keyShare.id, + ) + if not leader.get.messageSignatures.hasKey(msg.fieldInt): + leader.get.messageSignatures[msg.fieldInt] = @[] + leader.get.messageSignatures[msg.fieldInt].add(share) leader.get().stm.addEntry(msg.toCommand()) + proc blsconsensusMain*() = suite "BLS consensus tests": - test "create single node cluster": - var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc()) - var cluster = createBLSCluster(test_ids_1, timeNow) + # test "create single node cluster": + # var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc()) + # var cluster = createBLSCluster(test_ids_1, timeNow) - timeNow += 300.milliseconds + # timeNow += 300.milliseconds + # cluster.advance(timeNow) + # echo cluster.getLeader().get().stm.state + # discard cluster.submitMessage(Message(fieldInt: 1)) + # discard cluster.submitMessage(Message(fieldInt: 2)) + # for i in 0..<305: + # timeNow += 5.milliseconds + # cluster.advance(timeNow) + + # echo "Helloo" & $cluster.getLeader().get.us.lastCommitedMsg + + test "create 3 node cluster": + var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc()) + var cluster = createBLSCluster(test_ids_3, timeNow) + + #timeNow += 300.milliseconds cluster.advance(timeNow) - echo cluster.getLeader().get().stm - discard cluster.submitMessage(Message(fieldInt: 1, fieldBool: false)) + var added = false + for i in 0..<305: + cluster.advance(timeNow) + if cluster.getLeader().isSome() and not added: + discard cluster.submitMessage(Message(fieldInt: 1)) + added = true + echo "Add to the entry log" + timeNow += 5.milliseconds + + #echo $cluster.nodes + echo "Last state" & $cluster.getLeader().get.us.lastCommitedMsg if isMainModule: