1
1
package org .tron .program ;
2
2
3
+ import static org .tron .core .config .Parameter .ChainConstant .BLOCK_PRODUCED_INTERVAL ;
4
+
3
5
import ch .qos .logback .classic .Level ;
4
- import com .google .common .collect .Maps ;
5
- import java .util .Map ;
6
6
import java .util .concurrent .LinkedBlockingDeque ;
7
7
import java .util .concurrent .atomic .AtomicLong ;
8
8
import lombok .extern .slf4j .Slf4j ;
20
20
import org .tron .common .overlay .server .ChannelManager ;
21
21
import org .tron .core .Constant ;
22
22
import org .tron .core .capsule .BlockCapsule ;
23
- import org .tron .core .capsule .TransactionCapsule ;
24
- import org .tron .core .capsule .TransactionInfoCapsule ;
25
23
import org .tron .core .config .DefaultConfig ;
26
24
import org .tron .core .config .args .Args ;
27
25
import org .tron .core .db .Manager ;
@@ -34,161 +32,52 @@ public class SolidityNode {
34
32
35
33
private Manager dbManager ;
36
34
37
- private Args cfgArgs = Args .getInstance ();
38
-
39
35
private DatabaseGrpcClient databaseGrpcClient ;
40
36
41
37
private AtomicLong ID = new AtomicLong ();
42
38
43
- private Map <Long , Block > blockMap = Maps .newConcurrentMap ();
44
-
45
- private int syncThreadNum = cfgArgs .getSolidSyncThreadNum ();
46
-
47
- private int blockCacheSize = cfgArgs .getSolidBlockCacheSize ();
48
-
49
- private LinkedBlockingDeque <Block > blockQueue = new LinkedBlockingDeque (blockCacheSize );
50
-
51
- private LinkedBlockingDeque <Block > blockBakQueue = new LinkedBlockingDeque (blockCacheSize );
39
+ private AtomicLong remoteBlockNum = new AtomicLong ();
52
40
53
- private AtomicLong remoteLastSolidityBlockNum = new AtomicLong ( );
41
+ private LinkedBlockingDeque < Block > blockQueue = new LinkedBlockingDeque ( 100 );
54
42
55
- private AtomicLong lastSolidityBlockNum = new AtomicLong ();
56
-
57
- private volatile boolean syncFlag = true ;
43
+ private int exceptionSleepTime = 1000 ;
58
44
59
45
private volatile boolean flag = true ;
60
46
61
47
public SolidityNode (Manager dbManager ) {
62
48
this .dbManager = dbManager ;
63
49
resolveCompatibilityIssueIfUsingFullNodeDatabase ();
64
- lastSolidityBlockNum .set (dbManager .getDynamicPropertiesStore ().getLatestSolidifiedBlockNum ());
65
- ID .set (lastSolidityBlockNum .get ());
66
- databaseGrpcClient = new DatabaseGrpcClient (cfgArgs .getTrustNodeAddr ());
67
- remoteLastSolidityBlockNum .set (getLastSolidityBlockNum ());
50
+ ID .set (dbManager .getDynamicPropertiesStore ().getLatestSolidifiedBlockNum ());
51
+ databaseGrpcClient = new DatabaseGrpcClient (Args .getInstance ().getTrustNodeAddr ());
52
+ remoteBlockNum .set (getLastSolidityBlockNum ());
68
53
}
69
54
70
55
private void start () {
71
56
try {
72
- for (int i = 0 ; i < syncThreadNum ; i ++) {
73
- new Thread (() -> getSyncBlock ()).start ();
74
- }
75
- new Thread (() -> getAdvBlock ()).start ();
76
- new Thread (() -> pushBlock ()).start ();
57
+ new Thread (() -> getBlock ()).start ();
77
58
new Thread (() -> processBlock ()).start ();
78
- new Thread (() -> processTrx ()).start ();
79
- logger .info ( "Success to start solid node, lastSolidityBlockNum: {}, ID: {}, "
80
- + "remoteLastSolidityBlockNum: {}, syncThreadNum: {}, blockCacheSize: {}." ,
81
- lastSolidityBlockNum , ID .get (), remoteLastSolidityBlockNum , syncThreadNum , blockCacheSize );
59
+ logger .info ( "Success to start solid node, ID: {}, remoteBlockNum: {}." , ID .get (), remoteBlockNum );
82
60
} catch (Exception e ) {
83
- logger .error ("Failed to start solid node, address: {}." , cfgArgs .getTrustNodeAddr ());
61
+ logger .error ("Failed to start solid node, address: {}." , Args . getInstance () .getTrustNodeAddr ());
84
62
System .exit (0 );
85
63
}
86
64
}
87
65
88
- private void getSyncBlock () {
89
- long blockNum = getNextSyncBlockId ();
90
- while (blockNum != 0 ) {
91
- try {
92
- if (blockMap .size () > blockCacheSize ) {
93
- sleep (1000 );
94
- continue ;
95
- }
96
- Block block = getBlockByNum (blockNum );
97
- blockMap .put (blockNum , block );
98
- logger .info ("Success to get sync block: {}." , blockNum );
99
- blockNum = getNextSyncBlockId ();
100
- } catch (Exception e ) {
101
- logger .error ("Failed to get sync block {}, reason: {}" , blockNum , e .getMessage ());
102
- sleep (1000 );
103
- }
104
- }
105
- logger .warn ("Get sync block thread {} exit." , Thread .currentThread ().getName ());
106
- }
107
-
108
- synchronized long getNextSyncBlockId () {
109
-
110
- if (!syncFlag ) {
111
- return 0 ;
112
- }
113
-
114
- if (ID .get () < remoteLastSolidityBlockNum .get ()) {
115
- return ID .incrementAndGet ();
116
- }
117
-
118
- long lastNum = getLastSolidityBlockNum ();
119
- if (lastNum - remoteLastSolidityBlockNum .get () > 50 ) {
120
- remoteLastSolidityBlockNum .set (lastNum );
121
- return ID .incrementAndGet ();
122
- }
123
-
124
- logger .info ("Sync mode switch to adv, ID = {}, lastNum = {}, remoteLastSolidityBlockNum = {}" ,
125
- ID .get (), lastNum , remoteLastSolidityBlockNum .get ());
126
-
127
- syncFlag = false ;
128
-
129
- return 0 ;
130
- }
131
-
132
- private void getAdvBlock () {
133
- while (syncFlag ) {
134
- sleep (5000 );
135
- }
136
- logger .info ("Get adv block thread start." );
66
+ private void getBlock () {
137
67
long blockNum = ID .incrementAndGet ();
138
68
while (flag ) {
139
69
try {
140
- if (blockNum > remoteLastSolidityBlockNum .get () || blockMap . size () > blockCacheSize ) {
141
- sleep (3000 );
142
- remoteLastSolidityBlockNum .set (getLastSolidityBlockNum ());
70
+ if (blockNum > remoteBlockNum .get ()) {
71
+ sleep (BLOCK_PRODUCED_INTERVAL );
72
+ remoteBlockNum .set (getLastSolidityBlockNum ());
143
73
continue ;
144
74
}
145
75
Block block = getBlockByNum (blockNum );
146
- blockMap .put (blockNum , block );
147
- logger .info ("Success to get adv block: {}." , blockNum );
148
- blockNum = ID .incrementAndGet ();
149
- } catch (Exception e ) {
150
- logger .error ("Failed to get adv block {}, reason: {}" , blockNum , e .getMessage ());
151
- sleep (1000 );
152
- }
153
- }
154
- }
155
-
156
- private Block getBlockByNum (long blockNum ) throws Exception {
157
- Block block = databaseGrpcClient .getBlock (blockNum );
158
- if (block .getBlockHeader ().getRawData ().getNumber () != blockNum ) {
159
- logger .warn ("Get adv block id not the same , {}, {}." , blockNum ,
160
- block .getBlockHeader ().getRawData ().getNumber ());
161
- throw new Exception ();
162
- }
163
- return block ;
164
- }
165
-
166
- private long getLastSolidityBlockNum () {
167
- while (true ) {
168
- try {
169
- long blockNum = databaseGrpcClient .getDynamicProperties ().getLastSolidityBlockNum ();
170
- logger .info ("Get last remote solid blockNum: {}, remoteLastSolidityBlockNum: {}." ,
171
- blockNum , remoteLastSolidityBlockNum );
172
- return blockNum ;
173
- } catch (Exception e ) {
174
- logger .error ("Failed to get last solid blockNum: {}, reason: {}" ,
175
- remoteLastSolidityBlockNum .get (), e .getMessage ());
176
- sleep (1000 );
177
- }
178
- }
179
- }
180
-
181
- private void pushBlock () {
182
- while (flag ) {
183
- try {
184
- Block block = blockMap .remove (lastSolidityBlockNum .get () + 1 );
185
- if (block == null ) {
186
- sleep (1000 );
187
- continue ;
188
- }
189
76
blockQueue .put (block );
190
- lastSolidityBlockNum .incrementAndGet ();
77
+ blockNum = ID .incrementAndGet ();
191
78
} catch (Exception e ) {
79
+ logger .error ("Failed to get block {}, reason: {}." , blockNum , e .getMessage ());
80
+ sleep (exceptionSleepTime );
192
81
}
193
82
}
194
83
}
@@ -198,74 +87,61 @@ private void processBlock() {
198
87
try {
199
88
Block block = blockQueue .take ();
200
89
loopProcessBlock (block );
201
- blockBakQueue .put (block );
202
- logger .info (
203
- "Success to process block: {}, blockMapSize: {}, blockQueueSize: {}, blockBakQueue: {}." ,
204
- block .getBlockHeader ().getRawData ().getNumber (),
205
- blockMap .size (),
206
- blockQueue .size (),
207
- blockBakQueue .size ());
208
90
} catch (Exception e ) {
209
91
logger .error (e .getMessage ());
210
- sleep (100 );
92
+ sleep (exceptionSleepTime );
211
93
}
212
94
}
213
95
}
214
96
215
- private void resolveCompatibilityIssueIfUsingFullNodeDatabase () {
216
- long lastSolidityBlockNum = dbManager .getDynamicPropertiesStore ().getLatestSolidifiedBlockNum ();
217
- long headBlockNum = dbManager .getHeadBlockNum ();
218
- logger .info ("headBlockNum:{}, solidityBlockNum:{}, diff:{}" ,
219
- headBlockNum , lastSolidityBlockNum , headBlockNum - lastSolidityBlockNum );
220
- if (lastSolidityBlockNum < headBlockNum ) {
221
- logger .info ("use fullNode database, headBlockNum:{}, solidityBlockNum:{}, diff:{}" ,
222
- headBlockNum , lastSolidityBlockNum , headBlockNum - lastSolidityBlockNum );
223
- dbManager .getDynamicPropertiesStore ().saveLatestSolidifiedBlockNum (headBlockNum );
224
- }
225
- }
226
-
227
97
private void loopProcessBlock (Block block ) {
228
- while (true ) {
98
+ while (flag ) {
229
99
long blockNum = block .getBlockHeader ().getRawData ().getNumber ();
230
100
try {
231
101
dbManager .pushVerifiedBlock (new BlockCapsule (block ));
232
102
dbManager .getDynamicPropertiesStore ().saveLatestSolidifiedBlockNum (blockNum );
103
+ logger .info ("Success to process block: {}, blockQueueSize: {}." , blockNum , blockQueue .size ());
233
104
return ;
234
105
} catch (Exception e ) {
235
- logger .error ("Failed to process block {}" , new BlockCapsule (block ), e );
236
- try {
237
- sleep (100 );
238
- block = getBlockByNum (blockNum );
239
- } catch (Exception e1 ) {
240
- logger .error ("Failed to get block: {}, reason: {}" , blockNum , e1 .getMessage ());
241
- }
106
+ logger .error ("Failed to process block {}." , new BlockCapsule (block ), e );
107
+ sleep (exceptionSleepTime );
108
+ block = getBlockByNum (blockNum );
242
109
}
243
110
}
244
111
}
245
112
246
- private void processTrx ( ) {
247
- while (flag ) {
113
+ private Block getBlockByNum ( long blockNum ) {
114
+ while (true ) {
248
115
try {
249
- Block block = blockBakQueue .take ();
250
- BlockCapsule blockCapsule = new BlockCapsule (block );
251
- for (TransactionCapsule trx : blockCapsule .getTransactions ()) {
252
- TransactionInfoCapsule ret ;
253
- try {
254
- ret = dbManager .getTransactionHistoryStore ().get (trx .getTransactionId ().getBytes ());
255
- } catch (Exception ex ) {
256
- logger .warn ("Failed to get trx: {}, reason: {}" , trx .getTransactionId (), ex .getMessage ());
257
- continue ;
258
- }
259
- if (ret == null ) {
260
- continue ;
261
- }
262
- ret .setBlockNumber (blockCapsule .getNum ());
263
- ret .setBlockTimeStamp (blockCapsule .getTimeStamp ());
264
- dbManager .getTransactionHistoryStore ().put (trx .getTransactionId ().getBytes (), ret );
116
+ long time = System .currentTimeMillis ();
117
+ Block block = databaseGrpcClient .getBlock (blockNum );
118
+ long num = block .getBlockHeader ().getRawData ().getNumber ();
119
+ if (num == blockNum ) {
120
+ logger .info ("Success to get block: {}, cost: {}ms." ,
121
+ blockNum , System .currentTimeMillis () - time );
122
+ return block ;
123
+ }else {
124
+ logger .warn ("Get block id not the same , {}, {}." , num , blockNum );
125
+ sleep (exceptionSleepTime );
265
126
}
127
+ }catch (Exception e ){
128
+ logger .error ("Failed to get block: {}, reason: {}." , blockNum , e .getMessage ());
129
+ sleep (exceptionSleepTime );
130
+ }
131
+ }
132
+ }
133
+
134
+ private long getLastSolidityBlockNum () {
135
+ while (true ) {
136
+ try {
137
+ long time = System .currentTimeMillis ();
138
+ long blockNum = databaseGrpcClient .getDynamicProperties ().getLastSolidityBlockNum ();
139
+ logger .info ("Get last remote solid blockNum: {}, remoteBlockNum: {}, cost: {}." ,
140
+ blockNum , remoteBlockNum , System .currentTimeMillis () - time );
141
+ return blockNum ;
266
142
} catch (Exception e ) {
267
- logger .error (e .getMessage ());
268
- sleep (100 );
143
+ logger .error ("Failed to get last solid blockNum: {}, reason: {}." , remoteBlockNum . get (), e .getMessage ());
144
+ sleep (exceptionSleepTime );
269
145
}
270
146
}
271
147
}
@@ -277,6 +153,18 @@ public void sleep(long time) {
277
153
}
278
154
}
279
155
156
+ private void resolveCompatibilityIssueIfUsingFullNodeDatabase () {
157
+ long lastSolidityBlockNum = dbManager .getDynamicPropertiesStore ().getLatestSolidifiedBlockNum ();
158
+ long headBlockNum = dbManager .getHeadBlockNum ();
159
+ logger .info ("headBlockNum:{}, solidityBlockNum:{}, diff:{}" ,
160
+ headBlockNum , lastSolidityBlockNum , headBlockNum - lastSolidityBlockNum );
161
+ if (lastSolidityBlockNum < headBlockNum ) {
162
+ logger .info ("use fullNode database, headBlockNum:{}, solidityBlockNum:{}, diff:{}" ,
163
+ headBlockNum , lastSolidityBlockNum , headBlockNum - lastSolidityBlockNum );
164
+ dbManager .getDynamicPropertiesStore ().saveLatestSolidifiedBlockNum (headBlockNum );
165
+ }
166
+ }
167
+
280
168
/**
281
169
* Start the SolidityNode.
282
170
*/
0 commit comments