Skip to content

Commit 1bed415

Browse files
committed
Added error handling for connection failure
Instead of always throwing `ClientException` for connection failure we throw a more specific, `ConnectionFailureException` so that the driver could better recover from failures.
1 parent d353ef7 commit 1bed415

File tree

12 files changed

+165
-39
lines changed

12 files changed

+165
-39
lines changed

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@
2626
import org.neo4j.driver.internal.net.pooling.PoolSettings;
2727
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
2828
import org.neo4j.driver.internal.security.SecurityPlan;
29+
import org.neo4j.driver.internal.spi.Connection;
2930
import org.neo4j.driver.internal.spi.ConnectionPool;
31+
import org.neo4j.driver.internal.util.Consumer;
3032
import org.neo4j.driver.v1.Logging;
3133
import org.neo4j.driver.v1.Record;
3234
import org.neo4j.driver.v1.Session;
3335
import org.neo4j.driver.v1.StatementResult;
3436
import org.neo4j.driver.v1.exceptions.ClientException;
35-
import org.neo4j.driver.v1.util.Function;
37+
import org.neo4j.driver.v1.exceptions.ClusterUnavailableException;
38+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
3639

3740
import static java.lang.String.format;
3841

@@ -55,18 +58,29 @@ void discover()
5558
final List<BoltServerAddress> newServers = new LinkedList<>( );
5659
try
5760
{
58-
call( DISCOVER_MEMBERS, new Function<Record, Integer>()
61+
boolean success = false;
62+
while ( !servers.isEmpty() && !success )
5963
{
60-
@Override
61-
public Integer apply( Record record )
64+
success = call( DISCOVER_MEMBERS, new Consumer<Record>()
6265
{
63-
newServers.add( new BoltServerAddress( record.get( "address" ).asString() ) );
64-
return 0;
65-
}
66-
} );
67-
this.servers.clear();
68-
this.servers.addAll( newServers );
69-
log.debug( "~~ [MEMBERS] -> %s", newServers );
66+
@Override
67+
public void accept( Record record )
68+
{
69+
newServers.add( new BoltServerAddress( record.get( "address" ).asString() ) );
70+
}
71+
} );
72+
73+
}
74+
if ( success )
75+
{
76+
this.servers.clear();
77+
this.servers.addAll( newServers );
78+
log.debug( "~~ [MEMBERS] -> %s", newServers );
79+
}
80+
else
81+
{
82+
throw new ClusterUnavailableException( "Run out of servers" );
83+
}
7084
}
7185
catch ( ClientException ex )
7286
{
@@ -81,16 +95,31 @@ public Integer apply( Record record )
8195
}
8296
}
8397

84-
void call( String procedureName, Function<Record, Integer> recorder )
98+
private boolean call( String procedureName, Consumer<Record> recorder )
8599
{
86-
try ( Session session = new NetworkSession( connections.acquire( randomServer() ), log ) )
100+
101+
BoltServerAddress address = randomServer();
102+
Connection acquire = connections.acquire( address );
103+
try ( Session session = new NetworkSession( acquire, log ) )
87104
{
88105
StatementResult records = session.run( format( "CALL %s", procedureName ) );
89106
while ( records.hasNext() )
90107
{
91-
recorder.apply( records.next() );
108+
recorder.accept( records.next() );
92109
}
93110
}
111+
catch ( ConnectionFailureException e )
112+
{
113+
forget(address );
114+
return false;
115+
}
116+
return true;
117+
}
118+
119+
private void forget(BoltServerAddress address)
120+
{
121+
servers.remove( address );
122+
connections.purge(address);
94123
}
95124

96125
@Override

driver/src/main/java/org/neo4j/driver/internal/net/BufferingChunkedInput.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.neo4j.driver.internal.packstream.PackInput;
2828
import org.neo4j.driver.internal.util.BytePrinter;
2929
import org.neo4j.driver.v1.exceptions.ClientException;
30+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
3031

3132
import static java.lang.Math.min;
3233

