Skip to content

Commit ad0ef9f

Browse files
authored
Merge pull request #1323 from bosch-io/bugfix/response-receiver-cache
Invalidate ResponseReceiver cache after a response is received.
2 parents ccf0de6 + 5fda13e commit ad0ef9f

File tree

6 files changed

+197
-34
lines changed

6 files changed

+197
-34
lines changed

concierge/service/src/main/java/org/eclipse/ditto/concierge/service/enforcement/LiveSignalEnforcement.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ private CompletionStage<Contextual<WithDittoHeaders>> returnCommandResponseConte
256256
final var receiver = responseReceiverEntry.get();
257257
log().info("Scheduling CommandResponse <{}> to original sender <{}>", liveResponse, receiver);
258258
commandResponseContextual = withMessageToReceiver(liveResponse, receiver);
259+
responseReceiverCache.invalidate(correlationId);
259260
} else {
260261
log().info("Got <{}> with unknown correlation ID: <{}>", liveResponse.getType(), correlationId);
261262
commandResponseContextual = withMessageToReceiver(null, null);
@@ -396,13 +397,16 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH
396397
final DistributedPub<T> pub) {
397398

398399
// using pub/sub to publish the command to any interested parties (e.g. a Websocket):
399-
log(signal).debug("Publish message to pub-sub: <{}>", signal);
400400

401401
if (enforcementConfig.shouldDispatchGlobally(signal)) {
402402
return responseReceiverCache.insertResponseReceiverConflictFree(signal,
403403
newSignal -> sender(),
404-
(newSignal, receiver) -> publishSignal(newSignal, ackExtractor, pub));
404+
(newSignal, receiver) -> {
405+
log(newSignal).debug("Publish message to pub-sub: <{}>", newSignal);
406+
return publishSignal(newSignal, ackExtractor, pub);
407+
});
405408
} else {
409+
log(signal).debug("Publish message to pub-sub: <{}>", signal);
406410
return CompletableFuture.completedStage(publishSignal(signal, ackExtractor, pub));
407411
}
408412
}

concierge/service/src/main/java/org/eclipse/ditto/concierge/service/enforcement/ResponseReceiverCache.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,22 @@ public CompletableFuture<Optional<ActorRef>> get(final CharSequence correlationI
158158
return cache.get(CorrelationIdKey.forCacheRetrieval(correlationIdString));
159159
}
160160

