Skip to content

Commit 31e0723

Browse files
committed
Use non-transactional pipelines in bulk op
Fix #5265 Fix #4815
1 parent 0192455 commit 31e0723

File tree

6 files changed

+43
-86
lines changed

6 files changed

+43
-86
lines changed

src/app/models/treeoperations.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ void TreeOperations::getUsedMemory(const QList<QByteArray>& keys, int dbIndex,
500500
result->call(*totalMemory);
501501
}
502502
}
503-
});
503+
}, true);
504504
}
505505

506506
QString TreeOperations::mode() {

src/modules/bulk-operations/operations/copyoperation.cpp

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,40 +66,24 @@ void BulkOperations::CopyOperation::performOperation(
6666

6767
if (m_restoreBuffer.size() > RESTORE_BUFFER_LIMIT ||
6868
m_dumpedKeys == m_affectedKeys.size()) {
69+
int batchSize = m_restoreBuffer.size();
70+
6971
targetConnection->pipelinedCmd(
7072
m_restoreBuffer, this, targetDbIndex,
71-
[this, returnResults](const RedisClient::Response& r, QString err) {
72-
if (!err.isEmpty()) {
73-
return processError(err);
73+
[this, returnResults, batchSize](const RedisClient::Response& r, QString err) {
74+
if (!err.isEmpty() || r.isErrorMessage()) {
75+
return processError(err.isEmpty()? r.value().toByteArray() : err);
7476
}
75-
QVariant incrResult = r.value();
7677
{
7778
QMutexLocker l(&m_processedKeysMutex);
78-
79-
if (incrResult.canConvert(QVariant::ByteArray)) {
80-
if (r.isErrorMessage()) {
81-
return processError(incrResult.toString());
82-
}
83-
m_progress++;
84-
} else if (incrResult.canConvert(QVariant::List)) {
85-
auto responses = incrResult.toList();
86-
87-
for (auto resp : responses) {
88-
if (resp.toString().startsWith("ERR")) {
89-
return processError(resp.toString());
90-
}
91-
92-
m_progress++;
93-
}
94-
}
95-
79+
m_progress += batchSize;
9680
emit progress(m_progress);
9781
}
9882

9983
if (m_progress >= m_affectedKeys.size()) {
10084
returnResults();
10185
}
102-
});
86+
}, false);
10387
m_restoreBuffer.clear();
10488
}
10589
};
@@ -111,7 +95,7 @@ void BulkOperations::CopyOperation::performOperation(
11195
rawCmds.append({"DUMP", k});
11296
}
11397

114-
m_connection->pipelinedCmd(rawCmds, this, -1, processKeyDumps);
98+
m_connection->pipelinedCmd(rawCmds, this, -1, processKeyDumps, true);
11599
};
116100

117101
auto verifySourceConnection = [this, processKeys, targetConnection]() {

src/modules/bulk-operations/operations/deleteoperation.cpp

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,39 +56,27 @@ void BulkOperations::DeleteOperation::deleteKeys(
5656
rawCmds.append({rmCmd, k});
5757
}
5858

59+
int batchSize = m_connection->pipelineCommandsLimit();
5960
int expectedResponses = rawCmds.size();
6061

6162
m_connection->pipelinedCmd(
6263
rawCmds, this, m_dbIndex,
63-
[this, expectedResponses, callback](const RedisClient::Response &r,
64-
QString err) {
65-
if (!err.isEmpty()) {
66-
return processError(err);
64+
[this, expectedResponses, callback, batchSize](
65+
const RedisClient::Response &r, QString err) {
66+
if (!err.isEmpty() || r.isErrorMessage()) {
67+
return processError(err.isEmpty() ? r.value().toByteArray() : err);
6768
}
6869

69-
if (r.isErrorMessage()) {
70-
return processError(r.value().toByteArray());
71-
}
72-
7370
{
74-
QMutexLocker l(&m_processedKeysMutex);
75-
QVariant incrResult = r.value();
76-
77-
if (incrResult.canConvert(QVariant::ByteArray)) {
78-
m_progress++;
79-
} else if (incrResult.canConvert(QVariant::List)) {
80-
auto responses = incrResult.toList();
71+
QMutexLocker l(&m_processedKeysMutex);
72+
m_progress += batchSize;
8173

82-
for (auto resp : responses) {
83-
m_progress++;
84-
}
85-
}
86-
87-
emit progress(m_progress);
74+
emit progress(m_progress);
8875
}
8976

9077
if (m_progress >= expectedResponses) {
9178
callback();
9279
}
93-
});
80+
},
81+
false);
9482
}

