diff --git a/src/main/java/io/vertx/core/streams/impl/PipeImpl.java b/src/main/java/io/vertx/core/streams/impl/PipeImpl.java index a9f0661c850..d4dc9ab5719 100644 --- a/src/main/java/io/vertx/core/streams/impl/PipeImpl.java +++ b/src/main/java/io/vertx/core/streams/impl/PipeImpl.java @@ -84,7 +84,6 @@ public void to(WriteStream ws, Handler> completionHandler) ws.drainHandler(drainHandler); } }); - src.resume(); result.future().onComplete(ar -> { try { src.handler(null); @@ -109,6 +108,7 @@ public void to(WriteStream ws, Handler> completionHandler) handleFailure(err, completionHandler); } }); + src.resume(); } private void handleSuccess(Handler> completionHandler) { diff --git a/src/test/java/io/vertx/test/fakestream/FakeStream.java b/src/test/java/io/vertx/test/fakestream/FakeStream.java index 77e13bb6fe3..67afd836ab6 100644 --- a/src/test/java/io/vertx/test/fakestream/FakeStream.java +++ b/src/test/java/io/vertx/test/fakestream/FakeStream.java @@ -33,6 +33,7 @@ public class FakeStream implements ReadStream, WriteStream { private static final Object END_SENTINEL = new Object(); + private static final Object ERR_SENTINEL = new Object(); static class Op { final T item; @@ -143,10 +144,21 @@ public void end(Handler> h) { } public synchronized void fail(Throwable err) { - Handler handler = exceptionHandler; - if (handler != null) { - exceptionHandler.handle(err); + synchronized(this) { + if (ended) { + throw new IllegalStateException(); + } + ended = true; + Promise promise = Promise.promise(); + promise.future().onComplete(ar -> { + Handler handler = exceptionHandler; + if (handler != null) { + handler.handle(err); + } + }); + pending.add(new Op<>((T)ERR_SENTINEL, promise)); } + checkPending(); } @Override @@ -178,6 +190,8 @@ private void checkPending() { } if (op.item == END_SENTINEL) { end.onComplete(op.ack); + } else if(op.item == ERR_SENTINEL) { + op.ack.complete(); } else { Handler handler = itemHandler; try {