Skip to content

Commit 0b61abf

Browse files
committed
Add initialization safety support for Apache Geode CacheServers.
Resolves spring-atticgh-554.
1 parent 9d20ee3 commit 0b61abf

File tree

6 files changed

+779
-22
lines changed

6 files changed

+779
-22
lines changed

spring-data-geode/src/main/java/org/springframework/data/gemfire/config/annotation/EnableGemFireInitializationSafety.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.lang.annotation.Target;
2525

2626
import org.apache.geode.cache.GemFireCache;
27+
import org.apache.geode.cache.Region;
2728

2829
import org.springframework.context.annotation.Import;
2930

@@ -38,6 +39,7 @@
3839
* @see java.lang.annotation.Retention
3940
* @see java.lang.annotation.Target
4041
* @see org.apache.geode.cache.GemFireCache
42+
* @see org.apache.geode.cache.Region
4143
* @see org.springframework.context.annotation.Import
4244
* @see org.springframework.data.gemfire.config.annotation.GemFireInitializationSafetyConfiguration
4345
* @since 2.7.0
@@ -62,6 +64,21 @@
6264
*/
6365
boolean enableCacheCallbackSafety() default GemFireInitializationSafetyConfiguration.DEFAULT_ENABLE_CACHE_CALLBACK_SAFETY;
6466