src/modules/bulk-operations/operations/rdbimport.cpp

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -95,37 +95,27 @@ void BulkOperations::RDBImportOperation::performOperation(
9595
rawCmds.append(rawCmd);
9696
}
9797

98+
int batchSize = m_connection->pipelineCommandsLimit();
9899
int expectedResponses = rawCmds.size();
99100

100101
m_connection->pipelinedCmd(
101-
rawCmds, this, -1,
102-
[this, returnResults, expectedResponses](const RedisClient::Response& r,
102+
rawCmds, this, m_dbIndex,
103+
[this, returnResults, expectedResponses, batchSize](const RedisClient::Response& r,
103104
const QString& err) {
104-
if (!err.isEmpty()) {
105-
return processError(err);
105+
if (!err.isEmpty() || r.isErrorMessage()) {
106+
return processError(err.isEmpty()? r.value().toByteArray() : err);
106107
}
107108

108109
{
109-
QMutexLocker l(&m_processedKeysMutex);
110-
QVariant incrResult = r.value();
111-
112-
if (incrResult.canConvert(QVariant::ByteArray)) {
113-
m_progress++;
114-
} else if (incrResult.canConvert(QVariant::List)) {
115-
auto responses = incrResult.toList();
116-
117-
for (auto resp : responses) {
118-
m_progress++;
119-
}
120-
}
121-
110+
QMutexLocker l(&m_processedKeysMutex);
111+
m_progress += batchSize;
122112
emit progress(m_progress);
123113
}
124114

125115
if (m_progress >= expectedResponses) {
126116
returnResults();
127117
}
128-
});
118+
}, false);
129119
};
130120

131121
m_python->call_native(
@@ -136,10 +126,14 @@ void BulkOperations::RDBImportOperation::performOperation(
136126
QVariantList commands = v.toList();
137127

138128
m_connection->cmd(
139-
{"ping", QByteArray::number(m_dbIndex)}, this, -1,
140-
[processCommands, commands](const RedisClient::Response&) {
129+
{"ping"}, this, m_dbIndex,
130+
[processCommands, commands, this](const RedisClient::Response& r) {
131+
if (r.isErrorMessage()) {
132+
return processError(QCoreApplication::translate(
133+
"RESP", "Target connection error"));
134+
}
141135
QtConcurrent::run(processCommands, commands);
142136
},
143-
[](const QString&) {});
137+
[this](const QString& err) { processError(err); });
144138
});
145139
}

src/modules/bulk-operations/operations/ttloperation.cpp

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,35 +48,26 @@ void BulkOperations::TtlOperation::setTtl(const QList<QByteArray>& keys,
4848
rawCmds.append({"EXPIRE", k, ttl});
4949
}
5050

51+
int batchSize = m_connection->pipelineCommandsLimit();
5152
int expectedResponses = rawCmds.size();
5253

5354
m_connection->pipelinedCmd(
5455
rawCmds, this, -1,
55-
[this, expectedResponses, callback](const RedisClient::Response& r,
56-
QString err) {
57-
if (!err.isEmpty()) {
58-
return processError(err);
56+
[this, expectedResponses, callback, batchSize](
57+
const RedisClient::Response& r, QString err) {
58+
if (!err.isEmpty() || r.isErrorMessage()) {
59+
return processError(err.isEmpty() ? r.value().toByteArray() : err);
5960
}
6061

6162
{
6263
QMutexLocker l(&m_processedKeysMutex);
63-
QVariant incrResult = r.value();
64-
65-
if (incrResult.canConvert(QVariant::ByteArray)) {
66-
m_progress++;
67-
} else if (incrResult.canConvert(QVariant::List)) {
68-
auto responses = incrResult.toList();
69-
70-
for (auto resp : responses) {
71-
m_progress++;
72-
}
73-
}
74-
64+
m_progress += batchSize;
7565
emit progress(m_progress);
7666
}
7767

7868
if (m_progress >= expectedResponses) {
7969
callback();
8070
}
81-
});
71+
},
72+
false);
8273
}

0 commit comments

Comments
 (0)