Skip to content

Commit 48fabc2

Browse files
committed
Combined error handling in single object
1 parent a7f1188 commit 48fabc2

File tree

4 files changed

+54
-32
lines changed

4 files changed

+54
-32
lines changed

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -198,21 +198,20 @@ public Session session()
198198
public Session session( final AccessMode mode )
199199
{
200200
return new ClusteredNetworkSession( acquireConnection( mode ),
201-
new Consumer<BoltServerAddress>()
201+
new ClusteredErrorHandler()
202202
{
203203
@Override
204-
public void accept( BoltServerAddress address )
204+
public void onConnectionFailure( BoltServerAddress address )
205205
{
206206
forget( address );
207207
}
208-
}, new Consumer<BoltServerAddress>()
209-
{
210-
@Override
211-
public void accept( BoltServerAddress address )
212-
{
213-
writeServers.remove( address );
214-
}
215-
},
208+
209+
@Override
210+
public void onWriteFailure( BoltServerAddress address )
211+
{
212+
writeServers.remove( address );
213+
}
214+
},
216215
log );
217216
}
218217

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
import org.neo4j.driver.internal.net.BoltServerAddress;
22+
23+
/**
24+
* Interface used for tracking errors when connected to a cluster.
25+
*/
26+
interface ClusteredErrorHandler
27+
{
28+
void onConnectionFailure( BoltServerAddress address );
29+
30+
void onWriteFailure( BoltServerAddress address );
31+
}

driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.neo4j.driver.internal.net.BoltServerAddress;
2323
import org.neo4j.driver.internal.spi.Connection;
24-
import org.neo4j.driver.internal.util.Consumer;
2524
import org.neo4j.driver.v1.Logger;
2625
import org.neo4j.driver.v1.Statement;
2726
import org.neo4j.driver.v1.StatementResult;
@@ -31,36 +30,32 @@
3130

3231
public class ClusteredNetworkSession extends NetworkSession
3332
{
34-
private final Consumer<BoltServerAddress> onFailedConnection;
35-
private final Consumer<BoltServerAddress> onFailedWrite;
33+
private final ClusteredErrorHandler onError;
3634

37-
//TODO combine failure handling
3835
ClusteredNetworkSession( Connection connection,
39-
Consumer<BoltServerAddress> onFailedConnection,
40-
Consumer<BoltServerAddress> onFailedWrite, Logger logger )
36+
ClusteredErrorHandler onError, Logger logger )
4137
{
4238
super( connection, logger );
43-
this.onFailedConnection = onFailedConnection;
44-
this.onFailedWrite = onFailedWrite;
39+
this.onError = onError;
4540
}
4641

4742
@Override
4843
public StatementResult run( Statement statement )
4944
{
5045
try
5146
{
52-
return new ClusteredStatementResult( super.run( statement ), connection.address(), onFailedConnection, onFailedWrite);
47+
return new ClusteredStatementResult( super.run( statement ), connection.address(), onError );
5348
}
5449
catch ( ConnectionFailureException e )
5550
{
56-
onFailedConnection.accept( connection.address() );
51+
onError.onConnectionFailure( connection.address() );
5752
throw new SessionExpiredException( "Failed to perform write load to server", e );
5853
}
5954
catch ( ClientException e )
6055
{
6156
if ( e.code().equals( "Neo.ClientError.General.ForbiddenOnFollower" ) )
6257
{
63-
onFailedWrite.accept( connection.address() );
58+
onError.onWriteFailure( connection.address() );
6459
throw new SessionExpiredException(
6560
String.format( "Server at %s no longer accepts writes", connection.address().toString() ) );
6661
}
@@ -81,8 +76,9 @@ public void close()
8176
catch ( ConnectionFailureException e )
8277
{
8378
BoltServerAddress address = connection.address();
84-
onFailedConnection.accept( address );
85-
throw new SessionExpiredException( String.format( "Server at %s is no longer available", address.toString()), e);
79+
onError.onConnectionFailure( address );
80+
throw new SessionExpiredException(
81+
String.format( "Server at %s is no longer available", address.toString() ), e );
8682
}
8783
}
8884
}

driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.List;
2222

2323
import org.neo4j.driver.internal.net.BoltServerAddress;
24-
import org.neo4j.driver.internal.util.Consumer;
2524
import org.neo4j.driver.v1.Record;
2625
import org.neo4j.driver.v1.StatementResult;
2726
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -35,15 +34,13 @@ public class ClusteredStatementResult implements StatementResult
3534
{
3635
private final StatementResult delegate;
3736
private final BoltServerAddress address;
38-
private final Consumer<BoltServerAddress> onFailedConnection;
39-
private final Consumer<BoltServerAddress> onFailedWrite;
37+
private final ClusteredErrorHandler onError;
4038

41-
ClusteredStatementResult( StatementResult delegate, BoltServerAddress address, Consumer<BoltServerAddress> onFailedConnection, Consumer<BoltServerAddress> onFailedWrite)
39+
ClusteredStatementResult( StatementResult delegate, BoltServerAddress address, ClusteredErrorHandler onError )
4240
{
4341
this.delegate = delegate;
4442
this.address = address;
45-
this.onFailedConnection = onFailedConnection;
46-
this.onFailedWrite = onFailedWrite;
43+
this.onError = onError;
4744
}
4845

4946
@Override
@@ -241,14 +238,13 @@ public ResultSummary consume()
241238

242239
private SessionExpiredException sessionExpired( ConnectionFailureException e )
243240
{
244-
onFailedConnection.accept( address );
241+
onError.onConnectionFailure( address );
245242
return new SessionExpiredException( String.format( "Server at %s is no longer available", address.toString()), e);
246243
}
247244

248245
private SessionExpiredException failedWrite()
249246
{
250-
251-
onFailedWrite.accept( address );
247+
onError.onWriteFailure( address );
252248
return new SessionExpiredException( String.format( "Server at %s no longer accepts writes", address.toString()));
253249
}
254250

0 commit comments

Comments
 (0)