Skip to content

Commit a54d227

Browse files
committed
Rework ZSTD dictionary compression logic to create a trainer per training
patch by Stefan Miklosovic; reviewed by Yifan Cai for CASSANDRA-21209
1 parent 922c8fb commit a54d227

16 files changed

Lines changed: 357 additions & 428 deletions

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
6.0-alpha1
2+
* Rework ZSTD dictionary compression logic to create a trainer per training (CASSANDRA-21209)
23
* Improve performance when calculating settled placements during range movements (CASSANDRA-21144)
34
* Make shadow gossip round parameters configurable for testing (CASSANDRA-21149)
45
* Avoid potential gossip thread deadlock during decommission (CASSANDRA-21143)

src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java

Lines changed: 30 additions & 180 deletions
Large diffs are not rendered by default.

src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java

Lines changed: 74 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.concurrent.ScheduledFuture;
2222
import java.util.concurrent.TimeUnit;
2323
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.function.Consumer;
2426

2527
import com.google.common.annotations.VisibleForTesting;
2628

@@ -30,6 +32,7 @@
3032
import org.apache.cassandra.concurrent.ScheduledExecutors;
3133
import org.apache.cassandra.config.DatabaseDescriptor;
3234
import org.apache.cassandra.db.ColumnFamilyStore;
35+
import org.apache.cassandra.schema.CompressionParams;
3336
import org.apache.cassandra.schema.SystemDistributedKeyspace;
3437

3538
/**
@@ -48,7 +51,9 @@ public class CompressionDictionaryScheduler implements ICompressionDictionarySch
4851
private final String tableName;
4952
private final String tableId;
5053
private final ICompressionDictionaryCache cache;
51-
private final AtomicBoolean manualTrainingInProgress = new AtomicBoolean(false);
54+
private final AtomicBoolean trainingInProgress = new AtomicBoolean(false);
55+
private final AtomicReference<TrainingState> lastTrainingState = new AtomicReference<>(TrainingState.notStarted());
56+
private volatile ICompressionDictionaryTrainer activeTrainer;
5257

5358
private volatile ScheduledFuture<?> scheduledRefreshTask;
5459
private volatile boolean isEnabled;
@@ -83,31 +88,59 @@ public void scheduleRefreshTask()
8388
}
8489

8590
@Override
86-
public void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer,
87-
ColumnFamilyStore.RefViewFragment refViewFragment,
91+
public void scheduleSSTableBasedTraining(ColumnFamilyStore.RefViewFragment refViewFragment,
92+
CompressionParams compressionParams,
8893
CompressionDictionaryTrainingConfig config,
94+
Consumer<CompressionDictionary> listener,
8995
boolean force)
9096
{
91-
if (!manualTrainingInProgress.compareAndSet(false, true))
97+
if (!trainingInProgress.compareAndSet(false, true))
9298
{
9399
refViewFragment.close();
94100
throw new IllegalStateException("Training already in progress for table " + keyspaceName + '.' + tableName);
95101
}
96102

97-
logger.info("Starting SSTable-based dictionary training for {}.{} from {} SSTables",
98-
keyspaceName, tableName, refViewFragment.sstables.size());
103+
ICompressionDictionaryTrainer trainer;
99104

100-
// Run the SSTableSamplingTask asynchronously
101-
SSTableSamplingTask task = new SSTableSamplingTask(refViewFragment, trainer, config, force);
102-
ScheduledExecutors.nonPeriodicTasks.submit(task);
105+
try
106+
{
107+
trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName, compressionParams);
108+
trainer.setDictionaryTrainedListener(listener);
109+
}
110+
catch (Throwable t)
111+
{
112+
trainingInProgress.set(false);
113+
refViewFragment.close();
114+
throw t;
115+
}
116+
117+
if (trainer.start(config))
118+
{
119+
activeTrainer = trainer;
120+
lastTrainingState.set(trainer.getTrainingState());
121+
logger.info("Starting SSTable-based dictionary training for {}.{} from {} SSTables",
122+
keyspaceName, tableName, refViewFragment.sstables.size());
123+
124+
SSTableSamplingTask task = new SSTableSamplingTask(refViewFragment, trainer, config, force);
125+
// trainer is eventually closed here, as well as indicating
126+
// in trainingInProgress that it was finished
127+
ScheduledExecutors.nonPeriodicTasks.submit(task);
128+
}
129+
else
130+
{
131+
finishTraining(trainer.getTrainingState());
132+
cleanup(refViewFragment, trainer);
133+
}
103134
}
104135

105136
/**
106137
* Finishes the current training: records the given state as the last training state, clears the active trainer and resets the in-progress flag.
107138
*/
108-
private void cancelManualTraining()
139+
private void finishTraining(TrainingState trainingState)
109140
{
110-
manualTrainingInProgress.compareAndSet(true, false);
141+
lastTrainingState.set(trainingState);
142+
activeTrainer = null;
143+
trainingInProgress.compareAndSet(true, false);
111144
}
112145

