只⽤120⾏Java代码写⼀个⾃⼰的区块链-4实现真正的p2p⽹络
在之前的中,我们模拟了节点⽹络通讯,很多朋友反馈说,他们想看真正的节点⽹络通讯⽽不是单节点的模拟。本章将满⾜你们。
我将本章的内容放在了al.p2p包中,⼤家可以在中到我更新的代码。
通过本⽂,你将可以做到:
创建⾃⼰的真实peer-to-peer⽹络
多个节点通过p2p⽹络同步区块内容
在⾃我节点实现RPC通讯,并向区块中写数据
在⾃我节点查看整个区块内容
不含虚拟货币在内的其他区块链知识
学习本章⾄少需要两台可ping通的机器(虚拟机也可以),并且安装了jdk8(如果只有jdk6,可以把代码改为jdk6⽀持的形式)。
基于之前的⽂章,本章将重写p2p部分的内容。我们⾸先要理清思路,提出区块链通讯需要解决的问题。
1.如果我第⼀次作为全节点启动,我需要⼲什么
2.如果我已经启动,别的节点和我通讯,我需要交互哪些信息
3.如果我已经启动,我怎么和⾃⼰的客户端交互
这些问题有⼀个很好的参考对象,就是⽐特币,我们先来看看⽐特币的通讯过程
⼀、⽐特币节点连接建⽴
1.寻⽐特币⽹络中的有效节点,此步骤通常有两种⽅法:
(1)使⽤“DNS种⼦”(DNS seeds),DNS种⼦提供⽐特币节点的IP地址列表,Bitcoin Core客户端提供五种不同的DNS种⼦,通常使⽤默认⽅式。这⾥不展开谈论。
(2)⼿动通过-seednode命令指定⼀个⽐特币节点的IP地址作为⽐特币种⼦节点(为什么叫种⼦,我的理解就是,根据种⼦,得到更多)
这⾥我们使⽤简单的⽅式来处理节点,⽤⼀个⽂件来存储地址列表-peers.list,第⼀次连接的时候更新这个⽂件,获取更多连接时也更新这个⽂件。简单演⽰我们直接把要测试的机器ip和端⼝加进去。
我有两台机,我把端⼝8015⽤作新的p2p⽹络的默认监听端⼝,那么我的peers.list的内容是:
10.16.0.205:8015
10.16.3.77:8015
2.与发现的有效⽐特币节点进⾏初始“握⼿”,建⽴连接
节点发送⼀条包含基本认证内容的version消息开始“握⼿”通信过程,该消息包括如下内容:
nVersion:客户端的⽐特币P2P协议所采⽤的版本(例如:70002)。
nLocalServices:⼀组该节点⽀持的本地服务列表,当前仅⽀持NODE_NETWORK
nTime:当前时间
addrYou:当前节点可见的远程节点的IP地址(上例中NodeB IP)
addrMe:当前节点的IP地址(上例中NodeA IP)
subver:指⽰当前节点运⾏的软件类型的⼦版本号(例如:”/Satoshi:0.9.2.1/”)
BestHeight:当前节点区块链的区块⾼度(初始为0,即只包含创世区块)
我们简化⼀下,这⾥我们最关⼼的就是区块⾼度bestHeight,我们就传递这个好了。区块⾼度就是区块链的长度。
1if ("VERSION".equalsIgnoreCase(cmd)) {
2// 对⽅发来握⼿信息,我⽅发给对⽅区块⾼度和最新区块的hash
3 pt.peerWriter.write("VERACK " + blockChain.size() + " " + (blockChain.size() - 1).getHash());
4 }else if ("VERACK".equalsIgnoreCase(cmd)) {
5// 获取区块⾼度
6 String[] parts = payload.split(" ");
7 bestHeight = Integer.parseInt(parts[0]);
8//哈希暂时不校验
9 }
3.新节点建⽴更多的连接,使节点在⽹络中被更多节点接收,保证连接更稳定
这⾥我们就两台机,如果你有更多机器,可以实现⼀下这个通讯,本章我们简单实现⼀下。
1if ("ADDR".equalsIgnoreCase(cmd)) {
2// 对⽅发来地址,建⽴连接并保存
3if (!ains(payload)) {
4 String peerAddr = payload.substring(0, payload.indexOf(":"));
5int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1));
6 t(peerAddr, peerPort);
7 peers.add(payload);
8 PrintWriter out = new PrintWriter(peerFile);
9for (int k = 0; k < peers.size(); k++) {
10 out.(k));
11 }
12 out.close();
13 }
14 } else if ("GET_ADDR".equalsIgnoreCase(cmd)) {
15//对⽅请求更多peer地址,随机给⼀个
16 Random random = new Random();
17 pt.peerWriter.write("ADDR " + (Int(peers.size())));
18 }
4.交换“区块清单”(注:该步骤仅在全节点上会执⾏,且从与节点建⽴连接就开始进⾏)本系列内容只使⽤全节点。
全节点
全节点沿着区块链按时间倒叙⼀直追溯到创世区块,建⽴⼀个完整的UTXO数据库,通过查询UTXO是否未被⽀付来验证交易的有效性。
SPV节点
SPV节点通过向其他节点请求某笔交易的Merkle路径(Merkle树我可能会在后续章节讲到),如果路径正确⽆误,并且该交易之上已有6个或以上区块被确认,则证明该交易不是双重⽀付。
全节点在连接到其他节点后,需要构建完整的区块链,如果是新节点,它仅包含静态植⼊客户端中的0号区块(创世区块)。注意了,创世区块是静态的(硬编码)。
如前⽂所⾔,我们在区块链中取最长的链,区块⾼度⽐我⾼,我就向对⽅获取区块。
1else if ("BLOCK".equalsIgnoreCase(cmd)) {
2//把对⽅给的块存进链中
3 LOGGER.info("Attempting to ");
4 LOGGER.info("Block: " + payload);
5 Block newBlock = gson.fromJson(payload, Block.class);
6if (!ains(newBlock)) {
7// 校验区块,如果成功,将其写⼊本地区块链
8if (BlockUtils.isBlockValid(newBlock, (blockChain.size() - 1))) {
9if (blockChain.add(newBlock) && !catchupMode) {
10 LOGGER.info("Added block " + Index() + " with hash: ["+ Hash() + "]");
11 peerNetwork.broadcast("BLOCK " + payload);
12 }
13 }
14 }
15 } else if ("GET_BLOCK".equalsIgnoreCase(cmd)) {
16//把对⽅请求的块给对⽅
17 LOGGER.info("Sending block[" + payload + "] to peer");
18 Block block = (Integer.parseInt(payload));
19if (block != null) {
20 LOGGER.info("Sending block " + payload + " to peer");
21 pt.peerWriter.write("BLOCK " + Json(block));
22 }
23 }
到这⾥,我们基本上完成了p2p⽹络中关于区块的通讯。
其实⽐特币等虚拟货币中还有很多通讯,关于交易的,这⾥我们不需要,不做讨论。
让我们开始编码吧!
整合上⽂提到的所有通讯,Node.java的代码如下
1private static final Logger LOGGER = Logger(Node.class);
2
3/** 本地区块链 */
4private static List<Block> blockChain = new LinkedList<Block>();
5
6public static void main(String[] args) throws IOException, InterruptedException {
7int port = 8015;
8 LOGGER.info("Starting ");
9 PeerNetwork peerNetwork = new PeerNetwork(port);
10 peerNetwork.start();
11 LOGGER.info("[ Node is Started in port:"+port+" ]"); 17 ArrayList<String> peers = new ArrayList<>();
18 File peerFile = new File("peers.list");
19if (!ists()) {
20 String host = LocalHost().toString();
21 FileUtils.writeStringToFile(peerFile, host+":"+port);
22 }
23for (Object peer : adLines(peerFile)) {
24 String[] addr = String().split(":");
25 t(addr[0], Integer.parseInt(addr[1]));
26 }
27 TimeUnit.SECONDS.sleep(2);
28
29 peerNetwork.broadcast("VERSION");
30
31// hard code genesisBlock
32 Block genesisBlock = new Block();
33 genesisBlock.setIndex(0);
34 genesisBlock.setTimestamp("2017-07-13 22:32:00");//my son's birthday
35 genesisBlock.setVac(0);
36 genesisBlock.setPrevHash("");
37 genesisBlock.setHash(BlockUtils.calculateHash(genesisBlock));
38 blockChain.add(genesisBlock);
39
40final Gson gson = new GsonBuilder().create();
41 LOGGER.Json(blockChain));
42int bestHeight = 0;
43boolean catchupMode = true;
44
45/**
46 * p2p 通讯
47*/
48while (true) {
49//对新连接过的peer写⼊⽂件,下次启动直接连接
50for (String peer : peerNetwork.peers) {
51if (!ains(peer)) {
52 peers.add(peer);
53 FileUtils.writeStringToFile(peerFile, peer);
54 }
55 }
56 peerNetwork.peers.clear();
57
58// 处理通讯
59for (PeerThread pt : peerNetwork.peerThreads) {
60if (pt == null || pt.peerReader == null) {
61break;
62 }
63 List<String> dataList = adData();
64if (dataList == null) {
65 LOGGER.info("Null ret retry.");
66 it(-5);
67break;
68 }
69
70for (String data:dataList) {
71 LOGGER.info("Got data: " + data);
72int flag = data.indexOf(' ');
73 String cmd = flag >= 0 ? data.substring(0, flag) : data;
74 String payload = flag >= 0 ? data.substring(flag + 1) : "";
75if (StringUtils.isNotBlank(cmd)) {
76if ("VERSION".equalsIgnoreCase(cmd)) {
77// 对⽅发来握⼿信息,我⽅发给对⽅区块⾼度和最新区块的hash
78 pt.peerWriter.write("VERACK " + blockChain.size() + " " + (blockChain.size() - 1).getHash());
79 }else if ("VERACK".equalsIgnoreCase(cmd)) {
80// 获取区块⾼度
81 String[] parts = payload.split(" ");
82 bestHeight = Integer.parseInt(parts[0]);
83//哈希暂时不校验
84 } else if ("GET_BLOCK".equalsIgnoreCase(cmd)) {
85//把对⽅请求的块给对⽅
86 LOGGER.info("Sending block[" + payload + "] to peer");
87 Block block = (Integer.parseInt(payload));
88if (block != null) {
89 LOGGER.info("Sending block " + payload + " to peer");
90 pt.peerWriter.write("BLOCK " + Json(block));
91 }
92 } else if ("BLOCK".equalsIgnoreCase(cmd)) {
93//把对⽅给的块存进链中
94 LOGGER.info("Attempting to ");
95 LOGGER.info("Block: " + payload);
96 Block newBlock = gson.fromJson(payload, Block.class);
97if (!ains(newBlock)) {
98// 校验区块,如果成功,将其写⼊本地区块链
99if (BlockUtils.isBlockValid(newBlock, (blockChain.size() - 1))) {
100if (blockChain.add(newBlock) && !catchupMode) {
101 LOGGER.info("Added block " + Index() + " with hash: ["+ Hash() + "]");
中文写代码软件102 peerNetwork.broadcast("BLOCK " + payload);
103 }
104 }
105 }
106 }else if ("GET_ADDR".equalsIgnoreCase(cmd)) {
107//对⽅请求更多peer地址,随机给⼀个
108 Random random = new Random();
109 pt.peerWriter.write("ADDR " + (Int(peers.size())));
110 } else if ("ADDR".equalsIgnoreCase(cmd)) {
111// 对⽅发来地址,建⽴连接并保存
112if (!ains(payload)) {
113 String peerAddr = payload.substring(0, payload.indexOf(":"));
114int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1));
115 t(peerAddr, peerPort);
116 peers.add(payload);
117 PrintWriter out = new PrintWriter(peerFile);
118for (int k = 0; k < peers.size(); k++) {
119 out.(k));
120 }
121 out.close();
122 }
123 }
124 }
125 }
126 }
127
128// ********************************
129// ⽐较区块⾼度,同步区块
130// ********************************
131
132int localHeight = blockChain.size();
133
134if (bestHeight > localHeight) {
135 catchupMode = true;
136 LOGGER.info("Local chain height: " + localHeight);
137 LOGGER.info("Best chain Height: " + bestHeight);
138 TimeUnit.MILLISECONDS.sleep(300);
139
140for (int i = localHeight; i < bestHeight; i++) {
141 LOGGER.info("请求块 " + i + "...");
142 peerNetwork.broadcast("GET_BLOCK " + i);
143 }
144 } else {
145if (catchupMode) {
146 LOGGER.info("[p2p] - Caught up with network.");
147 }
148 catchupMode = false;
149 }
150
151
152
153// ****************
154// 循环结束
155// ****************
156 TimeUnit.MILLISECONDS.sleep(200);
157 }
158 }
PeerNetwork简单封装了⼀下p2p通讯的细节,篇幅有限我这⾥只列出核⼼交互,具体实现可以去看我的中的类:PeerThread、PeerReader、PeerWriter。
RPC
接下来,我们将讨论关于本地节点客户端的概念。在⽐特币中,除了bitcoin-core,还有bitcoin-cli,这是做什么的呢。
它其实是⽤来做本地节点交互的,⽐如我作为本地节点,我需要发起交易,需要查看我的资产等等,后来发展出gui界⾯,就是⼤家俗称的钱包。
在本章中,我们也需要这样⼀个客户端通讯,⽤来将我们的vac写⼊链中(之前的⽂章,我们是⽤控制台输⼊的,实际的做法是提供加密的rpc调⽤)我们接下来实现RPC服务⾸先我们要在Node.java中加⼊通讯逻辑
LOGGER.info("Starting ");
RpcServer rpcAgent = new RpcServer(port+1);
rpcAgent.start();
LOGGER.info("[ RPC agent is Started in port:"+(port+1)+" ]");
for循环体中增加
/**
* 处理RPC服务
*/
for (RpcThread th:rpcAgent.rpcThreads) {
String request = th.req;
if (request != null) {
String[] parts = request.split(" ");
parts[0] = parts[0].toLowerCase();
if ("getinfo".equals(parts[0])) {
String res = Json(blockChain);
} else if ("send".equals(parts[0])) {
try {
int vac = Integer.parseInt(parts[1]);
// 根据vac创建区块
Block newBlock = (blockChain.size() - 1), vac);
if (BlockUtils.isBlockValid(newBlock, (blockChain.size() - 1))) {
blockChain.add(newBlock);
peerNetwork.broadcast("BLOCK " + Json(newBlock));
} else {
}
} catch (Exception e) {
<("invalid vac", e);
}
} else {
}
}
}
独⽴线程处理
RpcThread.java
package al.p2p;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.Socket;
import urrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 处理单个rpc连接
* @author Mignet
*/
public class RpcThread extends Thread {
private static final Logger LOGGER = Logger(RpcThread.class);
private Socket socket;
String res;
String req;
/**
* 默认构造函数
* @param socket
*/
public RpcThread(Socket socket){
this.socket = socket;
}
@Override
public void run(){
try{
req = null;
res = null;
PrintWriter out = new OutputStream(), true);
BufferedReader in = new BufferedReader(new InputStream()));
String input;
out.println("================Welcome RPC Daemon==============");
while((input = in.readLine()) != null){
if ("HELP".equalsIgnoreCase(input)){
out.println(">>>>>>>>># COMMANDS >>>>>>>>>##"); out.println("# 1) getinfo - Gets block chain infomations. #");
out.println("# 2) send <vac> - Write <vac> to blockChain #");
out.println(">>>>>>>>>>>>>>>>>>>>###"); } else {
req = input;
while (res == null){
TimeUnit.MILLISECONDS.sleep(25);
}
out.println(res);
req = null;
res = null;
}
}
} catch (Exception e){
LOGGER.info("An RPC client has disconnected.",e);
}
}
}
RpcServer.java
package al.p2p;
import java.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* RPC服务
*
* 注意:不要把这个端⼝开放给外⽹
* @author Mignet
*/
public class RpcServer extends Thread
{
private static final Logger LOGGER = Logger(RpcServer.class);
private int port;
private boolean runFlag = true;
List<RpcThread> rpcThreads;
/
**
* 默认配置
*/
public RpcServer()
{
this.port = 8016;
this.rpcThreads = new ArrayList<>();
}
/**
* 指定端⼝
* @param port Port to listen on
*/
public RpcServer(int port)
{
this.port = port;
this.rpcThreads = new ArrayList<>();
}
@Override
public void run()
{
try
{
ServerSocket socket = new ServerSocket(port);
while (runFlag)
{
RpcThread thread = new RpcThread(socket.accept());
rpcThreads.add(thread);
thread.start();
}
socket.close();
} catch (Exception e){
<("rpc error in port:" + port,e);
}
}
}
跑起来
1.使⽤mvn的install命令打包
2.新建peers.list,把要组建⽹络的ip地址填⼊去,在本机执⾏jar命令,启动第⼀个节点。注意,这时候它会尝试连接别的节点,连接不上
3.把jar包和peers.list上传到其他机器,启动第⼆个节点
4.我们⽤cmd在本机打开新的窗⼝,执⾏nc 127.0.0.1 8016,连接到本机节点的rpc服务,输⼊help查看⽀持的命令:
5.节点1(本机)增加⼀个区块:在rpc命令中,我们实现了1,查看区块链;2,写⼊vac数据,来验证⼀下
我们先输⼊getinfo查看⼀下,然后send 88,看到写⼊成功了,再输⼊getinfo,果然看到了新的块在链中。
我们也可以看到节点控制台的输出中关于新增区块的信息
6.验证是不是同步了:我们看⼀下另⼀台机器
我们在这台机器上也使⽤nc localhost 8016来连接看看区块
7.我们再从这个结点写⼊⼀个块(send 666)
看看本机接收到了没
很完美。
到此基本演⽰了区块链通讯真实的样⼦。当然,这⾥有很多可以改进的地⽅,⽐如安全性,⽐如命令的模式,⽐如不⽤sleep⽽是⽤Future,⽐如使⽤netty等更⾼效更成熟的通讯框架。如果想利⽤区块链来发⾏,那么在此基础上,还要有公私钥签名交易,交易通讯,校验,使⽤共识算法来选举及奖励货币等。
还有什么区块链知识是本系列没有提到的吗?有的,使⽤默克尔树来快速验证区块和整个链.
关于币的问题也可以问,⽐如UTXO模型可以讲⼀讲吗?如果有问题请⼤家留⾔
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
基于matlab的隐写术代码
« 上一篇
保定亦语写亦商贸有限公司介绍企业发展分析报告模板
下一篇 »
发表评论