Skip to content
67 changes: 43 additions & 24 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -116,15 +117,15 @@ public EventBus send(String address, Object message) {
@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
sendOrPubInternal(msg, options, null, null);
sendOrPubInternal(msg, options, null);
return this;
}

@Override
public <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
ReplyHandler<T> handler = createReplyHandler(msg, true, options);
sendOrPubInternal(msg, options, handler, null);
sendOrPubInternal(msg, options, handler);
return handler.result();
}

Expand Down Expand Up @@ -161,7 +162,7 @@ public EventBus publish(String address, Object message) {

@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null, null);
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null);
return this;
}

Expand Down Expand Up @@ -257,10 +258,18 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers,
return msg;
}

protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, Promise<Void> promise) {
HandlerHolder<T> holder = addLocalRegistration(address, registration, replyHandler, localOnly);
onLocalRegistration(holder, promise);
return holder;
protected <T> Consumer<Promise<Void>> addRegistration(String address, HandlerRegistration<T> registration, boolean broadcast, boolean localOnly, Promise<Void> promise) {
HandlerHolder<T> holder = addLocalRegistration(address, registration, localOnly);
if (broadcast) {
onLocalRegistration(holder, promise);
} else {
if (promise != null) {
promise.complete();
}
}
return p -> {
removeRegistration(holder, broadcast, p);
};
}

protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
Expand All @@ -270,12 +279,12 @@ protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<V
}

private <T> HandlerHolder<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
boolean localOnly) {
Objects.requireNonNull(address, "address");

ContextInternal context = registration.context;

HandlerHolder<T> holder = createHandlerHolder(registration, replyHandler, localOnly, context);
HandlerHolder<T> holder = createHandlerHolder(registration, localOnly, context);

ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
Expand All @@ -290,13 +299,17 @@ private <T> HandlerHolder<T> addLocalRegistration(String address, HandlerRegistr
return holder;
}

protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, ContextInternal context) {
return new HandlerHolder<>(registration, replyHandler, localOnly, context);
protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> registration, boolean localOnly, ContextInternal context) {
return new HandlerHolder<>(registration, localOnly, context);
}

protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, boolean broadcast, Promise<Void> promise) {
removeLocalRegistration(handlerHolder);
onLocalUnregistration(handlerHolder, promise);
if (broadcast) {
onLocalUnregistration(handlerHolder, promise);
} else {
promise.complete();
}
}

protected <T> void onLocalUnregistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
Expand All @@ -321,20 +334,24 @@ protected <T> void sendReply(MessageImpl replyMessage, DeliveryOptions options,
if (replyMessage.address() == null) {
throw new IllegalStateException("address not specified");
} else {
sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler, null));
sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler));
}
}

protected <T> void sendOrPub(ContextInternal ctx, MessageImpl<?, T> message, DeliveryOptions options, Promise<Void> writePromise) {
sendLocally(message, writePromise);
}

protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
sendLocally(sendContext);
sendOrPub(sendContext.ctx, sendContext.message, sendContext.options, sendContext);
}

private <T> void sendLocally(OutboundDeliveryContext<T> sendContext) {
ReplyException failure = deliverMessageLocally(sendContext.message);
protected <T> void sendLocally(MessageImpl<?, T> message, Promise<Void> writePromise) {
ReplyException failure = deliverMessageLocally(message);
if (failure != null) {
sendContext.written(failure);
writePromise.tryFail(failure);
} else {
sendContext.written(null);
writePromise.tryComplete();
}
}

Expand Down Expand Up @@ -403,8 +420,8 @@ <T> ReplyHandler<T> createReplyHandler(MessageImpl message,
}

public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler, writePromise);
ReplyHandler<T> handler) {
return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler);
}

public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
Expand All @@ -414,10 +431,12 @@ public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
senderCtx.next();
}

public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
public <T> Future<Void> sendOrPubInternal(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler) {
checkStarted();
sendOrPubInternal(newSendContext(message, options, handler, writePromise));
OutboundDeliveryContext<T> ctx = newSendContext(message, options, handler);
sendOrPubInternal(ctx);
return ctx.writePromise.future();
}