113146
/**
@@ -121,6 +154,15 @@ public void setEnabled(boolean enabled)
121154
this.isEnabled = enabled;
122155
}
123156

157+
@Override
158+
public TrainingState getLastTrainingState()
159+
{
160+
ICompressionDictionaryTrainer trainer = activeTrainer;
161+
if (trainer != null)
162+
return trainer.getTrainingState();
163+
return lastTrainingState.get();
164+
}
165+
124166
/**
125167
* Refreshes dictionary from system table and updates the cache.
126168
* This method is called periodically by the scheduled refresh task.
@@ -153,7 +195,7 @@ public void close()
153195
scheduledRefreshTask = null;
154196
}
155197

156-
cancelManualTraining();
198+
finishTraining(TrainingState.notStarted());
157199
}
158200

159201
/**
@@ -195,7 +237,6 @@ public void run()
195237
// Use the force parameter from the task
196238
trainer.trainDictionaryAsync(force)
197239
.addCallback((dictionary, throwable) -> {
198-
cancelManualTraining();
199240
if (throwable != null)
200241
{
201242
logger.error("SSTable-based dictionary training failed for {}.{}: {}",
@@ -206,23 +247,36 @@ public void run()
206247
logger.info("SSTable-based dictionary training completed for {}.{}",
207248
keyspaceName, tableName);
208249
}
250+
251+
finishTraining(trainer.getTrainingState());
252+
cleanup(refViewFragment, trainer);
209253
});
210254
}
211255
catch (Exception e)
212256
{
213257
logger.error("Failed to sample from SSTables for {}.{}", keyspaceName, tableName, e);
214-
cancelManualTraining();
215-
}
216-
finally
217-
{
218-
refViewFragment.close();
258+
finishTraining(trainer.getTrainingState());
259+
cleanup(refViewFragment, trainer);
219260
}
220261
}
221262
}
222263

264+
private void cleanup(ColumnFamilyStore.RefViewFragment refViewFragment, ICompressionDictionaryTrainer trainer)
265+
{
266+
try
267+
{
268+
trainer.close();
269+
}
270+
catch (Throwable t)
271+
{
272+
logger.debug("Unable to close trainer.", t);
273+
}
274+
refViewFragment.close();
275+
}
276+
223277
@VisibleForTesting
224-
boolean isManualTrainingRunning()
278+
boolean isTrainingRunning()
225279
{
226-
return manualTrainingInProgress.get();
280+
return trainingInProgress.get();
227281
}
228282
}

src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,23 @@
1818

1919
package org.apache.cassandra.db.compression;
2020

21+
import java.util.Map;
22+
2123
import com.google.common.base.Preconditions;
2224

25+
import org.apache.cassandra.config.DataStorageSpec;
26+
import org.apache.cassandra.config.DurationSpec;
27+
import org.apache.cassandra.exceptions.ConfigurationException;
28+
import org.apache.cassandra.schema.CompressionParams;
29+
30+
import static java.lang.String.format;
31+
import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE;
32+
import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE;
33+
import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MIN_FREQUENCY;
34+
import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME;
35+
import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME;
36+
import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MIN_FREQUENCY_PARAMETER_NAME;
37+
2338
/**
2439
* Configuration for dictionary training parameters.
2540
*/
@@ -29,13 +44,15 @@ public class CompressionDictionaryTrainingConfig
2944
public final int maxTotalSampleSize;
3045
public final int acceptableTotalSampleSize;
3146
public final int chunkSize;
47+
public final int minTrainingFrequency;
3248

