ServerPeer first draft introduced, difficulty calculation bugs.

This commit is contained in:
romanman 2014-11-01 20:49:41 -05:00
parent e436879e3f
commit 04c7a5c032
16 changed files with 322 additions and 26 deletions

View File

@ -4,7 +4,7 @@
<groupId>org.ethereum</groupId> <groupId>org.ethereum</groupId>
<artifactId>ethereumj</artifactId> <artifactId>ethereumj</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<version>0.7.2</version> <version>0.7.6</version>
<name>EthereumJ</name> <name>EthereumJ</name>
<url>http://www.ethereumj.org</url> <url>http://www.ethereumj.org</url>
@ -171,6 +171,13 @@
<version>2.2.0</version> <version>2.2.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.0</version>
</dependency>
</dependencies> </dependencies>

View File

@ -0,0 +1,28 @@
import org.ethereum.facade.Ethereum;
import org.ethereum.facade.EthereumImpl;
import org.ethereum.listener.EthereumListenerAdapter;
import org.ethereum.net.server.PeerServer;
/**
*
* @author: Roman Mandeleil
* Created on: 14/10/2014 14:39
*/
public class MainServer extends EthereumListenerAdapter{
Ethereum eth;
public MainServer(Ethereum eth) {
this.eth = eth;
}
public static void main(String[] args) {
Ethereum eth = new EthereumImpl();
PeerServer server = new PeerServer();
server.start(30303);
}
}

View File

@ -170,7 +170,7 @@ public class Block {
if (!parsed) parseRLP(); if (!parsed) parseRLP();
BigInteger calcDifficulty = new BigInteger(1, this.header.getDifficulty()); BigInteger calcDifficulty = new BigInteger(1, this.header.getDifficulty());
for (BlockHeader uncle : uncleList) { for (BlockHeader uncle : uncleList) {
calcDifficulty.add(new BigInteger(1, uncle.getDifficulty())); calcDifficulty = calcDifficulty.add(new BigInteger(1, uncle.getDifficulty()));
} }
return calcDifficulty; return calcDifficulty;
} }

View File

@ -1,5 +1,8 @@
package org.ethereum.core; package org.ethereum.core;
import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualTreeBidiMap;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.facade.Blockchain; import org.ethereum.facade.Blockchain;
import org.ethereum.facade.Repository; import org.ethereum.facade.Repository;
import org.ethereum.listener.EthereumListener; import org.ethereum.listener.EthereumListener;
@ -61,7 +64,7 @@ public class BlockchainImpl implements Blockchain {
// keep the index of the chain for // keep the index of the chain for
// convenient usage, <block_number, block_hash> // convenient usage, <block_number, block_hash>
private final Map<Long, byte[]> blockCache = new HashMap<>(); private final BidiMap<Long, ByteArrayWrapper> blockCache = new DualTreeBidiMap<>();
private final BlockQueue blockQueue = new BlockQueue(); private final BlockQueue blockQueue = new BlockQueue();
private boolean syncDoneCalled = false; private boolean syncDoneCalled = false;
@ -94,6 +97,36 @@ public class BlockchainImpl implements Blockchain {
return repository.getBlock(blockNr); return repository.getBlock(blockNr);
} }
@Override
public Block getBlockByHash(byte[] hash){
Long index = blockCache.inverseBidiMap().get(new ByteArrayWrapper(hash));
if (index == null) return null; // don't have such hash
return repository.getBlock(index);
}
@Override
public List<byte[]> getListOfHashesStartFrom(byte[] hash, int qty){
Long startIndex = blockCache.inverseBidiMap().get(new ByteArrayWrapper( hash ));
if (startIndex == null) return null; // strange but no such hashes in our chain
--startIndex;
Long endIndex = startIndex - qty;
if (endIndex < 0) endIndex = 0L;
Vector<byte[]> result = new Vector<>();
for (Long i = startIndex; i >= endIndex; --i){
ByteArrayWrapper baw = blockCache.get(i);
result.add(baw.getData());
}
return result;
}
@Override @Override
public void add(Block block) { public void add(Block block) {
@ -222,7 +255,7 @@ public class BlockchainImpl implements Blockchain {
} }
this.repository.saveBlock(block); this.repository.saveBlock(block);
this.blockCache.put(block.getNumber(), block.getHash()); this.blockCache.put(block.getNumber(), new ByteArrayWrapper(block.getHash()));
this.setLastBlock(block); this.setLastBlock(block);
if (logger.isDebugEnabled()) if (logger.isDebugEnabled())
@ -460,7 +493,7 @@ public class BlockchainImpl implements Blockchain {
} }
@Override @Override
public Map<Long, byte[]> getBlockCache() { public Map<Long, ByteArrayWrapper> getBlockCache() {
return this.blockCache; return this.blockCache;
} }
@ -489,6 +522,6 @@ public class BlockchainImpl implements Blockchain {
if (this.totalDifficulty == null) if (this.totalDifficulty == null)
this.totalDifficulty = block.getCumulativeDifficulty(); this.totalDifficulty = block.getCumulativeDifficulty();
else else
this.totalDifficulty.add(block.getCumulativeDifficulty()); this.totalDifficulty = totalDifficulty.add(block.getCumulativeDifficulty());
} }
} }

