Peer discovery full support:

1. Thread pool that manage number of workers to get
   the info if a peer is a live.
2. New peers discovered are auto backed by the workers
   to auto check them as well.
3. GUI got auto update from the repository
This commit is contained in:
romanman 2014-05-22 14:43:59 +03:00
parent 0a4272fc49
commit b20b95e4ea
14 changed files with 338 additions and 130 deletions

View File

@ -44,7 +44,6 @@ public class Wallet {
AddressState addressState = new AddressState();
String address = Hex.toHexString(addressState.getEcKey().getAddress());
rows.put(address, addressState);
for (WalletListener listener : listeners) listener.valueChanged();
}
@ -53,7 +52,6 @@ public class Wallet {
AddressState addressState = new AddressState(ECKey.fromPrivate(privKey));
String address = Hex.toHexString(addressState.getEcKey().getAddress());
rows.put(address, addressState);
notifyListeners();
}

View File

@ -69,15 +69,14 @@ public class ConnectionConsoleWindow extends JFrame implements PeerListener{
Thread t = new Thread() {
public void run() {
new ClientPeer(thisConsole).connect("54.201.28.117", 30303); // peer discovery
// new ClientPeer(thisConsole).connect("54.201.28.117", 30303); // peer discovery
// new ClientPeer(thisConsole).connect("82.217.72.169", 30303); // Nick
// new ClientPeer(thisConsole).connect("54.204.10.41", 30303);
// new ClientPeer(thisConsole).connect("54.211.14.10", 30303);
// new ClientPeer(thisConsole).connect("54.204.10.41", 30303); // CPP: ZeroGox Poc5
// new ClientPeer(thisConsole).connect("54.211.14.10", 40404); // CPP: ver16
new ClientPeer(thisConsole).connect("54.211.14.10", 40404); // RomanJ
// new ClientPeer(thisConsole).connect("192.168.1.102", 30303);
}

View File

@ -3,14 +3,14 @@ package org.ethereum.gui;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.*;
import javax.swing.ImageIcon;
import javax.swing.table.AbstractTableModel;
import org.ethereum.geodb.IpGeoDB;
import org.ethereum.manager.MainData;
import org.ethereum.net.client.PeerData;
import org.ethereum.util.Utils;
import com.maxmind.geoip.Location;
@ -22,12 +22,16 @@ import com.maxmind.geoip.Location;
*/
public class PeersTableModel extends AbstractTableModel {
private static final long serialVersionUID = -6984988938009834569L;
private List<PeerInfo> peerInfoList = new ArrayList<PeerInfo>();
Timer updater = new Timer();
public PeersTableModel() {
generateRandomData();
updater.scheduleAtFixedRate(new TimerTask() {
public void run() {
updateModel();
}
}, 0, 1000);
}
public String getColumnName(int column) {
@ -81,38 +85,14 @@ public class PeersTableModel extends AbstractTableModel {
return 3;
}
// todo: delete it when stabilized
private void generateRandomData(){
public void updateModel(){
synchronized (peerInfoList){
List<String> ips = new ArrayList<String>();
ips.add("206.223.168.190");
ips.add("94.210.200.192");
ips.add("88.69.198.198");
ips.add("62.78.198.208");
ips.add("71.202.162.40");
ips.add("78.55.236.218");
ips.add("94.197.120.80");
ips.add("85.65.126.45");
ips.add("110.77.217.185");
ips.add("64.231.9.30");
ips.add("162.243.203.121");
ips.add("82.217.72.169");
ips.add("99.231.80.166");
ips.add("131.104.252.4");
ips.add("54.204.10.41");
ips.add("54.201.28.117");
ips.add("82.240.16.5");
ips.add("74.79.23.119");
for (String peer : ips){
try {
InetAddress addr = InetAddress.getByName(peer);
peerInfoList.clear();
for (PeerData peer : MainData.instance.getPeers()){
InetAddress addr = peer.getInetAddress();
Location cr = IpGeoDB.getLocationForIp(addr);
peerInfoList.add(new PeerInfo(cr, addr));
} catch (UnknownHostException e) {
e.printStackTrace();
peerInfoList.add(new PeerInfo(cr, addr, peer.isOnline()));
}
}
}
@ -123,12 +103,10 @@ public class PeersTableModel extends AbstractTableModel {
InetAddress ip;
boolean connected;
private PeerInfo(Location location, InetAddress ip) {
private PeerInfo(Location location, InetAddress ip, boolean isConnected) {
this.location = location;
this.ip = ip;
Random random = new Random();
connected = random.nextBoolean();
this.connected = isConnected;
}
private InetAddress getIp() {

View File

@ -1,10 +1,8 @@
package org.ethereum.gui;
import java.awt.BorderLayout;
import java.awt.Color;
import java.awt.Font;
import java.awt.Image;
import java.awt.Toolkit;
import java.awt.*;
import java.util.Timer;
import java.util.TimerTask;
import javax.swing.JFrame;
import javax.swing.JPanel;
@ -26,6 +24,8 @@ public class PeersTableWindow extends JFrame{
private JPanel topPanel;
private JTable table;
private JScrollPane scrollPane;
private Timer updater = new Timer();
// Constructor of main frame
public PeersTableWindow() {
@ -71,6 +71,15 @@ public class PeersTableWindow extends JFrame{
// Add the table to a scrolling pane
scrollPane = new JScrollPane(table);
topPanel.add(scrollPane, BorderLayout.CENTER);
updater.scheduleAtFixedRate(new TimerTask() {
public void run() {
table.revalidate();
table.repaint();
System.out.println("repaint");
}
}, 1000, 1000);
}
public static void main(String args[]) {

View File

@ -1,5 +1,7 @@
package org.ethereum.gui;
import org.ethereum.manager.MainData;
import javax.swing.*;
import java.awt.*;
@ -193,6 +195,9 @@ public class ToolBar extends JFrame {
cp.add(peersToggle);
cp.add(chainToggle);
cp.add(walletToggle);
MainData.instance.toString();
}
public static void main(String args[]){

View File

@ -1,18 +1,25 @@
package org.ethereum.manager;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import com.maxmind.geoip.Location;
import org.ethereum.core.Block;
import org.ethereum.core.Genesis;
import org.ethereum.core.Transaction;
import org.ethereum.core.Wallet;
import org.ethereum.crypto.ECKey;
import org.ethereum.crypto.HashUtil;
import org.ethereum.geodb.IpGeoDB;
import org.ethereum.net.client.ClientPeer;
import org.ethereum.net.client.PeerData;
import org.ethereum.net.message.StaticMessages;
import org.ethereum.net.peerdiscovery.PeerDiscovery;
import org.ethereum.net.peerdiscovery.WorkerThread;
import org.ethereum.wallet.AddressState;
import org.spongycastle.util.encoders.Hex;
/**
@ -27,29 +34,29 @@ public class MainData {
private Wallet wallet = new Wallet();
private ClientPeer activePeer;
PeerDiscovery peerDiscovery;
public static MainData instance = new MainData();
public MainData() {
wallet.importKey(HashUtil.sha3("cow".getBytes()));
PeerData peer = new PeerData(
new byte[]{54 , (byte)201, 28, 117}, (short) 30303, new byte[]{00});
peers.add(peer);
byte[] cowAddr = HashUtil.sha3("cow".getBytes());
ECKey key = ECKey.fromPrivate(cowAddr);
wallet.importKey(cowAddr);
AddressState state = wallet.getAddressState(key.getAddress());
state.addToBalance(new BigInteger("1606938044258990275541962092341162602522202993782792835301376"));
wallet.importKey(HashUtil.sha3("cat".getBytes()));
peerDiscovery = new PeerDiscovery(peers);
peerDiscovery.start();
}
public void addPeers(List<PeerData> newPeers){
for (PeerData peer : newPeers){
if (this.peers.indexOf(peer) == -1){
this.peers.add(peer);
}
}
// for (PeerData peerData : this.peers){
// Location location = IpGeoDB.getLocationForIp(peerData.getInetAddress());
// if (location != null)
// System.out.println("Hello: " + " [" + peerData.getInetAddress().toString()
// + "] " + location.countryName);
// }
}
public void addBlocks(List<Block> blocks) {
@ -113,4 +120,33 @@ public class MainData {
public List<PeerData> getPeers() {
return peers;
}
public void updatePeerIsDead(String ip, short port){
for (PeerData peer : peers) {
if (peer.getInetAddress().getHostAddress().equals(ip) && (peer.getPort() == port)){
System.out.println("update peer is dead: " + ip + ":" + port);
peer.setOnline(false);
break;
}
}
}
public void addPeers(List<PeerData> newPeers){
for (PeerData peer : newPeers){
if (this.peers.indexOf(peer) == -1){
Location location = IpGeoDB.getLocationForIp(peer.getInetAddress());
if (location != null){
this.peers.add(peer);
peerDiscovery.addNewPeerData(peer);
}
}
}
}
}

View File

@ -13,7 +13,7 @@ import java.net.UnknownHostException;
public class PeerData {
private byte[] ip;
private short port;
private int port;
private byte[] peerId;
private transient boolean isOnline = false;
@ -21,7 +21,7 @@ public class PeerData {
public PeerData(byte[] ip, short port, byte[] peerId) {
this.ip = ip;
this.port = port;
this.port = port & 0xFFFF;
this.peerId = peerId;
}
@ -40,7 +40,7 @@ public class PeerData {
return ip;
}
public short getPort() {
public int getPort() {
return port;
}

View File

@ -13,7 +13,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import org.ethereum.gui.PeerListener;
import org.ethereum.manager.MainData;
import org.ethereum.net.client.EthereumFrameDecoder;
import org.ethereum.net.client.EthereumPeerTasterHandler;
import org.ethereum.net.peerdiscovery.EthereumPeerTasterHandler;
import org.ethereum.net.client.PeerData;
/**

View File

@ -1,4 +1,4 @@
package org.ethereum.net.client;
package org.ethereum.net.peerdiscovery;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -74,8 +74,6 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
sendPing(ctx);
}
}, 2000, 5000);
}
@Override
@ -141,8 +139,6 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
ctx.close().sync();
ctx.disconnect().sync();
}
}
@ -164,16 +160,8 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
ctx.close().sync();
ctx.disconnect().sync();
}
private void sendMsg(Message msg, ChannelHandlerContext ctx){
byte[] data = msg.getPayload();
final ByteBuf buffer = ctx.alloc().buffer(data.length + 8);
byte[] packetLen = ByteUtil.calcPacketLength(data);
buffer.writeBytes(MAGIC_PREFIX);
buffer.writeBytes(packetLen);
ctx.writeAndFlush(buffer);
throw new Error("Peer is dead");
}
private void sendPing(ChannelHandlerContext ctx){
@ -202,30 +190,4 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
ctx.writeAndFlush(buffer);
}
private void sendGetTransactions(ChannelHandlerContext ctx){
ByteBuf buffer = ctx.alloc().buffer(StaticMessages.GET_TRANSACTIONS.length);
buffer.writeBytes(StaticMessages.GET_TRANSACTIONS);
ctx.writeAndFlush(buffer);
}
private void sendGetChain(ChannelHandlerContext ctx){
byte[] hash = MainData.instance.getLatestBlockHash();
GetChainMessage chainMessage = new GetChainMessage((byte)100, hash);
ByteBuf buffer = ctx.alloc().buffer(chainMessage.getPayload().length + 8);
buffer.writeBytes(StaticMessages.MAGIC_PACKET);
buffer.writeBytes(ByteUtil.calcPacketSize(chainMessage.getPayload()));
buffer.writeBytes(chainMessage.getPayload());
ctx.writeAndFlush(buffer);
}
private void sendTx(ChannelHandlerContext ctx){
byte[] TX_MSG =
Hex.decode("2240089100000070F86E12F86B80881BC16D674EC8000094CD2A3D9F938E13CD947EC05ABC7FE734DF8DD8268609184E72A00064801BA0C52C114D4F5A3BA904A9B3036E5E118FE0DBB987FE3955DA20F2CD8F6C21AB9CA06BA4C2874299A55AD947DBC98A25EE895AABF6B625C26C435E84BFD70EDF2F69");
ByteBuf buffer = ctx.alloc().buffer(TX_MSG.length);
buffer.writeBytes(TX_MSG);
ctx.writeAndFlush(buffer);
}
}

View File

@ -0,0 +1,100 @@
package org.ethereum.net.peerdiscovery;
import org.ethereum.manager.MainData;
import org.ethereum.net.client.PeerData;
import samples.threadpool.MyMonitorThread;
import samples.threadpool.RejectedExecutionHandlerImpl;
import java.util.List;
import java.util.concurrent.*;
/**
* www.ethereumJ.com
* User: Roman Mandeleil
* Created on: 22/05/2014 09:10
*/
public class PeerDiscovery {
RejectedExecutionHandlerImpl rejectionHandler;
ThreadFactory threadFactory;
ThreadPoolExecutor executorPool;
PeerDiscoveryMonitorThread monitor;
List<PeerData> peers;
public PeerDiscovery(List<PeerData> peers) {
this.peers = peers;
}
public void start(){
//RejectedExecutionHandler implementation
rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
threadFactory = Executors.defaultThreadFactory();
//creating the ThreadPoolExecutor
executorPool = new ThreadPoolExecutor(1, 5, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
//start the monitoring thread
monitor = new PeerDiscoveryMonitorThread(executorPool, 3);
Thread monitorThread = new Thread(monitor);
monitorThread.start();
for (PeerData peerData : this.peers) {
executorPool.execute(new WorkerThread(peerData, executorPool));
}
}
public void addNewPeerData(PeerData peerData){
executorPool.execute(new WorkerThread(peerData, executorPool));
}
public void stop(){
executorPool.shutdown();
monitor.shutdown();
}
// todo: this main here for test erase it once upon a time
public static void main(String args[]) throws InterruptedException{
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//creating the ThreadPoolExecutor
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
//start the monitoring thread
PeerDiscoveryMonitorThread monitor = new PeerDiscoveryMonitorThread(executorPool, 3);
Thread monitorThread = new Thread(monitor);
monitorThread.start();
//submit work to the thread pool
PeerData peer = new PeerData(new byte[]{54, (byte)211, 14, 10}, (short) 30303, new byte[]{00});
executorPool.execute(new WorkerThread(peer, executorPool));
PeerData peer2 = new PeerData(new byte[]{54 , (byte)201, 28, 117}, (short) 30303, new byte[]{00});
executorPool.execute(new WorkerThread(peer2, executorPool));
PeerData peer3 = new PeerData(new byte[]{54, (byte)211, 14, 10}, (short) 40404, new byte[]{00});
executorPool.execute(new WorkerThread(peer3, executorPool));
Thread.sleep(30000);
//shut down the pool
executorPool.shutdown();
//shut down the monitor thread
Thread.sleep(5000);
monitor.shutdown();
}
}

View File

@ -0,0 +1,47 @@
package org.ethereum.net.peerdiscovery;
import org.ethereum.manager.MainData;
import java.util.concurrent.ThreadPoolExecutor;
public class PeerDiscoveryMonitorThread implements Runnable
{
private ThreadPoolExecutor executor;
private int seconds;
private boolean run=true;
public PeerDiscoveryMonitorThread(ThreadPoolExecutor executor, int delay)
{
this.executor = executor;
this.seconds=delay;
}
public void shutdown(){
this.run=false;
}
@Override
public void run()
{
while(run){
System.out.println(
String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s, peersDiscovered: %d ",
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.isShutdown(),
this.executor.isTerminated(),
MainData.instance.getPeers().size()));
try {
Thread.sleep(seconds*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

View File

@ -1,19 +1,18 @@
package org.ethereum.net.client;
package org.ethereum.net.peerdiscovery;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.ethereum.core.Transaction;
import org.ethereum.gui.PeerListener;
import org.ethereum.manager.MainData;
import org.ethereum.net.message.StaticMessages;
import org.ethereum.net.message.TransactionsMessage;
import org.ethereum.util.Utils;
import org.spongycastle.util.encoders.Hex;
import org.ethereum.net.client.EthereumFrameDecoder;
import org.ethereum.net.client.PeerData;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -39,6 +38,7 @@ public class PeerTaster {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
@ -83,19 +83,30 @@ public class PeerTaster {
PeerTaster peerTaster = new PeerTaster();
ArrayList<PeerData> peers = new ArrayList<PeerData>();
peers.add(new PeerData(new byte[]{54, (byte)211, 14, 10}, (short) 30303, null));
MainData.instance.addPeers(peers);
try {peerTaster.connect("54.211.14.10", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("82.217.72.169", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.201.28.117", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.2.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("0.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.211.14.10", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("82.217.72.169", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.201.28.117", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.2.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("0.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
String ip = "54.211.14.10";
short port = 30303;
try {
peerTaster.connect(ip, port);}
catch (Throwable e) {
e.printStackTrace();
MainData.instance.updatePeerIsDead(ip, port);
}
// try {peerTaster.connect("82.217.72.169", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("54.201.28.117", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("54.2.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("0.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("54.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("54.211.14.10", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("82.217.72.169", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("54.201.28.117", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("54.2.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("0.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
// try {peerTaster.connect("54.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
System.out.println("End of the roaad");

View File

@ -0,0 +1,50 @@
package org.ethereum.net.peerdiscovery;
import org.ethereum.manager.MainData;
import org.ethereum.net.client.PeerData;
import java.util.concurrent.ThreadPoolExecutor;
/**
* www.ethereumJ.com
* User: Roman Mandeleil
* Created on: 22/05/2014 09:26
*/
public class WorkerThread implements Runnable {
ThreadPoolExecutor poolExecutor;
private PeerData peerData;
public WorkerThread(PeerData peerData, ThreadPoolExecutor poolExecutor){
this.poolExecutor = poolExecutor;
this.peerData = peerData;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" Start. Command = "+ peerData.toString());
processCommand();
System.out.println(Thread.currentThread().getName()+" End.");
poolExecutor.execute(this);
}
private void processCommand() {
try {
PeerTaster peerTaster = new PeerTaster();
peerTaster.connect(peerData.getInetAddress().getHostName(), peerData.getPort());
peerData.setOnline(true);
System.out.println("Peer: " + peerData.toString() + " isOnline: true");
}
catch (Throwable e) {
System.out.println("Peer: " + peerData.toString() + " isOnline: false");
peerData.setOnline(false);
}
}
@Override
public String toString(){
return " Worker for: " + this.peerData.toString();
}
}

View File

@ -13,4 +13,17 @@ server.acceptConnections = false
# and read new wire msg if the timeout
# fires the system will be looking for
# another peer. [seconds]
connection.timeout = 10
connection.timeout = 10
# specify if the mechanism
# to discover more and more
# peers and check the already
# discovered peers is on [true/false]
peer.discovery = true
# number of workers that
# tastes the peers for being
# online [1..10]
peer.discover.workers = 5