Skip to content

Commit 5adbe5f

Browse files
authored
Merge pull request #1846 from tronprotocol/solid_restruct
Reconstruct solid node remove sync logic
2 parents d736c3f + 585da38 commit 5adbe5f

File tree

2 files changed

+67
-205
lines changed

2 files changed

+67
-205
lines changed

src/main/java/org/tron/common/overlay/discover/node/NodeManager.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ public class NodeManager implements EventHandler {
8282
private Map<DiscoverListener, ListenerHandler> listeners = new IdentityHashMap<>();
8383

8484
private boolean inited = false;
85-
private Timer logStatsTimer = new Timer();
8685
private Timer nodeManagerTasksTimer = new Timer("NodeManagerTasks");
8786
private ScheduledExecutorService pongTimer;
8887

@@ -103,13 +102,6 @@ public NodeManager(Manager dbManager) {
103102

104103
table = new NodeTable(homeNode);
105104

106-
logStatsTimer.scheduleAtFixedRate(new TimerTask() {
107-
@Override
108-
public void run() {
109-
logger.trace("Statistics:\n {}", dumpAllStatistics());
110-
}
111-
}, 1 * 1000L, 60 * 1000L);
112-
113105
this.pongTimer = Executors.newSingleThreadScheduledExecutor();
114106
}
115107

@@ -318,24 +310,6 @@ public synchronized void addDiscoverListener(DiscoverListener listener,
318310
listeners.put(listener, new ListenerHandler(listener, filter));
319311
}
320312

321-
public synchronized String dumpAllStatistics() {
322-
List<NodeHandler> l = new ArrayList<>(nodeHandlerMap.values());
323-
l.sort(Comparator.comparingInt((NodeHandler o) -> o.getNodeStatistics().getReputation())
324-
.reversed());
325-
326-
StringBuilder sb = new StringBuilder();
327-
int zeroReputCount = 0;
328-
for (NodeHandler nodeHandler : l) {
329-
if (nodeHandler.getNodeStatistics().getReputation() > 0) {
330-
sb.append(nodeHandler).append("\t").append(nodeHandler.getNodeStatistics()).append("\n");
331-
} else {
332-
zeroReputCount++;
333-
}
334-
}
335-
sb.append("0 reputation: ").append(zeroReputCount).append(" nodes.\n");
336-
return sb.toString();
337-
}
338-
339313
public Node getPublicHomeNode() {
340314
return homeNode;
341315
}
@@ -344,7 +318,6 @@ public void close() {
344318
try {
345319
nodeManagerTasksTimer.cancel();
346320
pongTimer.shutdownNow();
347-
logStatsTimer.cancel();
348321
} catch (Exception e) {
349322
logger.warn("close failed.", e);
350323
}

src/main/java/org/tron/program/SolidityNode.java

Lines changed: 67 additions & 178 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package org.tron.program;
22

3+
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;
4+
35
import ch.qos.logback.classic.Level;
4-
import com.google.common.collect.Maps;
5-
import java.util.Map;
66
import java.util.concurrent.LinkedBlockingDeque;
77
import java.util.concurrent.atomic.AtomicLong;
88
import lombok.extern.slf4j.Slf4j;
@@ -20,8 +20,6 @@
2020
import org.tron.common.overlay.server.ChannelManager;
2121
import org.tron.core.Constant;
2222
import org.tron.core.capsule.BlockCapsule;
23-
import org.tron.core.capsule.TransactionCapsule;
24-
import org.tron.core.capsule.TransactionInfoCapsule;
2523
import org.tron.core.config.DefaultConfig;
2624
import org.tron.core.config.args.Args;
2725
import org.tron.core.db.Manager;
@@ -34,160 +32,52 @@ public class SolidityNode {
3432

3533
private Manager dbManager;
3634

37-
private Args cfgArgs;
38-
3935
private DatabaseGrpcClient databaseGrpcClient;
4036

4137
private AtomicLong ID = new AtomicLong();
4238

43-
private Map<Long, Block> blockMap = Maps.newConcurrentMap();
44-
45-
private LinkedBlockingDeque<Block> blockQueue = new LinkedBlockingDeque(10000);
46-
47-
private LinkedBlockingDeque<Block> blockBakQueue = new LinkedBlockingDeque(10000);
48-
49-
private AtomicLong remoteLastSolidityBlockNum = new AtomicLong();
50-
51-
private AtomicLong lastSolidityBlockNum = new AtomicLong();
39+
private AtomicLong remoteBlockNum = new AtomicLong();
5240

53-
private int maxBlockCacheSize = 10_000;
41+
private LinkedBlockingDeque<Block> blockQueue = new LinkedBlockingDeque(100);
5442

55-
private volatile boolean syncFlag = true;
43+
private int exceptionSleepTime = 1000;
5644

5745
private volatile boolean flag = true;
5846

59-
public SolidityNode(Manager dbManager, Args cfgArgs) {
47+
public SolidityNode(Manager dbManager) {
6048
this.dbManager = dbManager;
61-
this.cfgArgs = cfgArgs;
6249
resolveCompatibilityIssueIfUsingFullNodeDatabase();
63-
lastSolidityBlockNum.set(dbManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum());
64-
ID.set(lastSolidityBlockNum.get());
65-
databaseGrpcClient = new DatabaseGrpcClient(cfgArgs.getTrustNodeAddr());
66-
remoteLastSolidityBlockNum.set(getLastSolidityBlockNum());
50+
ID.set(dbManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum());
51+
databaseGrpcClient = new DatabaseGrpcClient(Args.getInstance().getTrustNodeAddr());
52+
remoteBlockNum.set(getLastSolidityBlockNum());
6753
}
6854

6955
private void start() {
7056
try {
71-
for (int i = 0; i < 50; i++) {
72-
new Thread(() -> getSyncBlock()).start();
73-
}
74-
new Thread(() -> getAdvBlock()).start();
75-
new Thread(() -> pushBlock()).start();
57+
new Thread(() -> getBlock()).start();
7658
new Thread(() -> processBlock()).start();
77-
new Thread(() -> processTrx()).start();
78-
logger.info(
79-
"Success to start solid node, lastSolidityBlockNum: {}, ID: {}, remoteLastSolidityBlockNum: {}.",
80-
lastSolidityBlockNum, ID.get(), remoteLastSolidityBlockNum);
59+
logger.info( "Success to start solid node, ID: {}, remoteBlockNum: {}.", ID.get(), remoteBlockNum);
8160
} catch (Exception e) {
82-
logger.error("Failed to start solid node, address: {}.", cfgArgs.getTrustNodeAddr());
61+
logger.error("Failed to start solid node, address: {}.", Args.getInstance().getTrustNodeAddr());
8362
System.exit(0);
8463
}
8564
}
8665

87-
private void getSyncBlock() {
88-
long blockNum = getNextSyncBlockId();
89-
while (blockNum != 0) {
90-
try {
91-
if (blockMap.size() > maxBlockCacheSize) {
92-
sleep(1000);
93-
continue;
94-
}
95-
Block block = getBlockByNum(blockNum);
96-
blockMap.put(blockNum, block);
97-
logger.info("Success to get sync block: {}.", blockNum);
98-
blockNum = getNextSyncBlockId();
99-
} catch (Exception e) {
100-
logger.error("Failed to get sync block {}, reason: {}", blockNum, e.getMessage());
101-
sleep(1000);
102-
}
103-
}
104-
logger.warn("Get sync block thread {} exit.", Thread.currentThread().getName());
105-
}
106-
107-
synchronized long getNextSyncBlockId() {
108-
109-
if (!syncFlag) {
110-
return 0;
111-
}
112-
113-
if (ID.get() < remoteLastSolidityBlockNum.get()) {
114-
return ID.incrementAndGet();
115-
}
116-
117-
long lastNum = getLastSolidityBlockNum();
118-
if (lastNum - remoteLastSolidityBlockNum.get() > 50) {
119-
remoteLastSolidityBlockNum.set(lastNum);
120-
return ID.incrementAndGet();
121-
}
122-
123-
logger.info("Sync mode switch to adv, ID = {}, lastNum = {}, remoteLastSolidityBlockNum = {}",
124-
ID.get(), lastNum, remoteLastSolidityBlockNum.get());
125-
126-
syncFlag = false;
127-
128-
return 0;
129-
}
130-
131-
private void getAdvBlock() {
132-
while (syncFlag) {
133-
sleep(5000);
134-
}
135-
logger.info("Get adv block thread start.");
66+
private void getBlock() {
13667
long blockNum = ID.incrementAndGet();
13768
while (flag) {
13869
try {
139-
if (blockNum > remoteLastSolidityBlockNum.get() || blockMap.size() > maxBlockCacheSize) {
140-
sleep(3000);
141-
remoteLastSolidityBlockNum.set(getLastSolidityBlockNum());
70+
if (blockNum > remoteBlockNum.get()) {
71+
sleep(BLOCK_PRODUCED_INTERVAL);
72+
remoteBlockNum.set(getLastSolidityBlockNum());
14273
continue;
14374
}
14475
Block block = getBlockByNum(blockNum);
145-
blockMap.put(blockNum, block);
146-
logger.info("Success to get adv block: {}.", blockNum);
147-
blockNum = ID.incrementAndGet();
148-
} catch (Exception e) {
149-
logger.error("Failed to get adv block {}, reason: {}", blockNum, e.getMessage());
150-
sleep(1000);
151-
}
152-
}
153-
}
154-
155-
private Block getBlockByNum(long blockNum) throws Exception {
156-
Block block = databaseGrpcClient.getBlock(blockNum);
157-
if (block.getBlockHeader().getRawData().getNumber() != blockNum) {
158-
logger.warn("Get adv block id not the same , {}, {}.", blockNum,
159-
block.getBlockHeader().getRawData().getNumber());
160-
throw new Exception();
161-
}
162-
return block;
163-
}
164-
165-
private long getLastSolidityBlockNum() {
166-
while (true) {
167-
try {
168-
long blockNum = databaseGrpcClient.getDynamicProperties().getLastSolidityBlockNum();
169-
logger.info("Get last remote solid blockNum: {}, remoteLastSolidityBlockNum: {}.",
170-
blockNum, remoteLastSolidityBlockNum);
171-
return blockNum;
172-
} catch (Exception e) {
173-
logger.error("Failed to get last solid blockNum: {}, reason: {}",
174-
remoteLastSolidityBlockNum.get(), e.getMessage());
175-
sleep(1000);
176-
}
177-
}
178-
}
179-
180-
private void pushBlock() {
181-
while (flag) {
182-
try {
183-
Block block = blockMap.remove(lastSolidityBlockNum.get() + 1);
184-
if (block == null) {
185-
sleep(1000);
186-
continue;
187-
}
18876
blockQueue.put(block);
189-
lastSolidityBlockNum.incrementAndGet();
77+
blockNum = ID.incrementAndGet();
19078
} catch (Exception e) {
79+
logger.error("Failed to get block {}, reason: {}.", blockNum, e.getMessage());
80+
sleep(exceptionSleepTime);
19181
}
19282
}
19383
}
@@ -197,74 +87,61 @@ private void processBlock() {
19787
try {
19888
Block block = blockQueue.take();
19989
loopProcessBlock(block);
200-
blockBakQueue.put(block);
201-
logger.info(
202-
"Success to process block: {}, blockMapSize: {}, blockQueueSize: {}, blockBakQueue: {}.",
203-
block.getBlockHeader().getRawData().getNumber(),
204-
blockMap.size(),
205-
blockQueue.size(),
206-
blockBakQueue.size());
20790
} catch (Exception e) {
20891
logger.error(e.getMessage());
209-
sleep(100);
92+
sleep(exceptionSleepTime);
21093
}
21194
}
21295
}
21396

214-
private void resolveCompatibilityIssueIfUsingFullNodeDatabase() {
215-
long lastSolidityBlockNum = dbManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
216-
long headBlockNum = dbManager.getHeadBlockNum();
217-
logger.info("headBlockNum:{}, solidityBlockNum:{}, diff:{}",
218-
headBlockNum, lastSolidityBlockNum, headBlockNum - lastSolidityBlockNum);
219-
if (lastSolidityBlockNum < headBlockNum) {
220-
logger.info("use fullNode database, headBlockNum:{}, solidityBlockNum:{}, diff:{}",
221-
headBlockNum, lastSolidityBlockNum, headBlockNum - lastSolidityBlockNum);
222-
dbManager.getDynamicPropertiesStore().saveLatestSolidifiedBlockNum(headBlockNum);
223-
}
224-
}
225-
22697
private void loopProcessBlock(Block block) {
227-
while (true) {
98+
while (flag) {
22899
long blockNum = block.getBlockHeader().getRawData().getNumber();
229100
try {
230101
dbManager.pushVerifiedBlock(new BlockCapsule(block));
231102
dbManager.getDynamicPropertiesStore().saveLatestSolidifiedBlockNum(blockNum);
103+
logger.info("Success to process block: {}, blockQueueSize: {}.", blockNum, blockQueue.size());
232104
return;
233105
} catch (Exception e) {
234-
logger.error("Failed to process block {}", new BlockCapsule(block), e);
235-
try {
236-
sleep(100);
237-
block = getBlockByNum(blockNum);
238-
} catch (Exception e1) {
239-
logger.error("Failed to get block: {}, reason: {}", blockNum, e1.getMessage());
240-
}
106+
logger.error("Failed to process block {}.", new BlockCapsule(block), e);
107+
sleep(exceptionSleepTime);
108+
block = getBlockByNum(blockNum);
241109
}
242110
}
243111
}
244112

245-
private void processTrx() {
246-
while (flag) {
113+
private Block getBlockByNum(long blockNum) {
114+
while (true) {
247115
try {
248-
Block block = blockBakQueue.take();
249-
BlockCapsule blockCapsule = new BlockCapsule(block);
250-
for (TransactionCapsule trx : blockCapsule.getTransactions()) {
251-
TransactionInfoCapsule ret;
252-
try {
253-
ret = dbManager.getTransactionHistoryStore().get(trx.getTransactionId().getBytes());
254-
} catch (Exception ex) {
255-
logger.warn("Failed to get trx: {}, reason: {}", trx.getTransactionId(), ex.getMessage());
256-
continue;
257-
}
258-
if (ret == null) {
259-
continue;
260-
}
261-
ret.setBlockNumber(blockCapsule.getNum());
262-
ret.setBlockTimeStamp(blockCapsule.getTimeStamp());
263-
dbManager.getTransactionHistoryStore().put(trx.getTransactionId().getBytes(), ret);
116+
long time = System.currentTimeMillis();
117+
Block block = databaseGrpcClient.getBlock(blockNum);
118+
long num = block.getBlockHeader().getRawData().getNumber();
119+
if (num == blockNum) {
120+
logger.info("Success to get block: {}, cost: {}ms.",
121+
blockNum, System.currentTimeMillis() - time);
122+
return block;
123+
}else {
124+
logger.warn("Get block id not the same , {}, {}.", num, blockNum);
125+
sleep(exceptionSleepTime);
264126
}
127+
}catch (Exception e){
128+
logger.error("Failed to get block: {}, reason: {}.", blockNum, e.getMessage());
129+
sleep(exceptionSleepTime);
130+
}
131+
}
132+
}
133+
134+
private long getLastSolidityBlockNum() {
135+
while (true) {
136+
try {
137+
long time = System.currentTimeMillis();
138+
long blockNum = databaseGrpcClient.getDynamicProperties().getLastSolidityBlockNum();
139+
logger.info("Get last remote solid blockNum: {}, remoteBlockNum: {}, cost: {}.",
140+
blockNum, remoteBlockNum, System.currentTimeMillis() - time);
141+
return blockNum;
265142
} catch (Exception e) {
266-
logger.error(e.getMessage());
267-
sleep(100);
143+
logger.error("Failed to get last solid blockNum: {}, reason: {}.", remoteBlockNum.get(), e.getMessage());
144+
sleep(exceptionSleepTime);
268145
}
269146
}
270147
}
@@ -276,6 +153,18 @@ public void sleep(long time) {
276153
}
277154
}
278155

156+
private void resolveCompatibilityIssueIfUsingFullNodeDatabase() {
157+
long lastSolidityBlockNum = dbManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
158+
long headBlockNum = dbManager.getHeadBlockNum();
159+
logger.info("headBlockNum:{}, solidityBlockNum:{}, diff:{}",
160+
headBlockNum, lastSolidityBlockNum, headBlockNum - lastSolidityBlockNum);
161+
if (lastSolidityBlockNum < headBlockNum) {
162+
logger.info("use fullNode database, headBlockNum:{}, solidityBlockNum:{}, diff:{}",
163+
headBlockNum, lastSolidityBlockNum, headBlockNum - lastSolidityBlockNum);
164+
dbManager.getDynamicPropertiesStore().saveLatestSolidifiedBlockNum(headBlockNum);
165+
}
166+
}
167+
279168
/**
280169
* Start the SolidityNode.
281170
*/
@@ -324,7 +213,7 @@ public static void main(String[] args) {
324213
NodeManager nodeManager = context.getBean(NodeManager.class);
325214
nodeManager.close();
326215

327-
SolidityNode node = new SolidityNode(appT.getDbManager(), cfgArgs);
216+
SolidityNode node = new SolidityNode(appT.getDbManager());
328217
node.start();
329218

330219
rpcApiService.blockUntilShutdown();

0 commit comments

Comments
 (0)