Fix BLS setup
This commit is contained in:
parent
d59d5c4e11
commit
5950ed20eb
|
@ -365,11 +365,30 @@ func tick*(sm: var RaftStateMachine, now: times.DateTime) =
|
|||
elif sm.state.isFollower and sm.timeNow - sm.lastElectionTime > sm.randomizedElectionTime:
|
||||
sm.debug "Become candidate"
|
||||
sm.becomeCandidate()
|
||||
|
||||
|
||||
func commit(sm: var RaftStateMachine) =
|
||||
if not sm.state.isLeader:
|
||||
return
|
||||
var newIndex = sm.commitIndex
|
||||
var nextIndex = sm.commitIndex + 1
|
||||
while nextIndex < sm.log.lastIndex:
|
||||
var replicationCnt = 1
|
||||
for p in sm.leader.tracker.progress:
|
||||
if p.matchIndex > newIndex:
|
||||
replicationCnt += 1
|
||||
sm.debug "replication count" & $replicationCnt
|
||||
if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1):
|
||||
sm.commitIndex = nextIndex;
|
||||
nextIndex += 1
|
||||
newIndex += 1
|
||||
else:
|
||||
break
|
||||
|
||||
func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
|
||||
# Should initiate replication if we have new entries
|
||||
if sm.state.isLeader:
|
||||
sm.replicate()
|
||||
sm.commit()
|
||||
sm.output.term = sm.term
|
||||
if sm.observedState.commitIndex < sm.commitIndex:
|
||||
for i in (sm.observedState.commitIndex + 1)..<(sm.commitIndex + 1):
|
||||
|
@ -383,23 +402,6 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
|
|||
sm.output = RaftStateMachineOutput()
|
||||
return output
|
||||
|
||||
func commit(sm: var RaftStateMachine) =
|
||||
if not sm.state.isLeader:
|
||||
return
|
||||
var newIndex = sm.commitIndex
|
||||
var nextIndex = sm.commitIndex + 1
|
||||
while nextIndex < sm.log.lastIndex:
|
||||
var replicationCnt = 0
|
||||
for p in sm.leader.tracker.progress:
|
||||
if p.matchIndex > newIndex:
|
||||
replicationCnt += 1
|
||||
if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1):
|
||||
sm.commitIndex = nextIndex;
|
||||
nextIndex += 1
|
||||
newIndex += 1
|
||||
else:
|
||||
break
|
||||
|
||||
func appendEntryReply*(sm: var RaftStateMachine, fromId: RaftNodeId, reply: RaftRpcAppendReply) =
|
||||
if not sm.state.isLeader:
|
||||
sm.debug "You can't append append reply to the follower"
|
||||
|
@ -415,7 +417,6 @@ func appendEntryReply*(sm: var RaftStateMachine, fromId: RaftNodeId, reply: Raft
|
|||
sm.debug "Accpeted" & $fromId & " " & $lastIndex
|
||||
follower.get().accepted(lastIndex)
|
||||
# TODO: add leader stepping down logic here
|
||||
sm.commit()
|
||||
if not sm.state.isLeader:
|
||||
return
|
||||
of RaftRpcCode.Rejected:
|
||||
|
|
|
@ -22,35 +22,43 @@ import unittest2
|
|||
type
|
||||
UserStateMachine = object
|
||||
|
||||
Message* = object
|
||||
fieldInt: int
|
||||
|
||||
Hash = int
|
||||
|
||||
UserState* = object
|
||||
lastCommitedMsg: Message
|
||||
|
||||
SignedLogEntry = object
|
||||
hash: Hash
|
||||
signature: SignedShare
|
||||
|
||||
BLSTestNode* = ref object
|
||||
stm: RaftStateMachine
|
||||
keyShare: SecretShare
|
||||
us: UserState
|
||||
blockCommunication: bool
|
||||
debugLogs: seq[DebugLogEntry]
|
||||
messageSignatures: Table[Hash, seq[SignedShare]]
|
||||
signEntries: seq[SignedLogEntry]
|
||||
clusterPublicKey: PublicKey
|
||||
|
||||
BLSTestCluster* = object
|
||||
nodes*: Table[RaftnodeId, BLSTestNode]
|
||||
|
||||
Message* = object
|
||||
fieldInt: int
|
||||
fieldBool: bool
|
||||
|
||||
SecretShare = object
|
||||
secret: SecretKey
|
||||
id: ID
|
||||
|
||||
SignsShare = object
|
||||
SignedShare = object
|
||||
sign: Signature
|
||||
pubkey: PublicKey
|
||||
id: ID
|
||||
|
||||
SignedRpcMessage* = object
|
||||
raftMsg: RaftRpcMessage
|
||||
signature: SignsShare
|
||||
signEntries: seq[SignedLogEntry]
|
||||
|
||||
var secretKey = "1b500388741efd98239a9b3a689721a89a92e8b209aabb10fb7dc3f844976dc2"
|
||||
|
||||
|
@ -64,6 +72,12 @@ var test_ids_1 = @[
|
|||
RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")),
|
||||
]
|
||||
|
||||
proc signs(shares: openArray[SignedShare]): seq[Signature] =
|
||||
shares.mapIt(it.sign)
|
||||
|
||||
proc ids(shares: openArray[SignedShare]): seq[ID] =
|
||||
shares.mapIt(it.id)
|
||||
|
||||
func createConfigFromIds*(ids: seq[RaftnodeId]): RaftConfig =
|
||||
var config = RaftConfig()
|
||||
for id in ids:
|
||||
|
@ -78,6 +92,9 @@ proc toCommand(msg: Message): Command =
|
|||
var msgJson = $(msg.toJson)
|
||||
return Command(data: msgJson.toBytes)
|
||||
|
||||
proc toMessage(cmd: Command): Message =
|
||||
return to(parseJson(cmd.data.toString), Message)
|
||||
|
||||
proc toBytes(msg: Message): seq[byte] =
|
||||
var msgJson = $(msg.toJson)
|
||||
return msgJson.toBytes
|
||||
|
@ -86,33 +103,72 @@ proc toBytes(msg: RaftRpcMessage): seq[byte] =
|
|||
var msgJson = $(msg.toJson)
|
||||
return msgJson.toBytes
|
||||
|
||||
proc cmpLogs*(x, y: DebugLogEntry): int =
|
||||
cmp(x.time, y.time)
|
||||
|
||||
func `$`*(de: DebugLogEntry): string =
|
||||
return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg
|
||||
|
||||
|
||||
proc pollMessages(node: BLSTestNode): seq[SignedRpcMessage] =
|
||||
var output = node.stm.poll()
|
||||
node.debugLogs.add(output.debugLogs)
|
||||
var debugLogs = output.debugLogs
|
||||
var msgs: seq[SignedRpcMessage]
|
||||
var pk: PublicKey
|
||||
discard pk.publicFromSecret(node.keyShare.secret)
|
||||
for msg in output.messages:
|
||||
msgs.add(SignedRpcMessage(
|
||||
raftMsg: msg,
|
||||
signature: SignsShare(
|
||||
sign: node.keyShare.secret.sign(msg.toBytes),
|
||||
pubkey: pk,
|
||||
id: node.keyShare.id,
|
||||
)
|
||||
))
|
||||
for msg in output.messages:
|
||||
if msg.kind == RaftRpcMessageType.AppendReply:
|
||||
msgs.add(SignedRpcMessage(
|
||||
raftMsg: msg,
|
||||
signEntries: node.signEntries
|
||||
))
|
||||
node.signEntries = @[]
|
||||
else:
|
||||
msgs.add(SignedRpcMessage(
|
||||
raftMsg: msg,
|
||||
signEntries: @[]
|
||||
))
|
||||
if node.stm.state.isLeader:
|
||||
for commitedMsg in output.committed:
|
||||
if commitedMsg.kind != rletCommand:
|
||||
continue
|
||||
var orgMsg = commitedMsg.command.toMessage
|
||||
var shares = node.messageSignatures[orgMsg.fieldInt]
|
||||
echo "Try to recover message" & $orgMsg.toBytes
|
||||
var recoveredSignature = recover(shares.signs, shares.ids).expect("valid shares")
|
||||
if not node.clusterPublicKey.verify(orgMsg.toBytes, recoveredSignature):
|
||||
node.us.lastCommitedMsg = orgMsg
|
||||
echo "State succesfuly changed"
|
||||
else:
|
||||
echo "Failed to reconstruct signature"
|
||||
|
||||
debugLogs.sort(cmpLogs)
|
||||
for msg in debugLogs:
|
||||
if msg.level <= DebugLogLevel.Debug:
|
||||
echo $msg
|
||||
return msgs
|
||||
|
||||
proc acceptMessage(node: BLSTestNode, msg: SignedRpcMessage, now: times.DateTime) =
|
||||
if msg.raftMsg.kind == RaftRpcMessageType.AppendReply and node.stm.state.isFollower:
|
||||
var pk: PublicKey
|
||||
discard pk.publicFromSecret(node.keyShare.secret)
|
||||
for entry in msg.raftMsg.appendRequest.entries:
|
||||
var orgMsg = entry.command.toMessage
|
||||
echo "Sign message" & $orgMsg.toBytes
|
||||
var share = SignedLogEntry(
|
||||
hash: orgMsg.fieldInt,
|
||||
signature: SignedShare(
|
||||
sign: node.keyShare.secret.sign(orgMsg.toBytes),
|
||||
pubkey: pk,
|
||||
id: node.keyShare.id,
|
||||
)
|
||||
)
|
||||
node.signEntries.add(share)
|
||||
node.stm.advance(msg.raftMsg, now)
|
||||
|
||||
proc tick(node: BLSTestNode, now: times.DateTime) =
|
||||
node.stm.tick(now)
|
||||
|
||||
proc fromCommand(cmd: Command): Message =
|
||||
return to(parseJson(cmd.data.toString), Message)
|
||||
|
||||
proc keyGen(seed: uint64): tuple[pubkey: PublicKey, seckey: SecretKey] =
|
||||
var ikm: array[32, byte]
|
||||
ikm[0 ..< 8] = seed.toBytesLE
|
||||
|
@ -145,7 +201,6 @@ proc createBLSCluster(ids: seq[RaftnodeId], now: times.DateTime) : BLSTestCluste
|
|||
|
||||
var blsShares = generateSecretShares(sk, 2, 3)
|
||||
|
||||
|
||||
var config = createConfigFromIds(ids)
|
||||
var cluster = BLSTestCluster()
|
||||
cluster.nodes = initTable[RaftnodeId, BLSTestNode]()
|
||||
|
@ -157,18 +212,13 @@ proc createBLSCluster(ids: seq[RaftnodeId], now: times.DateTime) : BLSTestCluste
|
|||
stm: initRaftStateMachine(id, 0, log, 0, config, now, initRand(i + 42)),
|
||||
keyShare: blsShares[i],
|
||||
blockCommunication: false,
|
||||
clusterPublicKey: pk,
|
||||
)
|
||||
|
||||
return cluster
|
||||
|
||||
proc cmpLogs*(x, y: DebugLogEntry): int =
|
||||
cmp(x.time, y.time)
|
||||
|
||||
func `$`*(de: DebugLogEntry): string =
|
||||
return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg
|
||||
|
||||
proc advance*(tc: var BLSTestCluster, now: times.DateTime, logLevel: DebugLogLevel = DebugLogLevel.Error) =
|
||||
var debugLogs : seq[DebugLogEntry]
|
||||
for id, node in tc.nodes:
|
||||
node.tick(now)
|
||||
var msgs = node.pollMessages()
|
||||
|
@ -186,18 +236,54 @@ func getLeader*(tc: BLSTestCluster): Option[BLSTestNode] =
|
|||
proc submitMessage(tc: var BLSTestCluster, msg: Message): bool =
|
||||
var leader = tc.getLeader()
|
||||
if leader.isSome():
|
||||
var pk: PublicKey
|
||||
discard pk.publicFromSecret(leader.get.keyShare.secret)
|
||||
echo "Leader Sign message" & $msg.toBytes
|
||||
var share = SignedShare(
|
||||
sign: leader.get.keyShare.secret.sign(msg.toBytes),
|
||||
pubkey: pk,
|
||||
id: leader.get.keyShare.id,
|
||||
)
|
||||
if not leader.get.messageSignatures.hasKey(msg.fieldInt):
|
||||
leader.get.messageSignatures[msg.fieldInt] = @[]
|
||||
leader.get.messageSignatures[msg.fieldInt].add(share)
|
||||
leader.get().stm.addEntry(msg.toCommand())
|
||||
|
||||
|
||||
proc blsconsensusMain*() =
|
||||
suite "BLS consensus tests":
|
||||
test "create single node cluster":
|
||||
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
|
||||
var cluster = createBLSCluster(test_ids_1, timeNow)
|
||||
# test "create single node cluster":
|
||||
# var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
|
||||
# var cluster = createBLSCluster(test_ids_1, timeNow)
|
||||
|
||||
timeNow += 300.milliseconds
|
||||
# timeNow += 300.milliseconds
|
||||
# cluster.advance(timeNow)
|
||||
# echo cluster.getLeader().get().stm.state
|
||||
# discard cluster.submitMessage(Message(fieldInt: 1))
|
||||
# discard cluster.submitMessage(Message(fieldInt: 2))
|
||||
# for i in 0..<305:
|
||||
# timeNow += 5.milliseconds
|
||||
# cluster.advance(timeNow)
|
||||
|
||||
# echo "Helloo" & $cluster.getLeader().get.us.lastCommitedMsg
|
||||
|
||||
test "create 3 node cluster":
|
||||
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
|
||||
var cluster = createBLSCluster(test_ids_3, timeNow)
|
||||
|
||||
#timeNow += 300.milliseconds
|
||||
cluster.advance(timeNow)
|
||||
echo cluster.getLeader().get().stm
|
||||
discard cluster.submitMessage(Message(fieldInt: 1, fieldBool: false))
|
||||
var added = false
|
||||
for i in 0..<305:
|
||||
cluster.advance(timeNow)
|
||||
if cluster.getLeader().isSome() and not added:
|
||||
discard cluster.submitMessage(Message(fieldInt: 1))
|
||||
added = true
|
||||
echo "Add to the entry log"
|
||||
timeNow += 5.milliseconds
|
||||
|
||||
#echo $cluster.nodes
|
||||
echo "Last state" & $cluster.getLeader().get.us.lastCommitedMsg
|
||||
|
||||
|
||||
if isMainModule:
|
||||
|
|
Loading…
Reference in New Issue