161+
/**
162+
* Invalidates the cached response receiver for the specified correlation ID argument.
163+
*
164+
* @param correlationId the correlation ID to invalidate the cached response receiver for.
165+
* @throws NullPointerException if {@code correlationId} is {@code null}.
166+
* @throws IllegalArgumentException if {@code correlationId} is empty or blank.
167+
*/
168+
public void invalidate(final CharSequence correlationId) {
169+
final var correlationIdString = checkNotNull(correlationId, "correlationId").toString();
170+
ConditionChecker.checkArgument(correlationIdString,
171+
Predicate.not(String::isBlank),
172+
() -> "The correlationId must not be blank.");
173+
174+
cache.invalidate(CorrelationIdKey.forCacheRetrieval(correlationIdString));
175+
}
176+
161177
/**
162178
* Insert a response receiver for a live or message command.
163179
*

concierge/service/src/test/java/org/eclipse/ditto/concierge/service/enforcement/LiveSignalEnforcementTest.java

Lines changed: 125 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
2727
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
2828
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
29+
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
2930
import org.eclipse.ditto.base.model.headers.DittoHeaders;
31+
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
3032
import org.eclipse.ditto.base.model.json.FieldType;
3133
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
3234
import org.eclipse.ditto.base.model.signals.events.Event;
@@ -119,7 +121,7 @@ public void rejectMessageCommandByPolicy() {
119121
mockEntitiesActorInstance.setReply(TestSetup.POLICY_SUDO, sudoRetrievePolicyResponse);
120122

121123
final ActorRef underTest = newEnforcerActor(getRef());
122-
underTest.tell(thingMessageCommand(), getRef());
124+
underTest.tell(thingMessageCommand("abc"), getRef());
123125
TestSetup.fishForMsgClass(this, MessageSendNotAllowedException.class);
124126
}};
125127
}
@@ -253,6 +255,118 @@ public void retrieveLiveThingCommandAndResponseByPolicy() {
253255
}};
254256
}
255257

258+
@Test
259+
public void correlationIdSameAfterResponseSuccessful() {
260+
final PolicyId policyId = PolicyId.of("policy", "id");
261+
final JsonObject thingWithPolicy = newThingWithPolicyId(policyId);
262+
final JsonObject policy = PoliciesModelFactory.newPolicyBuilder(policyId)
263+
.setRevision(1L)
264+
.forLabel("authorize-self")
265+
.setSubject(GOOGLE, SUBJECT_ID)
266+
.setGrantedPermissions(PoliciesResourceType.thingResource("/"),
267+
Permissions.newInstance(Permission.READ, Permission.WRITE))
268+
.setRevokedPermissions(PoliciesResourceType.thingResource("/features/x/properties/key2"),
269+
Permissions.newInstance(Permission.READ))
270+
.build()
271+
.toJson(FieldType.all());
272+
final SudoRetrieveThingResponse sudoRetrieveThingResponse =
273+
SudoRetrieveThingResponse.of(thingWithPolicy, DittoHeaders.empty());
274+
final SudoRetrievePolicyResponse sudoRetrievePolicyResponse =
275+
SudoRetrievePolicyResponse.of(policyId, policy, DittoHeaders.empty());
276+
277+
new TestKit(system) {{
278+
mockEntitiesActorInstance.setReply(TestSetup.THING_SUDO, sudoRetrieveThingResponse);
279+
mockEntitiesActorInstance.setReply(TestSetup.POLICY_SUDO, sudoRetrievePolicyResponse);
280+
281+
final ActorRef underTest = newEnforcerActor(getRef());
282+
283+
final DittoHeaders headers = headers();
284+
final ThingCommand<?> read = getRetrieveThingCommand(headers);
285+
286+
underTest.tell(read, getRef());
287+
288+
final var responseHeaders = headers.toBuilder()
289+
.authorizationContext(AuthorizationContext.newInstance(
290+
DittoAuthorizationContextType.PRE_AUTHENTICATED_CONNECTION,
291+
AuthorizationSubject.newInstance("myIssuer:mySubject")))
292+
.build();
293+
294+
final ThingCommandResponse<?> readResponse = getRetrieveThingResponse(responseHeaders);
295+
296+
// Second message right after the response for the first was sent, should have the same correlation-id (Not suffixed).
297+
underTest.tell(readResponse, getRef());
298+
final RetrieveThingResponse retrieveThingResponse =
299+
TestSetup.fishForMsgClass(this, RetrieveThingResponse.class);
300+
assertThat(retrieveThingResponse.getDittoHeaders().getCorrelationId()).isEqualTo(
301+
read.getDittoHeaders().getCorrelationId());
302+
303+
underTest.tell(read, getRef());
304+
305+
underTest.tell(readResponse, getRef());
306+
final RetrieveThingResponse retrieveThingResponse2 =
307+
TestSetup.fishForMsgClass(this, RetrieveThingResponse.class);
308+
assertThat(retrieveThingResponse2.getDittoHeaders().getCorrelationId()).isEqualTo(
309+
read.getDittoHeaders().getCorrelationId());
310+
}};
311+
}
312+
313+
@Test
314+
public void correlationIdDifferentInCaseOfConflict() {
315+
final PolicyId policyId = PolicyId.of("policy:id");
316+
final JsonObject thingWithPolicy = newThingWithPolicyId(policyId);
317+
final JsonObject policy = PoliciesModelFactory.newPolicyBuilder(policyId)
318+
.setRevision(1L)
319+
.forLabel("authorize-self")
320+
.setSubject(GOOGLE, SUBJECT_ID)
321+
.setGrantedPermissions(PoliciesResourceType.messageResource(JsonPointer.empty()),
322+
Permissions.newInstance(Permission.READ, Permission.WRITE))
323+
.build()
324+
.toJson(FieldType.all());
325+
final SudoRetrieveThingResponse sudoRetrieveThingResponse =
326+
SudoRetrieveThingResponse.of(thingWithPolicy, DittoHeaders.empty());
327+
final SudoRetrievePolicyResponse sudoRetrievePolicyResponse =
328+
SudoRetrievePolicyResponse.of(policyId, policy, DittoHeaders.empty());
329+
330+
new TestKit(system) {{
331+
mockEntitiesActorInstance.setReply(TestSetup.THING_SUDO, sudoRetrieveThingResponse);
332+
mockEntitiesActorInstance.setReply(TestSetup.POLICY_SUDO, sudoRetrievePolicyResponse);
333+
334+
final ActorRef underTest = newEnforcerActor(getRef());
335+
336+
final MessageCommand<?, ?> message = thingMessageCommand("abc");
337+
338+
underTest.tell(message, getRef());
339+
final DistributedPubSubMediator.Publish firstPublishRead =
340+
pubSubMediatorProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
341+
assertThat(firstPublishRead.topic()).isEqualTo(StreamingType.MESSAGES.getDistributedPubSubTopic());
342+
assertThat(firstPublishRead.msg()).isInstanceOf(MessageCommand.class);
343+
assertThat((CharSequence) ((WithEntityId) firstPublishRead.msg()).getEntityId()).isEqualTo(
344+
message.getEntityId());
345+
assertThat((CharSequence) ((WithDittoHeaders) firstPublishRead.msg()).getDittoHeaders()
346+
.getCorrelationId()
347+
.orElseThrow()).isEqualTo(
348+
message.getDittoHeaders().getCorrelationId().orElseThrow());
349+
350+
underTest.tell(message, getRef());
351+
final DistributedPubSubMediator.Publish secondPublishRead =
352+
pubSubMediatorProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
353+
assertThat(secondPublishRead.topic()).isEqualTo(StreamingType.MESSAGES.getDistributedPubSubTopic());
354+
assertThat(secondPublishRead.msg()).isInstanceOf(MessageCommand.class);
355+
assertThat((CharSequence) ((WithEntityId) secondPublishRead.msg()).getEntityId()).isEqualTo(
356+
message.getEntityId());
357+
// Assure second command has suffixed correlation-id, because of conflict with first command.
358+
assertThat((CharSequence) ((WithDittoHeaders) secondPublishRead.msg()).getDittoHeaders()
359+
.getCorrelationId()
360+
.orElseThrow()).startsWith(
361+
message.getDittoHeaders().getCorrelationId().orElseThrow());
362+
assertThat((CharSequence) ((WithDittoHeaders) secondPublishRead.msg()).getDittoHeaders()
363+
.getCorrelationId()
364+
.orElseThrow()).isNotEqualTo(
365+
message.getDittoHeaders().getCorrelationId().orElseThrow());
366+
367+
}};
368+
}
369+
256370
@Test
257371
public void acceptMessageCommandByPolicy() {
258372
final PolicyId policyId = PolicyId.of("policy:id");
@@ -276,7 +390,7 @@ public void acceptMessageCommandByPolicy() {
276390

277391
final ActorRef underTest = newEnforcerActor(getRef());
278392

279-
final MessageCommand<?, ?> msgCommand = thingMessageCommand();
393+
final MessageCommand<?, ?> msgCommand = thingMessageCommand("abc");
280394
mockEntitiesActorInstance.setReply(msgCommand);
281395
underTest.tell(msgCommand, getRef());
282396
final DistributedPubSubMediator.Publish publish =
@@ -419,11 +533,12 @@ private static ThingCommand<?> getModifyFeatureCommand(final DittoHeaders header
419533
return ModifyFeature.of(TestSetup.THING_ID, TestSetup.FEATURE, headers);
420534
}
421535

422-
private static MessageCommand<?, ?> thingMessageCommand() {
536+
private static MessageCommand<?, ?> thingMessageCommand(final String correlationId) {
423537
final Message<Object> message = Message.newBuilder(
424-
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
425-
.contentType("text/plain")
426-
.build())
538+
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
539+
.contentType("text/plain")
540+
.correlationId(correlationId)
541+
.build())
427542
.payload("Hello you!")
428543
.build();
429544
return SendThingMessage.of(TestSetup.THING_ID, message, headers());
@@ -441,10 +556,10 @@ private static ThingEvent<?> liveEventRevoked() {
441556

442557
private static MessageCommand<?, ?> featureMessageCommand() {
443558
final Message<?> message = Message.newBuilder(
444-
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
445-
.contentType("text/plain")
446-
.featureId("foo")
447-
.build())
559+
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
560+
.contentType("text/plain")
561+
.featureId("foo")
562+
.build())
448563
.payload("Hello you!")
449564
.build();
450565
return SendFeatureMessage.of(TestSetup.THING_ID, "foo", message, headers());

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessor.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
2727
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
2828
import org.eclipse.ditto.base.model.headers.DittoHeaders;
29+
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
2930
import org.eclipse.ditto.base.model.signals.Signal;
3031
import org.eclipse.ditto.connectivity.api.ExternalMessage;
3132
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
@@ -184,17 +185,33 @@ private List<MappingOutcome<OutboundSignal.Mapped>> processMappableSignals(final
184185
.map(extra -> ProtocolFactory.setExtra(adaptableWithoutExtra, extra))
185186
.orElse(adaptableWithoutExtra);
186187

188+
final var adaptableWithInternalCorrelationId = mappableSignals.stream()
189+
.findFirst()
190+
.map(signal -> setInternalCorrelationIdToAdaptable(adaptable, signal.getSource()))
191+
.orElse(adaptable);
192+
187193
return timer.overall(() -> mappableSignals.stream()
188194
.flatMap(mappableSignal -> {
189195
final Signal<?> source = mappableSignal.getSource();
190196
final List<Target> targets = mappableSignal.getTargets();
191197
final List<MessageMapper> mappers = getMappers(mappableSignal.getPayloadMapping());
192-
logger.withCorrelationId(adaptable)
198+
logger.withCorrelationId(adaptableWithInternalCorrelationId)
193199
.debug("Resolved mappers for message {} to targets {}: {}", source, targets, mappers);
194200
// convert messages in the order of payload mapping and forward to result handler
195-
return mappers.stream().flatMap(mapper -> runMapper(mappableSignal, adaptable, mapper, timer));
201+
return mappers.stream()
202+
.flatMap(mapper -> runMapper(mappableSignal, adaptableWithInternalCorrelationId, mapper,
203+
timer));
196204
})
197-
.collect(Collectors.toList()));
205+
.toList());
206+
}
207+
208+
private static Adaptable setInternalCorrelationIdToAdaptable(final Adaptable adaptable,
209+
final WithDittoHeaders internalSignal) {
210+
final var optionalCorrelationId = internalSignal.getDittoHeaders().getCorrelationId();
211+
final Adaptable result;
212+
result = optionalCorrelationId.map(s -> adaptable.setDittoHeaders(
213+
adaptable.getDittoHeaders().toBuilder().correlationId(s).build())).orElse(adaptable);
214+
return result;
198215
}
199216

200217
private Stream<MappingOutcome<OutboundSignal.Mapped>> runMapper(final OutboundSignal.Mappable outboundSignal,

0 commit comments

Comments
 (0)