@@ -408,15 +409,15 @@ private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buff
408409
int read = channel.read( buffer );
409410
if ( read == -1 )
410411
{
411-
throw new ClientException(
412+
throw new ConnectionFailureException(
412413
"Connection terminated while receiving data. This can happen due to network " +
413-
"instabilities, or due to restarts of the database." );
414+
"instabilities, or due to restarts of the database.");
414415
}
415416
buffer.flip();
416417
}
417418
catch ( ClosedByInterruptException e )
418419
{
419-
throw new ClientException(
420+
throw new ConnectionFailureException(
420421
"Connection to the database was lost because someone called `interrupt()` on the driver " +
421422
"thread waiting for a reply. " +
422423
"This normally happens because the JVM is shutting down, but it can also happen because your " +

driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.internal.util.BytePrinter;
3535
import org.neo4j.driver.v1.Logger;
3636
import org.neo4j.driver.v1.exceptions.ClientException;
37+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
3738

3839
import static java.lang.String.format;
3940
import static java.nio.ByteOrder.BIG_ENDIAN;
@@ -76,7 +77,7 @@ void blockingRead( ByteBuffer buf ) throws IOException
7677
if (channel.read( buf ) < 0)
7778
{
7879
String bufStr = BytePrinter.hex( buf ).trim();
79-
throw new ClientException( format(
80+
throw new ConnectionFailureException( format(
8081
"Connection terminated while receiving data. This can happen due to network " +
8182
"instabilities, or due to restarts of the database. Expected %s bytes, received %s.",
8283
buf.limit(), bufStr.isEmpty() ? "none" : bufStr ) );
@@ -91,7 +92,7 @@ void blockingWrite( ByteBuffer buf ) throws IOException
9192
if (channel.write( buf ) < 0)
9293
{
9394
String bufStr = BytePrinter.hex( buf ).trim();
94-
throw new ClientException( format(
95+
throw new ConnectionFailureException( format(
9596
"Connection terminated while sending data. This can happen due to network " +
9697
"instabilities, or due to restarts of the database. Expected %s bytes, wrote %s.",
9798
buf.limit(), bufStr.isEmpty() ? "none" :bufStr ) );
@@ -111,7 +112,7 @@ public void start()
111112
}
112113
catch ( ConnectException e )
113114
{
114-
throw new ClientException( format(
115+
throw new ConnectionFailureException( format(
115116
"Unable to connect to %s, ensure the database is running and that there is a " +
116117
"working network connection to it.", address ) );
117118
}
@@ -182,6 +183,7 @@ public void stop()
182183
}
183184
catch ( IOException e )
184185
{
186+
//noinspection StatementWithEmptyBody
185187
if ( e.getMessage().equals( "An existing connection was forcibly closed by the remote host" ) )
186188
{
187189
// Swallow this exception as it is caused by connection already closed by server

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,24 @@ private BlockingQueue<PooledConnection> pool( BoltServerAddress address )
135135
return pool;
136136
}
137137

138+
@Override
139+
public void purge( BoltServerAddress address )
140+
{
141+
BlockingQueue<PooledConnection> connections = pools.get( address );
142+
if ( connections == null )
143+
{
144+
return;
145+
}
146+
while (!connections.isEmpty())
147+
{
148+
PooledConnection connection = connections.poll();
149+
if ( connection != null)
150+
{
151+
connection.dispose();
152+
}
153+
}
154+
}
155+
138156
@Override
139157
public void close()
140158
{

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,13 @@ public interface ConnectionPool extends AutoCloseable
2727
* Acquire a connection - if a live connection exists in the pool, it will
2828
* be used, otherwise a new connection will be created.
2929
*
30-
* @param address
30+
* @param address The address to acquire
3131
*/
3232
Connection acquire( BoltServerAddress address );
3333

34+
/**
35+
* Removes all connections to a given address from the pool.
36+
* @param address The address to remove.
37+
*/
38+
void purge( BoltServerAddress address );
3439
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.v1.exceptions;
20+
21+
/**
22+
* An <em>ClusterUnavailableException</em> indicates that the driver cannot communicate with the cluster.
23+
* @since 1.1
24+
*/
25+
public class ClusterUnavailableException extends Neo4jException
26+
{
27+
public ClusterUnavailableException( String message )
28+
{
29+
super( message );
30+
}
31+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.v1.exceptions;
20+
21+
/**
22+
* A <em>ConnectionFailureException</em> indicates that there is a problem within the underlying connection, probably
23+
* been terminated.
24+
* @since 1.1
25+
*/
26+
public class ConnectionFailureException extends Neo4jException
27+
{
28+
public ConnectionFailureException( String message )
29+
{
30+
super( message );
31+
}
32+
}

driver/src/test/java/org/neo4j/driver/internal/net/BufferingChunkedInputTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Arrays;
3232

3333
import org.neo4j.driver.v1.exceptions.ClientException;
34+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
3435
import org.neo4j.driver.v1.util.RecordingByteChannel;
3536

3637
import static org.hamcrest.CoreMatchers.equalTo;
@@ -484,7 +485,7 @@ public void shouldFailNicelyOnClosedConnections() throws IOException
484485
BufferingChunkedInput input = new BufferingChunkedInput( channel );
485486

486487
//Expect
487-
exception.expect( ClientException.class );
488+
exception.expect( ConnectionFailureException.class );
488489
exception.expectMessage( "Connection terminated while receiving data. This can happen due to network " +
489490
"instabilities, or due to restarts of the database." );
490491
// When

driver/src/test/java/org/neo4j/driver/internal/net/SocketClientTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,22 @@
1818
*/
1919
package org.neo4j.driver.internal.net;
2020

21+
import org.junit.Ignore;
22+
import org.junit.Rule;
23+
import org.junit.Test;
24+
import org.junit.rules.ExpectedException;
25+
2126
import java.io.IOException;
2227
import java.net.ServerSocket;
2328
import java.nio.ByteBuffer;
2429
import java.nio.channels.ByteChannel;
2530
import java.util.ArrayList;
2631
import java.util.List;
2732

28-
import org.junit.Ignore;
29-
import org.junit.Rule;
30-
import org.junit.Test;
31-
import org.junit.rules.ExpectedException;
32-
3333
import org.neo4j.driver.internal.logging.DevNullLogger;
3434
import org.neo4j.driver.internal.security.SecurityPlan;
3535
import org.neo4j.driver.v1.exceptions.ClientException;
36+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
3637

3738
import static org.hamcrest.CoreMatchers.equalTo;
3839
import static org.hamcrest.MatcherAssert.assertThat;
@@ -100,7 +101,7 @@ public void shouldFailIfConnectionFailsWhileReading() throws IOException
100101
SocketClient client = dummyClient();
101102

102103
//Expect
103-
exception.expect( ClientException.class );
104+
exception.expect( ConnectionFailureException.class );
104105
exception.expectMessage( "Expected 4 bytes, received none" );
105106

106107
// When
@@ -138,7 +139,7 @@ public void shouldFailIfConnectionFailsWhileWriting() throws IOException
138139
SocketClient client = dummyClient();
139140

140141
//Expect
141-
exception.expect( ClientException.class );
142+
exception.expect( ConnectionFailureException.class );
142143
exception.expectMessage( "Expected 4 bytes, wrote 00" );
143144

144145
// When

driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.neo4j.driver.v1.StatementResult;
3232
import org.neo4j.driver.v1.Transaction;
3333
import org.neo4j.driver.v1.exceptions.ClientException;
34+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
3435
import org.neo4j.driver.v1.util.TestNeo4jSession;
3536

3637
import static org.hamcrest.CoreMatchers.equalTo;
@@ -63,7 +64,7 @@ public void shouldNotAllowMoreTxAfterClientException() throws Throwable
6364
Transaction tx = session.beginTransaction();
6465

6566
// And Given an error has occurred
66-
try { tx.run( "invalid" ).consume(); } catch ( ClientException e ) {}
67+
try { tx.run( "invalid" ).consume(); } catch ( ClientException e ) {/*empty*/}
6768

6869
// Expect
6970
exception.expect( ClientException.class );
@@ -79,7 +80,7 @@ public void shouldNotAllowMoreTxAfterClientException() throws Throwable
7980
public void shouldAllowNewStatementAfterRecoverableError() throws Throwable
8081
{
8182
// Given an error has occurred
82-
try { session.run( "invalid" ).consume(); } catch ( ClientException e ) {}
83+
try { session.run( "invalid" ).consume(); } catch ( ClientException e ) {/*empty*/}
8384

8485
// When
8586
StatementResult cursor = session.run( "RETURN 1" );
@@ -97,7 +98,7 @@ public void shouldAllowNewTransactionAfterRecoverableError() throws Throwable
9798
{
9899
tx.run( "invalid" ).consume();
99100
}
100-
catch ( ClientException e ) {}
101+
catch ( ClientException e ) {/*empty*/}
101102

102103
// When
103104
try ( Transaction tx = session.beginTransaction() )
@@ -114,13 +115,14 @@ public void shouldAllowNewTransactionAfterRecoverableError() throws Throwable
114115
public void shouldExplainConnectionError() throws Throwable
115116
{
116117
// Expect
117-
exception.expect( ClientException.class );
118+
exception.expect( ConnectionFailureException.class );
118119
exception.expectMessage( "Unable to connect to localhost:7777, ensure the database is running " +
119120
"and that there is a working network connection to it." );
120121

121122
// When
123+
//noinspection EmptyTryBlock
122124
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:7777" );
123-
Session session = driver.session()) {}
125+
Session ignore = driver.session()) {/*empty*/}
124126
}
125127

126128
@Test
@@ -165,7 +167,8 @@ public void shouldGetHelpfulErrorWhenTryingToConnectToHttpPort() throws Throwabl
165167
"(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)" );
166168

167169
// When
168-
try(Session session = driver.session() ){}
170+
//noinspection EmptyTryBlock
171+
try(Session ignore = driver.session() ){/*empty*/}
169172
}
170173
}
171174

0 commit comments

Comments
 (0)