67+
/**
68+
* Configures whether initialization safety is enabled or disabled for
69+
* {@link org.apache.geode.cache.server.CacheServer} components.
70+
*
71+
* Defaults to {@literal false}. By default, Apache Geode already protects cache {@link Region Regions} from client
72+
* access before the cluster is fully initialized.
73+
*
74+
* Use the {@literal spring.data.gemfire.cache.initialization-safety.server.enabled} property
75+
* in {@literal application.properties}.
76+
*
77+
* @return a boolean value indicating whether initialization safety is enabled or disabled for
78+
* {@link org.apache.geode.cache.server.CacheServer} components.
79+
*/
80+
boolean enableCacheServerSafety() default GemFireInitializationSafetyConfiguration.DEFAULT_ENABLE_CACHE_SERVER_SAFETY;
81+
6582
/**
6683
* Configures whether initialization safety is enabled or disabled for {@link GemFireCache} WAN Gateway components.
6784
*

spring-data-geode/src/main/java/org/springframework/data/gemfire/config/annotation/GemFireInitializationSafetyConfiguration.java

Lines changed: 96 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.geode.cache.TransactionListener;
3737
import org.apache.geode.cache.TransactionWriter;
3838
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
39+
import org.apache.geode.cache.server.CacheServer;
3940
import org.apache.geode.cache.wan.GatewayReceiver;
4041

4142
import org.springframework.beans.BeansException;
@@ -67,6 +68,8 @@
6768
import org.springframework.data.gemfire.event.support.InitializationSafeTransactionWriter;
6869
import org.springframework.data.gemfire.init.InitializationSafe;
6970
import org.springframework.data.gemfire.init.WaitStrategy;
71+
import org.springframework.data.gemfire.server.CacheServerFactoryBean;
72+
import org.springframework.data.gemfire.server.support.InitializationSafeCacheServer;
7073
import org.springframework.data.gemfire.util.CollectionUtils;
7174
import org.springframework.data.gemfire.wan.GatewayReceiverFactoryBean;
7275
import org.springframework.data.gemfire.wan.support.GatewayReceiverStartException;
@@ -84,9 +87,11 @@
8487
* @see java.lang.annotation.Annotation
8588
* @see org.apache.geode.cache.CacheCallback
8689
* @see org.apache.geode.cache.GemFireCache
90+
* @see org.apache.geode.cache.server.CacheServer
8791
* @see org.apache.geode.cache.wan.GatewayReceiver
8892
* @see org.springframework.beans.factory.config.BeanPostProcessor
8993
* @see org.springframework.context.ApplicationContext
94+
* @see org.springframework.context.ApplicationContextAware
9095
* @see org.springframework.context.ApplicationListener
9196
* @see org.springframework.context.annotation.Bean
9297
* @see org.springframework.context.annotation.Configuration
@@ -100,7 +105,8 @@
100105
* @see org.springframework.data.gemfire.event.GemFireClusterReadyStrategy
101106
* @see org.springframework.data.gemfire.init.InitializationSafe
102107
* @see org.springframework.data.gemfire.init.WaitStrategy
103-
* @see InitializationSafeGatewayReceiver
108+
* @see org.springframework.data.gemfire.server.support.InitializationSafeCacheServer
109+
* @see org.springframework.data.gemfire.wan.support.InitializationSafeGatewayReceiver
104110
* @since 2.7.0
105111
*/
106112
@Configuration
@@ -109,13 +115,16 @@ public class GemFireInitializationSafetyConfiguration extends AbstractAnnotation
109115
implements ApplicationContextAware, ImportAware {
110116

111117
protected static final boolean DEFAULT_ENABLE_CACHE_CALLBACK_SAFETY = true;
118+
protected static final boolean DEFAULT_ENABLE_CACHE_SERVER_SAFETY = false;
112119
protected static final boolean DEFAULT_ENABLE_GATEWAY_SAFETY = true;
113120

114121
private volatile Boolean enableCacheCallbackSafety = DEFAULT_ENABLE_CACHE_CALLBACK_SAFETY;
122+
private volatile Boolean enableCacheServerSafety = DEFAULT_ENABLE_CACHE_SERVER_SAFETY;
115123
private volatile Boolean enableGatewaySafety = DEFAULT_ENABLE_GATEWAY_SAFETY;
116124

117125
private ConfigurableApplicationContext applicationContext;
118126

127+
private final List<String> cacheServerBeanNames = Collections.synchronizedList(new ArrayList<>());
119128
private final List<String> gatewayReceiverBeanNames = Collections.synchronizedList(new ArrayList<>());
120129

121130
/**
@@ -174,14 +183,17 @@ public void setImportMetadata(@NonNull AnnotationMetadata importMetadata) {
174183
setEnableCacheCallbackSafety(resolveProperty(cacheProperty("initialization-safety.callback.enabled"),
175184
Boolean.class, enableGemFireInitializationSafetyAttributes.getBoolean("enableCacheCallbackSafety")));
176185

186+
setEnableCacheServerSafety(resolveProperty(cacheProperty("initialization-safety.server.enabled"),
187+
Boolean.class, enableGemFireInitializationSafetyAttributes.getBoolean("enableCacheServerSafety")));
188+
177189
setEnableGatewaySafety(resolveProperty(cacheProperty("initialization-safety.gateway.enabled"),
178190
Boolean.class, enableGemFireInitializationSafetyAttributes.getBoolean("enableGatewaySafety")));
179191
}
180192
}
181193

182194
@Bean
183195
@Order(Ordered.HIGHEST_PRECEDENCE)
184-
public ApplicationListener<ContextRefreshedEvent> gemfireClusterReadyApplicationListener(
196+
public @NonNull ApplicationListener<ContextRefreshedEvent> gemfireClusterReadyApplicationListener(
185197
@Autowired(required = false) List<GemFireClusterReadyStrategy> clusterReadyStrategies) {
186198

187199
GemFireClusterReadyApplicationListener listener = new GemFireClusterReadyApplicationListener();
@@ -202,7 +214,7 @@ public ApplicationListener<ContextRefreshedEvent> gemfireClusterReadyApplication
202214
return new BeanPostProcessor() {
203215

204216
@Override
205-
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
217+
public @Nullable Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
206218

207219
if (isCacheCallbackInitializationSafetyEnabled()) {
208220
if (bean instanceof CacheCallback) {
@@ -224,16 +236,51 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
224236
};
225237
}
226238

227-
// TODO: Should we also explicitly handle Apache Geode CacheServers even though Apache Geode seems to do
228-
// the right thing in this case!?!?
229-
/*
230239
@Bean
231240
public @NonNull BeanPostProcessor cacheServerGemFireClusterReadyAwareBeanPostProcessor(
232241
@Autowired(required = false) WaitStrategy waitStrategy) {
233242

234-
return new BeanPostProcessor() { };
243+
return new BeanPostProcessor() {
244+
245+
@Override
246+
public @Nullable Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
247+
248+
if (isCacheServerInitializationSafetyEnabled()) {
249+
if (bean instanceof CacheServerFactoryBean) {
250+
configureForInitializationSafety((CacheServerFactoryBean) bean, beanName);
251+
}
252+
}
253+
254+
return bean;
255+
}
256+
257+
private CacheServerFactoryBean configureForInitializationSafety(CacheServerFactoryBean bean,
258+
String beanName) {
259+
260+
if (bean.isAutoStartup()) {
261+
GemFireInitializationSafetyConfiguration.this.cacheServerBeanNames.add(beanName);
262+
}
263+
264+
bean.setAutoStartup(false);
265+
266+
return bean;
267+
}
268+
269+
@Override
270+
public @Nullable Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
271+
272+
if (isCacheServerInitializationSafetyEnabled()) {
273+
if (bean instanceof CacheServer) {
274+
bean = InitializationSafeCacheServer.from((CacheServer) bean)
275+
.usingWaitStrategy(waitStrategy);
276+
bean = registerApplicationListener(bean);
277+
}
278+
}
279+
280+
return bean;
281+
}
282+
};
235283
}
236-
*/
237284

238285
@Bean
239286
public @NonNull BeanPostProcessor wanGatewayGemFireClusterReadyAwareBeanPostProcessor(
@@ -242,21 +289,22 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
242289
return new BeanPostProcessor() {
243290

244291
@Override
245-
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
292+
public @Nullable Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
246293

247294
if (isGatewayInitializationSafetyEnabled()) {
248295
if (bean instanceof GatewayReceiverFactoryBean) {
249-
bean = configureForInitializationSafety((GatewayReceiverFactoryBean) bean);
296+
bean = configureForInitializationSafety((GatewayReceiverFactoryBean) bean, beanName);
250297
}
251298
}
252299

253300
return bean;
254301
}
255302

256-
private GatewayReceiverFactoryBean configureForInitializationSafety(GatewayReceiverFactoryBean bean) {
303+
private GatewayReceiverFactoryBean configureForInitializationSafety(GatewayReceiverFactoryBean bean,
304+
String beanName) {
257305

258306
if (bean.isAutoStart()) {
259-
GemFireInitializationSafetyConfiguration.this.gatewayReceiverBeanNames.add(bean.getName());
307+
GemFireInitializationSafetyConfiguration.this.gatewayReceiverBeanNames.add(beanName);
260308
}
261309

262310
bean.setManualStart(true);
@@ -265,7 +313,7 @@ private GatewayReceiverFactoryBean configureForInitializationSafety(GatewayRecei
265313
}
266314

267315
@Override
268-
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
316+
public @Nullable Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
269317

270318
if (isGatewayInitializationSafetyEnabled()) {
271319
if (bean instanceof GatewayReceiver) {
@@ -284,9 +332,36 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
284332
@EventListener(GemFireClusterReadyApplicationEvent.class)
285333
public void disableGemFireInitializationSafetyProcessingEventListener(GemFireClusterReadyApplicationEvent event) {
286334
setEnableCacheCallbackSafety(false);
335+
setEnableCacheServerSafety(false);
287336
setEnableGatewaySafety(false);
288337
}
289338

339+
@Order(Ordered.HIGHEST_PRECEDENCE + 2_000_000)
340+
@EventListener(StartableGemFireComponentsNotificationApplicationEvent.class)
341+
public void startCacheServersEventListener(StartableGemFireComponentsNotificationApplicationEvent event) {
342+
343+
Map<String, CacheServer> cacheServerBeans =
344+
CollectionUtils.nullSafeMap(getApplicationContext().getBeansOfType(CacheServer.class));
345+
346+
Consumer<CacheServer> cacheServerStarter = cacheServer -> {
347+
try {
348+
cacheServer.start();
349+
}
350+
catch (IOException cause) {
351+
String message = String.format("Failed to start CacheServer [%s]", cacheServer);
352+
throw new IllegalStateException(message, cause);
353+
}
354+
};
355+
356+
Predicate<Map.Entry<String, CacheServer>> cacheServerBeanPredicate = entry ->
357+
GemFireInitializationSafetyConfiguration.this.cacheServerBeanNames.contains(entry.getKey());
358+
359+
cacheServerBeans.entrySet().stream()
360+
.filter(cacheServerBeanPredicate)
361+
.map(Map.Entry::getValue)
362+
.forEach(cacheServerStarter);
363+
}
364+
290365
@Order(Ordered.HIGHEST_PRECEDENCE + 1_000_000)
291366
@EventListener(StartableGemFireComponentsNotificationApplicationEvent.class)
292367
public void startGatewayReceiversEventListener(StartableGemFireComponentsNotificationApplicationEvent event) {
@@ -326,6 +401,10 @@ public boolean isCacheCallbackInitializationSafetyEnabled() {
326401
return Boolean.TRUE.equals(this.enableCacheCallbackSafety);
327402
}
328403

404+
public boolean isCacheServerInitializationSafetyEnabled() {
405+
return Boolean.TRUE.equals(this.enableCacheServerSafety);
406+
}
407+
329408
public boolean isGatewayInitializationSafetyEnabled() {
330409
return Boolean.TRUE.equals(this.enableGatewaySafety);
331410
}
@@ -334,6 +413,10 @@ protected void setEnableCacheCallbackSafety(@Nullable Boolean enableCacheCallbac
334413
this.enableCacheCallbackSafety = enableCacheCallbackSafety;
335414
}
336415

416+
protected void setEnableCacheServerSafety(@Nullable Boolean enableCacheServerSafety) {
417+
this.enableCacheServerSafety = enableCacheServerSafety;
418+
}
419+
337420
protected void setEnableGatewaySafety(@Nullable Boolean enableGatewaySafety) {
338421
this.enableGatewaySafety = enableGatewaySafety;
339422
}

0 commit comments

Comments
 (0)