Skip to content

Commit fa23d59

Browse files
fengjiachunkillme2008
authored andcommitted
fixbug/install snapshot bug (#80)
* (fix) required eof * (fix) typo * (fix) read file bug * (fix) format * (fix) code format * (fix) NodeTest's code format * (fix) FileService unit test * (fix) large snapshot unit test * (fix) add install snapshot rpc timeout, default is 5 min * (fix) code format * (fix) an error log is needed on copy failed * (fix) code format * (fix) typo * (fix) add unit test: testInstallLargeSnapshot() * (fix) minor fix * (fix) add more detailed error log on copy failed
1 parent 688a7ec commit fa23d59

File tree

13 files changed

+488
-288
lines changed

13 files changed

+488
-288
lines changed

jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ public class RpcOptions {
3232
*/
3333
private int rpcDefaultTimeout = 5000;
3434

35+
/**
36+
* Install snapshot RPC request default timeout in milliseconds
37+
* Default: 5 * 60 * 1000(5min)
38+
*/
39+
private int rpcInstallSnapshotTimeout = 5 * 60 * 1000;
40+
3541
/**
3642
* Rpc process thread pool size
3743
* Default: 80
@@ -75,10 +81,19 @@ public void setRpcDefaultTimeout(int rpcDefaultTimeout) {
7581
this.rpcDefaultTimeout = rpcDefaultTimeout;
7682
}
7783

84+
public int getRpcInstallSnapshotTimeout() {
85+
return rpcInstallSnapshotTimeout;
86+
}
87+
88+
public void setRpcInstallSnapshotTimeout(int rpcInstallSnapshotTimeout) {
89+
this.rpcInstallSnapshotTimeout = rpcInstallSnapshotTimeout;
90+
}
91+
7892
@Override
7993
public String toString() {
8094
return "RpcOptions{" + "rpcConnectTimeoutMs=" + rpcConnectTimeoutMs + ", rpcDefaultTimeout="
81-
+ rpcDefaultTimeout + ", rpcProcessorThreadPoolSize=" + rpcProcessorThreadPoolSize + ", metricRegistry="
82-
+ metricRegistry + '}';
95+
+ rpcDefaultTimeout + ", rpcInstallSnapshotTimeout=" + rpcInstallSnapshotTimeout
96+
+ ", rpcProcessorThreadPoolSize=" + rpcProcessorThreadPoolSize + ", metricRegistry=" + metricRegistry
97+
+ '}';
8398
}
8499
}

jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public Future<Message> getFile(Endpoint endpoint, GetFileRequest request, int ti
100100
@Override
101101
public Future<Message> installSnapshot(Endpoint endpoint, InstallSnapshotRequest request,
102102
RpcResponseClosure<InstallSnapshotResponse> done) {
103-
return invokeWithDone(endpoint, request, done, rpcOptions.getRpcDefaultTimeout());
103+
return invokeWithDone(endpoint, request, done, rpcOptions.getRpcInstallSnapshotTimeout());
104104
}
105105

106106
@Override

jraft-core/src/main/java/com/alipay/sofa/jraft/storage/FileService.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public final class FileService {
5555
private static final FileService INSTANCE = new FileService();
5656

5757
private final ConcurrentMap<Long, FileReader> fileReaderMap = new ConcurrentHashMap<>();
58-
private final AtomicLong nextId;
58+
private final AtomicLong nextId = new AtomicLong();
5959

6060
/**
6161
* Retrieve the singleton instance of FileService.
@@ -72,18 +72,16 @@ void clear() {
7272
}
7373

7474
private FileService() {
75-
this.nextId = new AtomicLong();
76-
final long initValue = Math.abs(Utils.getProcessId(ThreadLocalRandom.current().nextLong(10000,
77-
Integer.MAX_VALUE)) << 45
78-
| System.nanoTime() << 17 >> 17);
79-
this.nextId.set(initValue);
80-
LOG.info("Initial file reader id in FileService is {}", this.nextId);
75+
final long processId = Utils.getProcessId(ThreadLocalRandom.current().nextLong(10000, Integer.MAX_VALUE));
76+
final long initialValue = Math.abs(processId << 45 | System.nanoTime() << 17 >> 17);
77+
this.nextId.set(initialValue);
78+
LOG.info("Initial file reader id in FileService is {}", initialValue);
8179
}
8280

8381
/**
84-
* Handle GetFileRequest ,run the response or set the response with done.
82+
* Handle GetFileRequest, run the response or set the response with done.
8583
*/
86-
public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
84+
public Message handleGetFile(final GetFileRequest request, final RpcRequestClosure done) {
8785
if (request.getCount() <= 0 || request.getOffset() < 0) {
8886
return RpcResponseFactory.newResponse(RaftError.EREQUEST, "Invalid request: %s", request);
8987
}
@@ -92,23 +90,24 @@ public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
9290
return RpcResponseFactory.newResponse(RaftError.ENOENT, "Fail to find reader=%d", request.getReaderId());
9391
}
9492

95-
LOG.debug("GetFile from {} path={} filename={} offset={} count={}", done.getBizContext().getRemoteAddress(),
96-
reader.getPath(), request.getFilename(), request.getOffset(), request.getCount());
93+
if (LOG.isDebugEnabled()) {
94+
LOG.debug("GetFile from {} path={} filename={} offset={} count={}",
95+
done.getBizContext().getRemoteAddress(), reader.getPath(), request.getFilename(), request.getOffset(),
96+
request.getCount());
97+
}
9798

9899
final ByteBufferCollector dataBuffer = ByteBufferCollector.allocate();
99100
final GetFileResponse.Builder responseBuilder = GetFileResponse.newBuilder();
100101
try {
101102
final int read = reader
102103
.readFile(dataBuffer, request.getFilename(), request.getOffset(), request.getCount());
103104
responseBuilder.setReadSize(read);
104-
if (read == -1) {
105-
responseBuilder.setEof(true);
106-
}
105+
responseBuilder.setEof(read == FileReader.EOF);
107106
final ByteBuffer buf = dataBuffer.getBuffer();
108107
buf.flip();
109108
if (!buf.hasRemaining()) {
110109
// skip empty data
111-
return responseBuilder.setData(ByteString.EMPTY).build();
110+
responseBuilder.setData(ByteString.EMPTY);
112111
} else {
113112
// TODO check hole
114113
responseBuilder.setData(ZeroByteStringHelper.wrap(buf));
@@ -128,9 +127,9 @@ public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
128127
/**
129128
* Adds a file reader and return it's generated readerId.
130129
*/
131-
public long addReader(FileReader reader) {
130+
public long addReader(final FileReader reader) {
132131
final long readerId = this.nextId.getAndIncrement();
133-
if (fileReaderMap.putIfAbsent(readerId, reader) == null) {
132+
if (this.fileReaderMap.putIfAbsent(readerId, reader) == null) {
134133
return readerId;
135134
} else {
136135
return -1L;
@@ -140,7 +139,7 @@ public long addReader(FileReader reader) {
140139
/**
141140
* Remove the reader by readerId.
142141
*/
143-
public boolean removeReader(long readerId) {
142+
public boolean removeReader(final long readerId) {
144143
return this.fileReaderMap.remove(readerId) != null;
145144
}
146145
}

jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/FileReader.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
*/
3131
public interface FileReader {
3232

33+
int EOF = -1;
34+
3335
/**
3436
* Get the file path.
3537
*
@@ -39,8 +41,17 @@ public interface FileReader {
3941

4042
/**
4143
* Read file into buf starts from offset at most maxCount.
42-
* Returns -1 if reaches end, else return read count.
44+
*
45+
* @param buf read bytes into this buf
46+
* @param fileName file name
47+
* @param offset the offset of file
48+
* @param maxCount max read bytes
49+
* @return -1 if reaches end, else return read count.
50+
* @throws IOException if some I/O error occurs
51+
* @throws RetryAgainException if it's not allowed to read partly
52+
* or it's allowed but throughput is throttled to 0, try again.
4353
*/
44-
int readFile(ByteBufferCollector buf, String fileName, long offset, long maxCount) throws IOException,
45-
RetryAgainException;
54+
int readFile(final ByteBufferCollector buf, final String fileName, final long offset, final long maxCount)
55+
throws IOException,
56+
RetryAgainException;
4657
}

jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/LocalDirReader.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import com.google.protobuf.Message;
3030

3131
/**
32-
* read a file data form local dir by fileName.
32+
* Read a file data form local dir by fileName.
33+
*
3334
* @author boyan ([email protected])
3435
*
3536
* 2018-Apr-06 9:25:12 PM
3637
*/
3738
public class LocalDirReader implements FileReader {
39+
3840
private static final Logger LOG = LoggerFactory.getLogger(LocalDirReader.class);
3941

4042
private final String path;
@@ -46,44 +48,45 @@ public LocalDirReader(String path) {
4648

4749
@Override
4850
public String getPath() {
49-
return this.path;
51+
return path;
5052
}
5153

5254
@Override
53-
public int readFile(ByteBufferCollector buf, String fileName, long offset, long maxCount) throws IOException,
54-
RetryAgainException {
55-
return this.readFileWithMeta(buf, fileName, null, offset, maxCount);
55+
public int readFile(final ByteBufferCollector buf, final String fileName, final long offset, final long maxCount)
56+
throws IOException,
57+
RetryAgainException {
58+
return readFileWithMeta(buf, fileName, null, offset, maxCount);
5659
}
5760

5861
@SuppressWarnings("unused")
59-
protected int readFileWithMeta(ByteBufferCollector buf, String fileName, Message fileMeta, long offset,
60-
long maxCount) throws IOException, RetryAgainException {
62+
protected int readFileWithMeta(final ByteBufferCollector buf, final String fileName, final Message fileMeta,
63+
long offset, final long maxCount) throws IOException, RetryAgainException {
6164
buf.expandIfNecessary();
6265
final String filePath = this.path + File.separator + fileName;
6366
final File file = new File(filePath);
64-
try (FileInputStream input = new FileInputStream(file); FileChannel fc = input.getChannel()) {
67+
try (final FileInputStream input = new FileInputStream(file); final FileChannel fc = input.getChannel()) {
6568
int totalRead = 0;
6669
while (true) {
6770
final int nread = fc.read(buf.getBuffer(), offset);
6871
if (nread <= 0) {
69-
return -1;
72+
return EOF;
7073
}
7174
totalRead += nread;
7275
if (totalRead < maxCount) {
7376
if (buf.hasRemaining()) {
74-
return -1;
77+
return EOF;
7578
} else {
7679
buf.expandAtMost((int) (maxCount - totalRead));
7780
offset += nread;
7881
}
7982
} else {
8083
final long fsize = file.length();
8184
if (fsize < 0) {
82-
LOG.warn("Invlaid file length {}", filePath);
83-
return -1;
85+
LOG.warn("Invalid file length {}", filePath);
86+
return EOF;
8487
}
85-
if (fsize == offset + maxCount) {
86-
return -1;
88+
if (fsize == offset + nread) {
89+
return EOF;
8790
} else {
8891
return totalRead;
8992
}

jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/SnapshotFileReader.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,20 @@ public SnapshotFileReader(String path, SnapshotThrottle snapshotThrottle) {
5454
}
5555

5656
public boolean open() {
57-
final File file = new File(this.getPath());
57+
final File file = new File(getPath());
5858
return file.exists();
5959
}
6060

6161
@Override
62-
public int readFile(ByteBufferCollector metaBufferCollector, String fileName, long offset, long maxCount)
63-
throws IOException,
64-
RetryAgainException {
62+
public int readFile(final ByteBufferCollector metaBufferCollector, final String fileName, final long offset,
63+
final long maxCount) throws IOException, RetryAgainException {
6564
// read the whole meta file.
6665
if (fileName.equals(Snapshot.JRAFT_SNAPSHOT_META_FILE)) {
6766
final ByteBuffer metaBuf = this.metaTable.saveToByteBufferAsRemote();
68-
//because bufRef will flip the buffer before using, so we must set the meta buffer position to it's limit.
67+
// because bufRef will flip the buffer before using, so we must set the meta buffer position to it's limit.
6968
metaBuf.position(metaBuf.limit());
7069
metaBufferCollector.setBuffer(metaBuf);
71-
return -1;
70+
return EOF;
7271
}
7372
final LocalFileMeta fileMeta = this.metaTable.getFileMeta(fileName);
7473
if (fileMeta == null) {
@@ -78,7 +77,7 @@ public int readFile(ByteBufferCollector metaBufferCollector, String fileName, lo
7877
// go through throttle
7978
long newMaxCount = maxCount;
8079
if (this.snapshotThrottle != null) {
81-
newMaxCount = snapshotThrottle.throttledByThroughput(maxCount);
80+
newMaxCount = this.snapshotThrottle.throttledByThroughput(maxCount);
8281
if (newMaxCount < maxCount) {
8382
// if it's not allowed to read partly or it's allowed but
8483
// throughput is throttled to 0, try again.
@@ -88,7 +87,6 @@ public int readFile(ByteBufferCollector metaBufferCollector, String fileName, lo
8887
}
8988
}
9089

91-
return this.readFileWithMeta(metaBufferCollector, fileName, fileMeta, offset, newMaxCount);
90+
return readFileWithMeta(metaBufferCollector, fileName, fileMeta, offset, newMaxCount);
9291
}
93-
9492
}

0 commit comments

Comments
 (0)