3349
private CompressionDictionaryTrainingConfig(Builder builder)
3450
{
3551
this.maxDictionarySize = builder.maxDictionarySize;
3652
this.maxTotalSampleSize = builder.maxTotalSampleSize;
3753
this.acceptableTotalSampleSize = builder.maxTotalSampleSize / 10 * 8;
3854
this.chunkSize = builder.chunkSize;
55+
this.minTrainingFrequency = builder.minTrainingFrequency;
3956
}
4057

4158
public static Builder builder()
@@ -48,6 +65,7 @@ public static class Builder
4865
private int maxDictionarySize = 65536; // 64KB default
4966
private int maxTotalSampleSize = 10 * 1024 * 1024; // 10MB total
5067
private int chunkSize = 64 * 1024; // 64KB default
68+
private int minTrainingFrequency = 0; // in minutes
5169

5270
public Builder maxDictionarySize(int size)
5371
{
@@ -67,12 +85,121 @@ public Builder chunkSize(int chunkSize)
6785
return this;
6886
}
6987

88+
public Builder minTrainingFrequency(int minTrainingFrequency)
89+
{
90+
this.minTrainingFrequency = minTrainingFrequency;
91+
return this;
92+
}
93+
7094
public CompressionDictionaryTrainingConfig build()
7195
{
7296
Preconditions.checkArgument(maxDictionarySize > 0, "maxDictionarySize must be positive");
7397
Preconditions.checkArgument(maxTotalSampleSize > 0, "maxTotalSampleSize must be positive");
7498
Preconditions.checkArgument(chunkSize > 0, "chunkSize must be positive");
99+
Preconditions.checkArgument(minTrainingFrequency >= 0, "min training frequency must be non-negative");
75100
return new CompressionDictionaryTrainingConfig(this);
76101
}
77102
}
103+
104+
public static int getMaxDictionarySize(Map<String, String> params)
105+
{
106+
return validateSizeBasedTrainingParameter(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
107+
params.getOrDefault(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
108+
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE));
109+
}
110+
111+
public static int getMaxTotalSampleSize(Map<String, String> params)
112+
{
113+
return validateSizeBasedTrainingParameter(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
114+
params.getOrDefault(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
115+
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE));
116+
}
117+
118+
public static int getMinTrainingFrequency(Map<String, String> params)
119+
{
120+
return validateDurationBasedTrainingParameter(TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
121+
params.getOrDefault(TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
122+
DEFAULT_TRAINING_MIN_FREQUENCY));
123+
}
124+
125+
public static int getMaxDictionarySizeWithUserSuppliedParams(CompressionParams compressionParams, Map<String, String> parameters)
126+
{
127+
return internalTrainingParameterResolution(compressionParams,
128+
parameters.get(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME),
129+
TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
130+
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE);
131+
}
132+
133+
public static int getMaxTotalSampleSizeWithUserSuppliedParams(CompressionParams compressionParams, Map<String, String> parameters)
134+
{
135+
return internalTrainingParameterResolution(compressionParams,
136+
parameters.get(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME),
137+
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
138+
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE);
139+
}
140+
141+
private static int internalTrainingParameterResolution(CompressionParams compressionParams,
142+
String userSuppliedValue,
143+
String parameterName,
144+
String defaultParameterValue)
145+
{
146+
String resolvedValue = null;
147+
try
148+
{
149+
if (userSuppliedValue == null)
150+
resolvedValue = compressionParams.getOtherOptions().getOrDefault(parameterName, defaultParameterValue);
151+
else
152+
resolvedValue = userSuppliedValue;
153+
154+
return new DataStorageSpec.IntBytesBound(resolvedValue).toBytes();
155+
}
156+
catch (Throwable t)
157+
{
158+
throw new IllegalArgumentException(String.format("Invalid value for %s: %s", parameterName, resolvedValue));
159+
}
160+
}
161+
162+
/**
163+
* Validates value of a parameter for training purposes. The value to validate should
164+
* be accepted by {@link DataStorageSpec.IntBytesBound}. This method is used upon validation
165+
* of input parameters in the implementations of dictionary compressor.
166+
*
167+
* @param parameterName name of a parameter to validate
168+
* @param resolvedValue value to validate
169+
* @return resolved value in bytes
170+
*/
171+
static int validateSizeBasedTrainingParameter(String parameterName, String resolvedValue)
172+
{
173+
try
174+
{
175+
return new DataStorageSpec.IntBytesBound(resolvedValue).toBytes();
176+
}
177+
catch (Throwable t)
178+
{
179+
throw new ConfigurationException(format("Unable to set value to parameter %s: %s. Reason: %s",
180+
parameterName, resolvedValue, t.getMessage()));
181+
}
182+
}
183+
184+
/**
185+
* Validates value of a parameter for training purposes. The value to validate should
186+
* be accepted by {@link DurationSpec.IntMinutesBound}. This method is used upon validation of input parameters
187+
* in the implementation of dictionary compressor.
188+
*
189+
* @param parameterName name of a parameter to validate
190+
* @param resolvedValue value to validate
191+
* @return resolved value in minutes
192+
*/
193+
static int validateDurationBasedTrainingParameter(String parameterName, String resolvedValue)
194+
{
195+
try
196+
{
197+
return new DurationSpec.IntMinutesBound(resolvedValue).toMinutes();
198+
}
199+
catch (Throwable t)
200+
{
201+
throw new ConfigurationException(format("Unable to set value to parameter %s: %s. Reason: %s",
202+
parameterName, resolvedValue, t.getMessage()));
203+
}
204+
}
78205
}

