Dynamically add handler and use String for peerId

This commit is contained in:
nicksavers 2014-10-08 09:56:39 +02:00
parent 4093fc9e65
commit d2bc29f32e
12 changed files with 60 additions and 60 deletions

View File

@ -39,7 +39,7 @@ public class SystemProperties {
private static Boolean DEFAULT_BLOCKCHAIN_ONLY = false;
private static int DEFAULT_TRACE_STARTBLOCK = -1;
private static int DEFAULT_MAX_HASHES_ASK = -1; // unlimited
private static byte DEFAULT_MAX_BLOCKS_ASK = 10;
private static int DEFAULT_MAX_BLOCKS_ASK = 10;
private static int DEFAULT_MAX_BLOCKS_QUEUED = 300;
private static String DEFAULT_PROJECT_VERSION = "";
private static String DEFAULT_HELLO_PHRASE = "Dev";
@ -192,9 +192,9 @@ public class SystemProperties {
return Integer.parseInt(prop.getProperty("max.hashes.ask"));
}
public Byte maxBlocksAsk() {
public Integer maxBlocksAsk() {
if (prop.isEmpty()) return DEFAULT_MAX_BLOCKS_ASK;
return Byte.parseByte(prop.getProperty("max.blocks.ask"));
return Integer.parseInt(prop.getProperty("max.blocks.ask"));
}
public Integer maxBlocksQueued() {

View File

@ -1,5 +1,7 @@
package org.ethereum.core;
import static org.ethereum.config.SystemProperties.CONFIG;
import org.ethereum.config.SystemProperties;
import org.ethereum.manager.WorldManager;
import org.slf4j.Logger;
@ -20,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class BlockQueue {
private static Logger logger = LoggerFactory.getLogger("blockchain");
private static Logger logger = LoggerFactory.getLogger("blockqueue");
/** The list of hashes of the heaviest chain on the network,
* for which this client doesn't have the blocks yet */
@ -52,6 +54,7 @@ public class BlockQueue {
if (blockReceivedQueue.isEmpty())
return;
logger.debug("BlockQueue size: {}", blockReceivedQueue.size());
Block block = blockReceivedQueue.poll();
WorldManager.getInstance().getBlockchain().add(block);
}
@ -74,14 +77,14 @@ public class BlockQueue {
for (Block block : blockList) {
if (blockReceivedQueue.size() > SystemProperties.CONFIG.maxBlocksQueued())
if (blockReceivedQueue.size() <= SystemProperties.CONFIG.maxBlocksQueued())
return;
this.lastBlock = block;
logger.trace("Last block now index: [{}]", lastBlock.getNumber());
logger.debug("Last block now index: [{}]", lastBlock.getNumber());
blockReceivedQueue.add(lastBlock);
}
logger.trace("Blocks waiting to be proceed in the queue: [{}]",
logger.debug("Blocks waiting to be proceed in the queue: [{}]",
blockReceivedQueue.size());
}
@ -90,6 +93,8 @@ public class BlockQueue {
* this will return the last block added to the blockchain.
*
* @return The last known block this client on the network
* and will never return <code>null</code> as there is
* always the Genesis block at the start of the chain.
*/
public Block getLastBlock() {
if (blockReceivedQueue.isEmpty())
@ -128,12 +133,12 @@ public class BlockQueue {
* @param amount - the number of hashes to return
* @return A list of hashes for which blocks need to be retrieved.
*/
public List<byte[]> getHashes(int amount) {
public List<byte[]> getHashes() {
int amount = CONFIG.maxBlocksAsk();
List<byte[]> hashes = new ArrayList<>();
for (int i = 0; i < amount; i++) {
if (!blockHashQueue.isEmpty())
while (!blockHashQueue.isEmpty())
hashes.add(blockHashQueue.pollLast());
else break;
}
return hashes;
}

View File

@ -216,7 +216,8 @@ public class BlockchainImpl implements Blockchain {
if (logger.isDebugEnabled())
logger.debug("block added {}", block.toFlatString());
logger.info("*** Last block added [ #{} ]", block.getNumber());
if (block.getNumber() % 100 == 0)
logger.info("*** Last block added [ #{} ]", block.getNumber());
}
/**

View File

@ -3,8 +3,6 @@ package org.ethereum.net.client;
import org.ethereum.util.RLP;
import org.spongycastle.util.encoders.Hex;
import java.util.Arrays;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
@ -16,14 +14,14 @@ public class Peer {
private InetAddress address;
private int port;
private byte[] peerId;
private String peerId;
private List<String> capabilities;
private transient boolean isOnline = false;
private transient long lastCheckTime = 0;
public Peer(InetAddress ip, int port, byte[] peerId) {
public Peer(InetAddress ip, int port, String peerId) {
this.address = ip;
this.port = port;
this.peerId = peerId;
@ -39,7 +37,7 @@ public class Peer {
}
public String getPeerId() {
return peerId == null ? "" : Hex.toHexString(peerId);
return peerId == null ? "" : peerId;
}
public boolean isOnline() {
@ -67,7 +65,7 @@ public class Peer {
public byte[] getEncoded() {
byte[] ip = RLP.encodeElement(this.address.getAddress());
byte[] port = RLP.encodeInt(this.port);
byte[] peerId = RLP.encodeElement(this.peerId);
byte[] peerId = RLP.encodeElement(Hex.decode(this.peerId));
byte[][] encodedCaps = new byte[this.capabilities.size()][];
for (int i = 0; i < this.capabilities.size(); i++) {
encodedCaps[i] = RLP.encodeString(this.capabilities.get(i));
@ -87,7 +85,7 @@ public class Peer {
public boolean equals(Object obj) {
if (obj == null) return false;
Peer peerData = (Peer) obj;
return Arrays.equals(peerData.peerId, this.peerId)
return peerData.peerId.equals(this.peerId)
|| this.getAddress().equals(peerData.getAddress());
}

View File

@ -8,7 +8,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import org.ethereum.manager.WorldManager;
import org.ethereum.net.PeerListener;
import org.ethereum.net.handler.EthHandler;
import org.ethereum.net.handler.P2pHandler;
import org.ethereum.net.handler.PacketDecoder;
import org.ethereum.net.handler.PacketEncoder;
@ -30,7 +29,6 @@ public class PeerClient {
private PeerListener peerListener;
private P2pHandler p2pHandler;
private EthHandler ethHandler;
public PeerClient() {
}
@ -47,7 +45,6 @@ public class PeerClient {
peerListener.console("Connecting to: " + host + ":" + port);
p2pHandler = new P2pHandler(peerListener);
ethHandler = new EthHandler(peerListener);
try {
Bootstrap b = new Bootstrap();
@ -66,8 +63,7 @@ public class PeerClient {
new ReadTimeoutHandler(CONFIG.peerChannelReadTimeout(), TimeUnit.SECONDS));
ch.pipeline().addLast(new PacketEncoder());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast("p2p", p2pHandler);
ch.pipeline().addLast("eth", ethHandler);
ch.pipeline().addLast(p2pHandler);
// limit the size of receiving buffer to 1024
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368));

View File

@ -58,10 +58,11 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
private int secToAskForBlocks = 1;
private String peerId;
private Blockchain blockchain;
private PeerListener peerListener;
private static boolean hashRetrievalLocked = false;
private static String hashRetrievalLock;
private MessageQueue msgQueue = null;
private Timer getBlocksTimer = new Timer("GetBlocksTimer");
@ -72,8 +73,9 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
this.blockchain = WorldManager.getInstance().getBlockchain();
}
public EthHandler(PeerListener peerListener) {
public EthHandler(String peerId, PeerListener peerListener) {
this();
this.peerId = peerId;
this.peerListener = peerListener;
}
@ -159,17 +161,16 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_NETWORK));
else {
BlockQueue chainQueue = this.blockchain.getQueue();
if(!hashRetrievalLocked) {
BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty());
BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty();
if (highestKnownTotalDifficulty == null
|| peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) {
chainQueue.setHighestTotalDifficulty(peerTotalDifficulty);
chainQueue.setBestHash(msg.getBestHash());
}
hashRetrievalLocked = true;
BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty());
BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty();
if (highestKnownTotalDifficulty == null
|| peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) {
hashRetrievalLock = this.peerId;
chainQueue.setHighestTotalDifficulty(peerTotalDifficulty);
chainQueue.setBestHash(msg.getBestHash());
sendGetBlockHashes();
}
} else
startGetBlockTimer();
}
}
@ -178,7 +179,9 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
BlockQueue chainQueue = this.blockchain.getQueue();
// result is empty, peer has no more hashes
if (receivedHashes.isEmpty()) {
// or peer doesn't have the best hash anymore
if (receivedHashes.isEmpty()
|| !this.peerId.equals(hashRetrievalLock)) {
startGetBlockTimer(); // start getting blocks from hash queue
return;
}
@ -250,13 +253,9 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
private void sendGetBlocks() {
BlockQueue queue = this.blockchain.getQueue();
if (queue.size() > CONFIG.maxBlocksQueued()) return;
Block lastBlock = queue.getLastBlock();
if (lastBlock == null) return;
// retrieve list of block hashes from queue
int blocksPerPeer = CONFIG.maxBlocksAsk();
List<byte[]> hashes = queue.getHashes(blocksPerPeer);
List<byte[]> hashes = queue.getHashes();
GetBlocksMessage msg = new GetBlocksMessage(hashes);
msgQueue.sendMessage(msg);

View File

@ -130,13 +130,13 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
else {
if (msg.getCapabilities().contains("eth")) {
// Activate EthHandler for this peer
ctx.pipeline().addLast(new EthHandler(msg.getPeerId(), peerListener));
ctx.fireChannelActive();
}
InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
int port = msg.getListenPort();
byte[] peerId = msg.getPeerId();
Peer confirmedPeer = new Peer(address, port, peerId);
Peer confirmedPeer = new Peer(address, port, msg.getPeerId());
confirmedPeer.setOnline(true);
confirmedPeer.getCapabilities().addAll(msg.getCapabilities());
WorldManager.getInstance().getPeerDiscovery().getPeers().add(confirmedPeer);

View File

@ -31,14 +31,14 @@ public class HelloMessage extends P2pMessage {
/** The port on which the peer is listening for an incoming connection */
private int listenPort;
/** The identity and public key of the peer */
private byte[] peerId;
private String peerId;
public HelloMessage(byte[] encoded) {
super(encoded);
}
public HelloMessage(byte p2pVersion, String clientId,
List<String> capabilities, int listenPort, byte[] peerId) {
List<String> capabilities, int listenPort, String peerId) {
this.p2pVersion = p2pVersion;
this.clientId = clientId;
this.capabilities = capabilities;
@ -69,8 +69,8 @@ public class HelloMessage extends P2pMessage {
byte[] peerPortBytes = ((RLPItem) paramsList.get(4)).getRLPData();
this.listenPort = ByteUtil.byteArrayToInt(peerPortBytes);
this.peerId = ((RLPItem) paramsList.get(5)).getRLPData();
byte[] peerIdBytes = ((RLPItem) paramsList.get(5)).getRLPData();
this.peerId = Hex.toHexString(peerIdBytes);
this.parsed = true;
}
@ -84,7 +84,7 @@ public class HelloMessage extends P2pMessage {
}
byte[] capabilityList = RLP.encodeList(capabilities);
byte[] peerPort = RLP.encodeInt(this.listenPort);
byte[] peerId = RLP.encodeElement(this.peerId);
byte[] peerId = RLP.encodeElement(Hex.decode(this.peerId));
this.encoded = RLP.encodeList(command, p2pVersion, clientId,
capabilityList, peerPort, peerId);
@ -121,7 +121,7 @@ public class HelloMessage extends P2pMessage {
return listenPort;
}
public byte[] getPeerId() {
public String getPeerId() {
if (!parsed) parse();
return peerId;
}
@ -137,6 +137,6 @@ public class HelloMessage extends P2pMessage {
+ this.p2pVersion + " clientId=" + this.clientId
+ " capabilities=[" + Joiner.on(" ").join(this.capabilities)
+ "]" + " peerPort=" + this.listenPort + " peerId="
+ Hex.toHexString(this.peerId) + "]";
+ this.peerId + "]";
}
}

View File

@ -14,6 +14,7 @@ import org.ethereum.util.ByteUtil;
import org.ethereum.util.RLP;
import org.ethereum.util.RLPItem;
import org.ethereum.util.RLPList;
import org.spongycastle.util.encoders.Hex;
/**
* Wrapper around an Ethereum Peers message on the network
@ -49,7 +50,7 @@ public class PeersMessage extends P2pMessage {
try {
int peerPort = ByteUtil.byteArrayToInt(portBytes);
InetAddress address = InetAddress.getByAddress(ipBytes);
Peer peer = new Peer(address, peerPort, peerId);
Peer peer = new Peer(address, peerPort, Hex.toHexString(peerId));
peers.add(peer);
} catch (UnknownHostException e) {
throw new RuntimeException("Malformed ip", e);

View File

@ -33,7 +33,7 @@ public class StaticMessages {
int listenPort = 30303;
return new HelloMessage(p2pVersion, helloAnnouncement,
capabilities, listenPort, Hex.decode(PEER_ID));
capabilities, listenPort, PEER_ID);
}
private static String buildHelloAnnouncement() {

View File

@ -38,7 +38,7 @@ public class HelloMessageTest {
assertEquals(30303, helloMessage.getListenPort());
assertEquals(
"2017B95D5586ADD053E7C5DCC8112DB1D557ED83589C4E0BD25442F39E4111655A487257AA7E4ED309E8B4D55BE5FA8D8D6E97B72C67D76AA03EB69AD981ED60",
Hex.toHexString(helloMessage.getPeerId()).toUpperCase());
helloMessage.getPeerId().toUpperCase());
}
@Test /* HelloMessage 2 from Node */
@ -63,7 +63,7 @@ public class HelloMessageTest {
assertEquals("eth", helloMessage.getCapabilities().get(0));
assertEquals(30303, helloMessage.getListenPort());
assertEquals("cadfb93d2bb5fbe2943584d93ed90e374667c9e8b2502e974693ccc6b3d370bd4cde7738d0b626e3d2f3caecc59e1302d1711bf595711060d7b4921e18b97656",
Hex.toHexString(helloMessage.getPeerId()));
helloMessage.getPeerId());
}
@Test /* HelloMessage 3 from new */
@ -72,12 +72,12 @@ public class HelloMessageTest {
byte p2pVersion = 0x00;
List<String> capabilities = new ArrayList<>(Arrays.asList("eth", "shh"));
int listenPort = 30303;
byte[] peerIdBytes = Hex.decode("CAB0D93EEE1F44EF1286367101F1553450E3DDCE"
String peerId = "CAB0D93EEE1F44EF1286367101F1553450E3DDCE"
+ "EA45ABCAB0AC21E1EFB48A6610EBE88CE7317EB09229558311BA8B7250911D"
+ "7E49562C3988CA3143329DA3EA");
+ "7E49562C3988CA3143329DA3EA";
HelloMessage helloMessage = new HelloMessage(p2pVersion, helloAnnouncement,
capabilities, listenPort, peerIdBytes);
capabilities, listenPort, peerId);
System.out.println(helloMessage);
// rlp encoded hello message
String expected = "F8738080A2457468657265756D284A292F302E362E302F6465762F5"

View File

@ -64,8 +64,8 @@ public class PeersMessageTest {
@Test /* PeersMessage 2 from constructor */
public void testPeers_2() throws UnknownHostException {
Set<Peer> peers = new HashSet<>();
peers.add(new Peer(InetAddress.getByName("82.217.72.169"), 30303, Hex.decode("585764a3c49a3838c69ad0855abfeb5672f71b072af62082b5679961781100814b8de88a8fbc1da7c73791f88159d73b5d2a13a5579535d603e045c3db5cbb75")));
peers.add(new Peer(InetAddress.getByName("192.168.1.193"), 30303, new byte[0]));
peers.add(new Peer(InetAddress.getByName("82.217.72.169"), 30303, "585764a3c49a3838c69ad0855abfeb5672f71b072af62082b5679961781100814b8de88a8fbc1da7c73791f88159d73b5d2a13a5579535d603e045c3db5cbb75"));
peers.add(new Peer(InetAddress.getByName("192.168.1.193"), 30303, ""));
PeersMessage peersMessage = new PeersMessage(peers);
System.out.println(peersMessage.toString());