View File

@ -153,7 +153,7 @@ public class RepositoryImpl implements Repository {
for (iterator.seekToFirst(); iterator.hasNext();) { for (iterator.seekToFirst(); iterator.hasNext();) {
Block block = new Block(iterator.next().getValue()); Block block = new Block(iterator.next().getValue());
blockchain.getBlockCache().put(block.getNumber(), block.getHash()); blockchain.getBlockCache().put(block.getNumber(), new ByteArrayWrapper(block.getHash()));
blockchain.setLastBlock(block); blockchain.setLastBlock(block);
blockchain.updateTotalDifficulty(block); blockchain.updateTotalDifficulty(block);
EthereumListener listener = WorldManager.getInstance().getListener(); EthereumListener listener = WorldManager.getInstance().getListener();

View File

@ -1,9 +1,11 @@
package org.ethereum.facade; package org.ethereum.facade;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.ethereum.core.Block; import org.ethereum.core.Block;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.net.BlockQueue; import org.ethereum.net.BlockQueue;
import org.ethereum.core.Genesis; import org.ethereum.core.Genesis;
@ -14,7 +16,7 @@ public interface Blockchain {
public int getSize(); public int getSize();
public void add(Block block); public void add(Block block);
public void storeBlock(Block block); public void storeBlock(Block block);
public Map<Long, byte[]> getBlockCache(); public Map<Long, ByteArrayWrapper> getBlockCache();
public Block getBlockByNumber(long blockNr); public Block getBlockByNumber(long blockNr);
public long getGasPrice(); public long getGasPrice();
public void setLastBlock(Block block); public void setLastBlock(Block block);
@ -24,4 +26,7 @@ public interface Blockchain {
public void updateTotalDifficulty(Block block); public void updateTotalDifficulty(Block block);
public BigInteger getTotalDifficulty(); public BigInteger getTotalDifficulty();
public byte[] getLatestBlockHash(); public byte[] getLatestBlockHash();
public List<byte[]> getListOfHashesStartFrom(byte[] hash, int qty);
public Block getBlockByHash(byte[] hash);
} }

View File

@ -6,6 +6,7 @@ import org.ethereum.core.Block;
import org.ethereum.manager.WorldManager; import org.ethereum.manager.WorldManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.*; import java.util.*;
@ -74,7 +75,8 @@ public class BlockQueue {
Block lastReceivedBlock = blockList.get(0); Block lastReceivedBlock = blockList.get(0);
if (lastReceivedBlock.getNumber() != getLastBlock().getNumber() + 1){ if (lastReceivedBlock.getNumber() != getLastBlock().getNumber() + 1){
logger.error("Block download out of sync"); logger.error("Block download out of sync: lastBlock.index: [{}], receivedBlock.index: [{}]",
getLastBlock().getNumber(), lastReceivedBlock.getNumber());
return; return;
} }
@ -94,7 +96,8 @@ public class BlockQueue {
public void addBlock(Block block){ public void addBlock(Block block){
if (block.getNumber() != getLastBlock().getNumber() + 1){ if (block.getNumber() != getLastBlock().getNumber() + 1){
logger.error("Block download out of sync"); logger.error("Block download out of sync: lastBlock.index: [{}], receivedBlock.index: [{}]",
getLastBlock().getNumber(), block.getNumber());
return; return;
} }
@ -143,6 +146,10 @@ public class BlockQueue {
public void addHash(byte[] hash) { public void addHash(byte[] hash) {
blockHashQueue.addLast(hash); blockHashQueue.addLast(hash);
if (logger.isTraceEnabled()){
logger.trace("Adding hash to a hashQueue: [{}]" , Hex.toHexString(hash));
}
} }
public void addNewBlockHash(byte[] hash){ public void addNewBlockHash(byte[] hash){

View File

@ -28,7 +28,7 @@ import static org.ethereum.config.SystemProperties.CONFIG;
*/ */
public class PeerClient { public class PeerClient {
private static final Logger logger = LoggerFactory.getLogger("wire"); private static final Logger logger = LoggerFactory.getLogger("net");
private PeerListener peerListener; private PeerListener peerListener;
private P2pHandler p2pHandler; private P2pHandler p2pHandler;

View File

@ -2,12 +2,14 @@ package org.ethereum.net.eth;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Vector;
import org.ethereum.core.Block; import org.ethereum.core.Block;
import org.ethereum.net.eth.EthMessage;
import org.ethereum.util.RLP; import org.ethereum.util.RLP;
import org.ethereum.util.RLPList; import org.ethereum.util.RLPList;
import static org.ethereum.net.eth.EthMessageCodes.BLOCKS;
/** /**
* Wrapper around an Ethereum Blocks message on the network * Wrapper around an Ethereum Blocks message on the network
* *
@ -21,6 +23,11 @@ public class BlocksMessage extends EthMessage {
super(encoded); super(encoded);
} }
public BlocksMessage(List<Block> blocks){
this.blocks = blocks;
parsed = true;
}
private void parse() { private void parse() {
RLPList paramsList = (RLPList) RLP.decode2(encoded).get(0); RLPList paramsList = (RLPList) RLP.decode2(encoded).get(0);
@ -33,8 +40,25 @@ public class BlocksMessage extends EthMessage {
parsed = true; parsed = true;
} }
private void encode() {
List<byte[]> encodedElements = new Vector<>();
encodedElements.add(RLP.encodeByte(BLOCKS.asByte()));
for (Block block : blocks){
encodedElements.add(block.getEncoded());
}
byte[][] encodedElementArray = encodedElements
.toArray(new byte[encodedElements.size()][]);
this.encoded = RLP.encodeList(encodedElementArray);
}
@Override @Override
public byte[] getEncoded() { public byte[] getEncoded() {
if (encoded == null) encode();
return encoded; return encoded;
} }

View File

@ -103,7 +103,7 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
break; break;
case GET_BLOCK_HASHES: case GET_BLOCK_HASHES:
msgQueue.receivedMessage(msg); msgQueue.receivedMessage(msg);
// sendBlockHashes(); processGetBlockHashes((GetBlockHashesMessage) msg);
break; break;
case BLOCK_HASHES: case BLOCK_HASHES:
msgQueue.receivedMessage(msg); msgQueue.receivedMessage(msg);
@ -111,7 +111,7 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
break; break;
case GET_BLOCKS: case GET_BLOCKS:
msgQueue.receivedMessage(msg); msgQueue.receivedMessage(msg);
// sendBlocks(); processGetBlocks( (GetBlocksMessage) msg );
break; break;
case BLOCKS: case BLOCKS:
msgQueue.receivedMessage(msg); msgQueue.receivedMessage(msg);
@ -182,7 +182,7 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
else { else {
BlockQueue chainQueue = blockchain.getQueue(); BlockQueue chainQueue = blockchain.getQueue();
BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty()); BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty());
BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty(); BigInteger highestKnownTotalDifficulty = blockchain.getTotalDifficulty();
if (highestKnownTotalDifficulty == null if (highestKnownTotalDifficulty == null
|| peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) { || peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) {
hashRetrievalLock = this.peerId; hashRetrievalLock = this.peerId;
@ -190,8 +190,10 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
chainQueue.setBestHash(msg.getBestHash()); chainQueue.setBestHash(msg.getBestHash());
syncStatus = SyncSatus.HASH_RETRIEVING; syncStatus = SyncSatus.HASH_RETRIEVING;
sendGetBlockHashes(); sendGetBlockHashes();
} else } else{
startGetBlockTimer(); logger.info(" *** The chain sync process fully complete ***");
syncStatus = SyncSatus.SYNC_DONE;
}
} }
} }
@ -217,6 +219,8 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
chainQueue.addHash(foundHash); // store unknown hashes in queue until known hash is found chainQueue.addHash(foundHash); // store unknown hashes in queue until known hash is found
} }
else { else {
logger.trace("Catch up with the hashes until: {[]}", foundHash);
// if known hash is found, ignore the rest // if known hash is found, ignore the rest
startGetBlockTimer(); // start getting blocks from hash queue startGetBlockTimer(); // start getting blocks from hash queue
return; return;
@ -333,14 +337,31 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
msgQueue.sendMessage(msg); msgQueue.sendMessage(msg);
} }
private void sendBlocks() { private void processGetBlockHashes(GetBlockHashesMessage msg) {
// TODO: Send blocks
Blockchain blockchain = WorldManager.getInstance().getBlockchain();
List<byte[]> hashes = blockchain.getListOfHashesStartFrom(msg.getBestHash(), msg.getMaxBlocks());
BlockHashesMessage msgHashes = new BlockHashesMessage(hashes);
msgQueue.sendMessage(msgHashes);
} }
private void sendBlockHashes() { private void processGetBlocks(GetBlocksMessage msg) {
// TODO: Send block hashes
List<byte[]> hashes = msg.getBlockHashes();
Blockchain blockchain = WorldManager.getInstance().getBlockchain();
Vector<Block> blocks = new Vector<>();
for (byte[] hash : hashes){
Block block = blockchain.getBlockByHash(hash);
blocks.add(block);
}
BlocksMessage bm = new BlocksMessage(blocks);
msgQueue.sendMessage(bm);
} }
private void startTxTimer() { private void startTxTimer() {
getTxTimer.scheduleAtFixedRate(new TimerTask() { getTxTimer.scheduleAtFixedRate(new TimerTask() {
public void run() { public void run() {

View File

@ -140,6 +140,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
logger.error(cause.getCause().toString()); logger.error(cause.getCause().toString());
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
ctx.close(); ctx.close();
killTimers();
} }
private void processPeers(ChannelHandlerContext ctx, PeersMessage peersMessage) { private void processPeers(ChannelHandlerContext ctx, PeersMessage peersMessage) {
@ -273,4 +274,6 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
} }
} }

View File

@ -0,0 +1,11 @@
package org.ethereum.net.server;
/**
* www.etherj.com
*
* @author: Roman Mandeleil
* Created on: 01/11/2014 17:01
*/
public class Channel {
}

View File

@ -0,0 +1,70 @@
package org.ethereum.net.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.ethereum.net.MessageQueue;
import org.ethereum.net.client.Capability;
import org.ethereum.net.eth.EthHandler;
import org.ethereum.net.p2p.P2pHandler;
import org.ethereum.net.shh.ShhHandler;
import org.ethereum.net.wire.MessageDecoder;
import org.ethereum.net.wire.MessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import static org.ethereum.config.SystemProperties.CONFIG;
/**
* www.etherj.com
*
* @author: Roman Mandeleil
* Created on: 01/11/2014 10:58
*/
public class EthereumChannelInitializer extends ChannelInitializer<NioSocketChannel> {
private static final Logger logger = LoggerFactory.getLogger("net");
public void initChannel(NioSocketChannel ch) throws Exception {
MessageQueue msgQueue;
P2pHandler p2pHandler;
EthHandler ethHandler;
ShhHandler shhHandler;
msgQueue = new MessageQueue(null);
logger.info("Incoming connection from: {}", ch.toString());
ch.remoteAddress();
p2pHandler = new P2pHandler(msgQueue, null, false);
p2pHandler.activate();
ethHandler = new EthHandler(msgQueue, null, false);
shhHandler = new ShhHandler(msgQueue, null);
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(CONFIG.peerChannelReadTimeout(), TimeUnit.SECONDS));
ch.pipeline().addLast("out encoder", new MessageEncoder());
ch.pipeline().addLast("in encoder", new MessageDecoder());
ch.pipeline().addLast(Capability.P2P, p2pHandler);
ch.pipeline().addLast(Capability.ETH, ethHandler);
ch.pipeline().addLast(Capability.SHH, shhHandler);
// limit the size of receiving buffer to 1024
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368));
ch.config().setOption(ChannelOption.SO_RCVBUF, 32368);
// todo: check if have or not active peer if not set this one
}
}

View File

@ -0,0 +1,87 @@
package org.ethereum.net.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultMessageSizeEstimator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.ethereum.net.PeerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.ethereum.config.SystemProperties.CONFIG;
/**
* This class establish a listener for incoming connections
* @see <a href="http://netty.io">http://netty.io</a>
*
* www.etherj.com
*
* @author: Roman Mandeleil
* Created on: 01/11/2014 10:11
*/
public class PeerServer {
private static final Logger logger = LoggerFactory.getLogger("net");
private PeerListener peerListener;
private boolean peerDiscoveryMode = false;
public PeerServer() {
}
public PeerServer(PeerListener peerListener) {
this();
this.peerListener = peerListener;
}
public void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
if (peerListener != null)
peerListener.console("Listening on port " + port);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, DefaultMessageSizeEstimator.DEFAULT);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONFIG.peerConnectionTimeout());
b.handler(new LoggingHandler());
b.childHandler(new EthereumChannelInitializer());
// Start the client.
logger.info("Listening for incoming connections, port: [{}] ", port);
ChannelFuture f = b.bind(port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
logger.debug("Connection is closed");
} catch (Exception e) {
logger.debug("Exception: {} ({})", e.getMessage(), e.getClass().getName());
throw new Error("Disconnnected");
} finally {
workerGroup.shutdownGracefully();
}
}
public void setPeerListener(PeerListener peerListener) {
this.peerListener = peerListener;
}
}

View File

@ -25,14 +25,14 @@ log4j.logger.block = ERROR
log4j.logger.blockqueue = TRACE log4j.logger.blockqueue = TRACE
log4j.logger.wallet = ERROR log4j.logger.wallet = ERROR
log4j.logger.general = DEBUG log4j.logger.general = DEBUG
log4j.logger.net = ERROR log4j.logger.net = DEBUG
log4j.logger.db = ERROR log4j.logger.db = ERROR
log4j.logger.peerdiscovery = ERROR log4j.logger.peerdiscovery = ERROR
log4j.logger.peermonitor = ERROR log4j.logger.peermonitor = ERROR
log4j.logger.java.nio = ERROR log4j.logger.java.nio = ERROR
log4j.logger.io.netty = ERROR log4j.logger.io.netty = ERROR
log4j.logger.wire = ERROR log4j.logger.wire = ERROR
log4j.logger.VM = ERROR log4j.logger.VM = TRACE
log4j.logger.main = ERROR log4j.logger.main = ERROR
log4j.logger.trie = ERROR log4j.logger.trie = ERROR
log4j.logger.state = INFO log4j.logger.state = INFO

View File

@ -72,7 +72,7 @@ samples.dir = samples
# the existing database will be # the existing database will be
# destroyed and all the data will be # destroyed and all the data will be
# downloaded from peers again # downloaded from peers again
database.reset = false database.reset = true
# place to save physical storage files # place to save physical storage files
database.dir = database database.dir = database
@ -146,7 +146,7 @@ max.blocks.ask = 100
max.blocks.queued = 300 max.blocks.queued = 300
# project version auto copied during build phase # project version auto copied during build phase
project.version = 0.7.1 project.version = 0.7.6
# hello phrase will be included in # hello phrase will be included in
# the hello message of the peer # the hello message of the peer