src/java/org/apache/cassandra/db/compression/ICompressionDictionaryScheduler.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818

1919
package org.apache.cassandra.db.compression;
2020

21+
import java.util.function.Consumer;
22+
2123
import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment;
24+
import org.apache.cassandra.schema.CompressionParams;
2225

2326
/**
2427
* Interface for managing scheduled tasks for compression dictionary operations.
@@ -38,15 +41,24 @@ public interface ICompressionDictionaryScheduler extends AutoCloseable
3841
/**
3942
* Schedules SSTable-based training that samples from existing SSTables.
4043
*
41-
* @param trainer the trainer to use
44+
* An implementation of this method should ensure that SSTables referenced by {@code refViewFragment} are closed
45+
* eventually, either directly at the end of this method or by other means, when training is running
46+
* asynchronously.
47+
*
48+
* A trainer is created internally for each training; a caller may assume it is closed once training finishes, either
49+
* directly in this method or indirectly when training is running asynchronously.
50+
*
4251
* @param refViewFragment the view of SSTables to sample from
52+
* @param compressionParams parameters for compression
4353
* @param config the training configuration
54+
* @param listener listener invoked when a dictionary is trained
4455
* @param force force the dictionary training even if there are not enough samples
4556
* @throws IllegalStateException if training is already in progress
4657
*/
47-
void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer,
48-
RefViewFragment refViewFragment,
58+
void scheduleSSTableBasedTraining(RefViewFragment refViewFragment,
59+
CompressionParams compressionParams,
4960
CompressionDictionaryTrainingConfig config,
61+
Consumer<CompressionDictionary> listener,
5062
boolean force);
5163

5264
/**
@@ -55,4 +67,6 @@ void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer,
5567
* @param enabled whether the scheduler should be enabled
5668
*/
5769
void setEnabled(boolean enabled);
70+
71+
TrainingState getLastTrainingState();
5872
}

0 commit comments

Comments
 (0)