private Future<Void> unregisterAll() {
Expand Down
8 changes: 1 addition & 7 deletions src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ public class HandlerHolder<T> {

public final ContextInternal context;
public final HandlerRegistration<T> handler;
public final boolean replyHandler;
public final boolean localOnly;
private boolean removed;

public HandlerHolder(HandlerRegistration<T> handler, boolean replyHandler, boolean localOnly, ContextInternal context) {
public HandlerHolder(HandlerRegistration<T> handler, boolean localOnly, ContextInternal context) {
this.context = context;
this.handler = handler;
this.replyHandler = replyHandler;
this.localOnly = localOnly;
}

Expand Down Expand Up @@ -76,10 +74,6 @@ public HandlerRegistration<T> getHandler() {
return handler;
}

public boolean isReplyHandler() {
return replyHandler;
}

public boolean isLocalOnly() {
return localOnly;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingPolicy;

import java.util.function.Consumer;

public abstract class HandlerRegistration<T> implements Closeable {

public final ContextInternal context;
public final EventBusImpl bus;
public final String address;
public final boolean src;
private HandlerHolder<T> registered;
private Consumer<Promise<Void>> registered;
private Object metric;

HandlerRegistration(ContextInternal context,
Expand Down Expand Up @@ -56,13 +58,13 @@ void receive(MessageImpl msg) {

protected abstract void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler);

synchronized void register(String repliedAddress, boolean localOnly, Promise<Void> promise) {
synchronized void register(boolean broadcast, boolean localOnly, Promise<Void> promise) {
if (registered != null) {
throw new IllegalStateException();
}
registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise);
registered = bus.addRegistration(address, this, broadcast, localOnly, promise);
if (bus.metrics != null) {
metric = bus.metrics.handlerRegistered(address, repliedAddress);
metric = bus.metrics.handlerRegistered(address);
}
}

Expand All @@ -74,7 +76,7 @@ public Future<Void> unregister() {
Promise<Void> promise = context.promise();
synchronized (this) {
if (registered != null) {
bus.removeRegistration(registered, promise);
registered.accept(promise);
registered = null;
if (bus.metrics != null) {
bus.metrics.handlerUnregistered(metric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
registered = true;
Promise<Void> p = result;
Promise<Void> registration = context.promise();
register(null, localOnly, registration);
register(true, localOnly, registration);
registration.future().onComplete(ar -> {
if (ar.succeeded()) {
p.tryComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,8 @@ public synchronized MessageProducer<T> deliveryOptions(DeliveryOptions options)

@Override
public Future<Void> write(T body) {
Promise<Void> promise = ((VertxInternal)vertx).getOrCreateContext().promise();
write(body, promise);
return promise.future();
}

private void write(T data, Promise<Void> handler) {
MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), data, options.getCodecName());
bus.sendOrPubInternal(msg, options, null, handler);
MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), body, options.getCodecName());
return bus.sendOrPubInternal(msg, options, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
Expand All @@ -25,31 +26,43 @@

import java.util.function.BiConsumer;

public class OutboundDeliveryContext<T> extends DeliveryContextBase<T> implements Handler<AsyncResult<Void>> {
public class OutboundDeliveryContext<T> extends DeliveryContextBase<T> implements Promise<Void> {

public final ContextInternal ctx;
public final DeliveryOptions options;
public final ReplyHandler<T> replyHandler;
private final Promise<Void> writePromise;
public final Promise<Void> writePromise;
private boolean src;

EventBusImpl bus;
EventBusMetrics metrics;

OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler, Promise<Void> writePromise) {
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler) {
super(message, message.bus.outboundInterceptors(), ctx);
this.ctx = ctx;
this.options = options;
this.replyHandler = replyHandler;
this.writePromise = writePromise;
this.writePromise = ctx.promise();
}

@Override
public void handle(AsyncResult<Void> event) {
written(event.cause());
public boolean tryComplete(Void result) {
written(null);
return true;
}

public void written(Throwable failure) {
@Override
public boolean tryFail(Throwable cause) {
written(cause);
return false;
}

@Override
public Future<Void> future() {
throw new UnsupportedOperationException();
}

private void written(Throwable failure) {

// Metrics
if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected boolean doReceive(Message<T> reply) {
}

void register() {
register(repliedAddress, true, null);
register(false, false, null);
}

@Override
Expand Down
Loading