Skip to content

Initial version with TX, Context Propagation and Observation #28156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions spring-context-support/spring-context-support.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ dependencies {
api(project(":spring-beans"))
api(project(":spring-context"))
api(project(":spring-core"))
api("io.micrometer:micrometer-observation")
optional(project(":spring-jdbc")) // for Quartz support
optional(project(":spring-tx")) // for Quartz support
optional("jakarta.activation:jakarta.activation-api")
Expand All @@ -26,4 +27,5 @@ dependencies {
testFixturesApi("org.junit.jupiter:junit-jupiter-api")
testFixturesImplementation("org.assertj:assertj-core")
testFixturesImplementation("org.mockito:mockito-core")
testImplementation("io.micrometer:micrometer-tracing-integration-test:1.0.2-SNAPSHOT")
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;

import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.Calendar;
Expand All @@ -43,6 +44,10 @@
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionObservationConvention;
import org.springframework.transaction.support.ObservationPlatformTransactionManager;
import org.springframework.transaction.support.TransactionObservationContext;
import org.springframework.transaction.support.TransactionObservationConvention;

/**
* Common base class for accessing a Quartz Scheduler, i.e. for registering jobs,
Expand Down Expand Up @@ -90,6 +95,10 @@ public abstract class SchedulerAccessor implements ResourceLoaderAware {
@Nullable
protected ResourceLoader resourceLoader;

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

private TransactionObservationConvention observationConvention = new DefaultTransactionObservationConvention();


/**
* Set whether any jobs defined on this SchedulerFactoryBean should overwrite
Expand Down Expand Up @@ -199,14 +208,27 @@ public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}

public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

public void setObservationConvention(TransactionObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

/**
* Register jobs and triggers (within a transaction, if possible).
*/
protected void registerJobsAndTriggers() throws SchedulerException {
TransactionStatus transactionStatus = null;
if (this.transactionManager != null) {
transactionStatus = this.transactionManager.getTransaction(TransactionDefinition.withDefaults());
TransactionDefinition definition = TransactionDefinition.withDefaults();
PlatformTransactionManager manager = this.transactionManager;
if (manager != null) {
if (!this.observationRegistry.isNoop()) {
TransactionObservationContext context = new TransactionObservationContext(definition, manager);
manager = new ObservationPlatformTransactionManager(manager, this.observationRegistry, context, this.observationConvention);
}
transactionStatus = manager.getTransaction(definition);
}

try {
Expand Down Expand Up @@ -249,7 +271,7 @@ protected void registerJobsAndTriggers() throws SchedulerException {
catch (Throwable ex) {
if (transactionStatus != null) {
try {
this.transactionManager.rollback(transactionStatus);
manager.rollback(transactionStatus);
}
catch (TransactionException tex) {
logger.error("Job registration exception overridden by rollback exception", ex);
Expand All @@ -266,7 +288,7 @@ protected void registerJobsAndTriggers() throws SchedulerException {
}

if (transactionStatus != null) {
this.transactionManager.commit(transactionStatus);
manager.commit(transactionStatus);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.scheduling.quartz;

import java.util.HashMap;
import java.util.Map;

import io.micrometer.observation.ObservationRegistry;
import org.mockito.BDDMockito;
import org.quartz.Scheduler;
import org.quartz.SchedulerContext;
import org.quartz.SchedulerFactory;

import org.springframework.beans.testfixture.beans.TestBean;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.transaction.testfixture.CallCountingTransactionManager;
import org.springframework.transaction.testfixture.ObservationTransactionManagerSampleTestRunner;

import static org.mockito.Mockito.mock;

/**
* @author Marcin Grzejszczak
*/
class ObservationSchedulerAccessorTests extends ObservationTransactionManagerSampleTestRunner<SchedulerFactoryBean> {

@Override
protected SchedulerFactoryBean given(ObservationRegistry observationRegistry) throws Exception {
TestBean tb = new TestBean("tb", 99);
StaticApplicationContext ac = new StaticApplicationContext();
final Scheduler scheduler = mock(Scheduler.class);
SchedulerContext schedulerContext = new SchedulerContext();
BDDMockito.given(scheduler.getContext()).willReturn(schedulerContext);
return schedulerFactoryBean(observationRegistry, tb, ac, scheduler);
}

private SchedulerFactoryBean schedulerFactoryBean(ObservationRegistry observationRegistry, TestBean tb, StaticApplicationContext ac, Scheduler scheduler) {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean() {
@Override
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName) {
return scheduler;
}
};
schedulerFactoryBean.setJobFactory(null);
Map<String, Object> schedulerContextMap = new HashMap<>();
schedulerContextMap.put("testBean", tb);
schedulerFactoryBean.setSchedulerContextAsMap(schedulerContextMap);
schedulerFactoryBean.setApplicationContext(ac);
schedulerFactoryBean.setApplicationContextSchedulerContextKey("appCtx");
schedulerFactoryBean.setTransactionManager(new CallCountingTransactionManager());
schedulerFactoryBean.setObservationRegistry(observationRegistry);
return schedulerFactoryBean;
}

@Override
protected void when(SchedulerFactoryBean sut) throws Exception {
sut.afterPropertiesSet();
sut.start();
}
}
1 change: 1 addition & 0 deletions spring-jms/spring-jms.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies {
api(project(":spring-core"))
api(project(":spring-messaging"))
api(project(":spring-tx"))
api("io.micrometer:micrometer-observation")
compileOnly("jakarta.jms:jakarta.jms-api")
optional(project(":spring-aop"))
optional(project(":spring-context"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.jms.listener;

import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
Expand All @@ -32,7 +33,11 @@
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionObservationConvention;
import org.springframework.transaction.support.ObservationPlatformTransactionManager;
import org.springframework.transaction.support.ResourceTransactionManager;
import org.springframework.transaction.support.TransactionObservationContext;
import org.springframework.transaction.support.TransactionObservationConvention;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionSynchronizationUtils;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -95,6 +100,10 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe

private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

private TransactionObservationConvention observationConvention = new DefaultTransactionObservationConvention();


@Override
public void setSessionTransacted(boolean sessionTransacted) {
Expand Down Expand Up @@ -183,6 +192,13 @@ protected long getReceiveTimeout() {
return this.receiveTimeout;
}

public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

public void setObservationConvention(TransactionObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

@Override
public void initialize() {
Expand Down Expand Up @@ -238,18 +254,23 @@ protected boolean receiveAndExecute(
throws JMSException {

if (this.transactionManager != null) {
PlatformTransactionManager manager = this.transactionManager;
if (!this.observationRegistry.isNoop()) {
TransactionObservationContext context = new TransactionObservationContext(this.transactionDefinition, this.transactionManager);
manager = new ObservationPlatformTransactionManager(this.transactionManager, this.observationRegistry, context, this.observationConvention);
}
// Execute receive within transaction.
TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
TransactionStatus status = manager.getTransaction(this.transactionDefinition);
boolean messageReceived;
try {
messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
}
catch (JMSException | RuntimeException | Error ex) {
rollbackOnException(this.transactionManager, status, ex);
rollbackOnException(manager, status, ex);
throw ex;
}
try {
this.transactionManager.commit(status);
manager.commit(status);
}
catch (TransactionException ex) {
// Propagate transaction system exceptions as infrastructure problems.
Expand Down
5 changes: 5 additions & 0 deletions spring-tx/spring-tx.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ apply plugin: "kotlin"
dependencies {
api(project(":spring-beans"))
api(project(":spring-core"))
api("io.micrometer:micrometer-observation")
optional(project(":spring-aop"))
optional(project(":spring-context")) // for JCA, @EnableTransactionManagement
optional("jakarta.ejb:jakarta.ejb-api")
Expand All @@ -17,6 +18,7 @@ dependencies {
optional("org.jetbrains.kotlin:kotlin-stdlib")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
optional("io.micrometer:context-propagation")
testImplementation(project(":spring-core-test"))
testImplementation(testFixtures(project(":spring-beans")))
testImplementation(testFixtures(project(":spring-context")))
Expand All @@ -25,4 +27,7 @@ dependencies {
testImplementation("org.apache.groovy:groovy")
testImplementation("jakarta.persistence:jakarta.persistence-api")
testImplementation("io.projectreactor:reactor-test")
optional("io.micrometer:micrometer-tracing-integration-test:1.0.2-SNAPSHOT") // I don't know how to make this be in tests and testFixtures
optional("io.micrometer:micrometer-core") // I don't know how to make this be in tests and testFixtures
testImplementation("io.micrometer:micrometer-observation-test")
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;

import io.micrometer.observation.ObservationRegistry;
import io.vavr.control.Try;
import kotlin.coroutines.Continuation;
import kotlinx.coroutines.reactive.AwaitKt;
Expand Down Expand Up @@ -50,6 +51,10 @@
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.reactive.TransactionContextManager;
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionObservationConvention;
import org.springframework.transaction.support.ObservationPlatformTransactionManager;
import org.springframework.transaction.support.TransactionObservationContext;
import org.springframework.transaction.support.TransactionObservationConvention;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
Expand Down Expand Up @@ -179,6 +184,10 @@ public static TransactionStatus currentTransactionStatus() throws NoTransactionE
@Nullable
private BeanFactory beanFactory;

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

private TransactionObservationConvention observationConvention = new DefaultTransactionObservationConvention();

private final ConcurrentMap<Object, TransactionManager> transactionManagerCache =
new ConcurrentReferenceHashMap<>(4);

Expand Down Expand Up @@ -303,6 +312,14 @@ protected final BeanFactory getBeanFactory() {
return this.beanFactory;
}

public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

public void setObservationConvention(TransactionObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

/**
* Check that required properties were set.
*/
Expand Down Expand Up @@ -592,6 +609,10 @@ public String getName() {
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
if (this.transactionManager instanceof PlatformTransactionManager && !this.observationRegistry.isNoop()) {
TransactionObservationContext context = new TransactionObservationContext(txAttr, this.transactionManager);
tm = new ObservationPlatformTransactionManager((PlatformTransactionManager) this.transactionManager, this.observationRegistry, context, this.observationConvention);
}
status = tm.getTransaction(txAttr);
}
else {
Expand Down
Loading