Skip to content

Commit d85a610

Browse files
committed
Handle read sessions
For read session we do automatic retries since we know the state of the database.
1 parent d106218 commit d85a610

23 files changed

+524
-113
lines changed

driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@
1919

2020
package org.neo4j.driver.internal;
2121

22-
import java.util.Collections;
23-
import java.util.HashSet;
2422
import java.util.Set;
25-
import java.util.concurrent.ThreadLocalRandom;
2623

2724
import org.neo4j.driver.internal.net.BoltServerAddress;
2825
import org.neo4j.driver.internal.security.SecurityPlan;
26+
import org.neo4j.driver.internal.spi.ConnectionPool;
2927
import org.neo4j.driver.v1.Driver;
3028
import org.neo4j.driver.v1.Logger;
3129
import org.neo4j.driver.v1.Logging;
@@ -35,11 +33,12 @@ abstract class BaseDriver implements Driver
3533
{
3634
private final SecurityPlan securityPlan;
3735
protected final Logger log;
38-
protected final Set<BoltServerAddress> servers = new HashSet<>();
36+
protected final ConnectionPool connections;
3937

40-
BaseDriver( BoltServerAddress address, SecurityPlan securityPlan, Logging logging )
38+
BaseDriver( ConnectionPool connections, BoltServerAddress address, SecurityPlan securityPlan, Logging logging )
4139
{
42-
this.servers.add( address );
40+
this.connections = connections;
41+
this.connections.add( address );
4342
this.securityPlan = securityPlan;
4443
this.log = logging.getLog( Session.LOG_NAME );
4544
}
@@ -50,26 +49,9 @@ public boolean isEncrypted()
5049
return securityPlan.requiresEncryption();
5150
}
5251

52+
//Used for testing
5353
Set<BoltServerAddress> servers()
5454
{
55-
return Collections.unmodifiableSet( servers );
55+
return connections.addresses();
5656
}
57-
58-
//This is somewhat silly and has O(n) complexity
59-
protected BoltServerAddress randomServer()
60-
{
61-
ThreadLocalRandom random = ThreadLocalRandom.current();
62-
int item = random.nextInt(servers.size());
63-
int i = 0;
64-
for ( BoltServerAddress server : servers )
65-
{
66-
if (i == item)
67-
{
68-
return server;
69-
}
70-
}
71-
72-
throw new IllegalStateException( "This cannot happen" );
73-
}
74-
7557
}

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 168 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,15 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
2019
package org.neo4j.driver.internal;
2120

22-
import java.util.LinkedList;
2321
import java.util.List;
2422

2523
import org.neo4j.driver.internal.net.BoltServerAddress;
2624
import org.neo4j.driver.internal.net.pooling.PoolSettings;
2725
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
2826
import org.neo4j.driver.internal.security.SecurityPlan;
2927
import org.neo4j.driver.internal.spi.Connection;
30-
import org.neo4j.driver.internal.spi.ConnectionPool;
3128
import org.neo4j.driver.internal.util.Consumer;
3229
import org.neo4j.driver.internal.util.Supplier;
3330
import org.neo4j.driver.v1.Logging;
@@ -36,61 +33,64 @@
3633
import org.neo4j.driver.v1.SessionMode;
3734
import org.neo4j.driver.v1.StatementResult;
3835
import org.neo4j.driver.v1.exceptions.ClientException;
39-
import org.neo4j.driver.v1.exceptions.ClusterUnavailableException;
4036
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
37+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4138

4239
import static java.lang.String.format;
4340

4441
public class ClusterDriver extends BaseDriver
4542
{
4643
private static final String DISCOVER_MEMBERS = "dbms.cluster.discoverMembers";
4744
private static final String ACQUIRE_ENDPOINTS = "dbms.cluster.acquireEndpoints";
48-
private static final int MINIMUM_NUMBER_OF_SERVERS = 3;
4945

50-
private final ConnectionPool connections;
46+
private final Endpoints endpoints = new Endpoints();
47+
private final ClusterSettings clusterSettings;
48+
private boolean discoverable = true;
5149

52-
public ClusterDriver( BoltServerAddress seedAddress, ConnectionSettings connectionSettings, SecurityPlan securityPlan,
53-
PoolSettings poolSettings, Logging logging )
50+
public ClusterDriver( BoltServerAddress seedAddress, ConnectionSettings connectionSettings,
51+
ClusterSettings clusterSettings,
52+
SecurityPlan securityPlan,
53+
PoolSettings poolSettings, Logging logging )
5454
{
55-
super( seedAddress, securityPlan, logging );
56-
this.connections = new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging );
55+
super( new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging ),seedAddress, securityPlan, logging );
56+
this.clusterSettings = clusterSettings;
5757
discover();
5858
}
5959

60-
void discover()
60+
synchronized void discover()
6161
{
62-
final List<BoltServerAddress> newServers = new LinkedList<>( );
62+
if (!discoverable)
63+
{
64+
return;
65+
}
66+
6367
try
6468
{
6569
boolean success = false;
66-
while ( !servers.isEmpty() && !success )
70+
while ( !connections.isEmpty() && !success )
6771
{
6872
success = call( DISCOVER_MEMBERS, new Consumer<Record>()
6973
{
7074
@Override
7175
public void accept( Record record )
7276
{
73-
newServers.add( new BoltServerAddress( record.get( "address" ).asString() ) );
77+
connections.add(new BoltServerAddress( record.get( "address" ).asString() ));
7478
}
7579
} );
76-
7780
}
78-
if ( success )
81+
if ( !success )
7982
{
80-
this.servers.clear();
81-
this.servers.addAll( newServers );
82-
log.debug( "~~ [MEMBERS] -> %s", newServers );
83-
}
84-
else
85-
{
86-
throw new ClusterUnavailableException( "Run out of servers" );
83+
throw new ServiceUnavailableException( "Run out of servers" );
8784
}
8885
}
8986
catch ( ClientException ex )
9087
{
9188
if ( ex.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
9289
{
93-
throw new ClientException( "Discovery failed: could not find procedure %s", DISCOVER_MEMBERS );
90+
//no procedure there, not much to do, stick with what we've got
91+
//this may happen because server is running in standalone mode
92+
log.warn( "Could not find procedure %s", DISCOVER_MEMBERS );
93+
discoverable = false;
9494
}
9595
else
9696
{
@@ -99,13 +99,15 @@ public void accept( Record record )
9999
}
100100
}
101101

102+
//must be called from a synchronized method
102103
private boolean call( String procedureName, Consumer<Record> recorder )
103104
{
105+
Connection acquire = null;
106+
Session session = null;
107+
try {
108+
acquire = connections.acquire();
109+
session = new NetworkSession( acquire, log );
104110

105-
BoltServerAddress address = randomServer();
106-
Connection acquire = connections.acquire( address );
107-
try ( Session session = new NetworkSession( acquire, log ) )
108-
{
109111
StatementResult records = session.run( format( "CALL %s", procedureName ) );
110112
while ( records.hasNext() )
111113
{
@@ -114,65 +116,162 @@ private boolean call( String procedureName, Consumer<Record> recorder )
114116
}
115117
catch ( ConnectionFailureException e )
116118
{
117-
forget(address );
119+
if (acquire != null)
120+
{
121+
forget( acquire.address() );
122+
}
118123
return false;
119124
}
125+
finally
126+
{
127+
if (acquire != null)
128+
{
129+
acquire.close();
130+
}
131+
if (session != null)
132+
{
133+
session.close();
134+
}
135+
}
120136
return true;
121137
}
122138

123-
private void forget(BoltServerAddress address)
139+
//must be called from a synchronized method
140+
private void callWithRetry(String procedureName, Consumer<Record> recorder )
124141
{
125-
servers.remove( address );
126-
connections.purge(address);
142+
while ( !connections.isEmpty() )
143+
{
144+
Connection acquire = null;
145+
Session session = null;
146+
try {
147+
acquire = connections.acquire();
148+
session = new NetworkSession( acquire, log );
149+
List<Record> list = session.run( format( "CALL %s", procedureName ) ).list();
150+
for ( Record record : list )
151+
{
152+
recorder.accept( record );
153+
}
154+
//we found results give up
155+
return;
156+
}
157+
catch ( ConnectionFailureException e )
158+
{
159+
if (acquire != null)
160+
{
161+
forget( acquire.address() );
162+
}
163+
}
164+
finally
165+
{
166+
if (acquire != null)
167+
{
168+
acquire.close();
169+
}
170+
if (session != null)
171+
{
172+
session.close();
173+
}
174+
}
175+
}
176+
177+
throw new ServiceUnavailableException( "Failed to communicate with any of the cluster members" );
178+
}
179+
180+
private synchronized void forget( BoltServerAddress address )
181+
{
182+
connections.purge( address );
127183
}
128184

129-
//TODO this could return a WRITE session but that may lead to users using the LEADER too much
130-
//a `ClientException` may be what we want
131185
@Override
132186
public Session session()
133187
{
134-
throw new UnsupportedOperationException();
188+
return session( SessionMode.WRITE );
135189
}
136190

137191
@Override
138192
public Session session( final SessionMode mode )
139193
{
140-
return new ClusteredSession( new Supplier<Connection>()
194+
switch ( mode )
141195
{
142-
@Override
143-
public Connection get()
196+
case READ:
197+
return new ReadNetworkSession( new Supplier<Connection>()
144198
{
145-
return acquireConnection( mode );
146-
}
147-
}, log );
199+
@Override
200+
public Connection get()
201+
{
202+
return acquireConnection( mode );
203+
}
204+
}, new Consumer<Connection>()
205+
{
206+
@Override
207+
public void accept( Connection connection )
208+
{
209+
forget( connection.address() );
210+
}
211+
}, clusterSettings, log );
212+
case WRITE:
213+
throw new UnsupportedOperationException();
214+
default:
215+
throw new UnsupportedOperationException();
216+
}
148217
}
149218

150-
private Connection acquireConnection( SessionMode mode )
219+
private synchronized Connection acquireConnection( SessionMode mode )
151220
{
221+
if (!discoverable)
222+
{
223+
return connections.acquire();
224+
}
225+
152226
//if we are short on servers, find new ones
153-
if ( servers.size() < MINIMUM_NUMBER_OF_SERVERS )
227+
if ( connections.addressCount() < clusterSettings.minimumNumberOfServers() )
154228
{
155229
discover();
156230
}
157231

158-
final BoltServerAddress[] addresses = new BoltServerAddress[2];
159-
call( ACQUIRE_ENDPOINTS, new Consumer<Record>()
232+
endpoints.clear();
233+
try
160234
{
161-
@Override
162-
public void accept( Record record )
235+
callWithRetry( ACQUIRE_ENDPOINTS, new Consumer<Record>()
163236
{
164-
addresses[0] = new BoltServerAddress( record.get( "READ" ).asString() );
165-
addresses[1] = new BoltServerAddress( record.get( "WRITE" ).asString() );
237+
@Override
238+
public void accept( Record record )
239+
{
240+
String serverMode = record.get( "mode" ).asString();
241+
if ( serverMode.equals( "READ" ) )
242+
{
243+
endpoints.readServer = new BoltServerAddress( record.get( "address" ).asString() );
244+
}
245+
else if ( serverMode.equals( "WRITE" ) )
246+
{
247+
endpoints.writeServer = new BoltServerAddress( record.get( "address" ).asString() );
248+
}
249+
}
250+
} );
251+
}
252+
catch (ClientException e)
253+
{
254+
if ( e.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
255+
{
256+
log.warn( "Could not find procedure %s", ACQUIRE_ENDPOINTS );
257+
discoverable = false;
258+
return connections.acquire();
166259
}
167-
} );
260+
throw e;
261+
}
262+
263+
if ( !endpoints.valid() )
264+
{
265+
throw new ServiceUnavailableException("Could not establish any endpoints for the call");
266+
}
168267

169268

170269
switch ( mode )
171270
{
172271
case READ:
173-
return connections.acquire( addresses[0] );
272+
return connections.acquire( endpoints.readServer );
174273
case WRITE:
175-
return connections.acquire( addresses[0] );
274+
return connections.acquire( endpoints.writeServer );
176275
default:
177276
throw new ClientException( mode + " is not supported for creating new sessions" );
178277
}
@@ -191,4 +290,21 @@ public void close()
191290
}
192291
}
193292

293+
private static class Endpoints
294+
{
295+
BoltServerAddress readServer;
296+
BoltServerAddress writeServer;
297+
298+
public boolean valid()
299+
{
300+
return readServer != null && writeServer != null;
301+
}
302+
303+
public void clear()
304+
{
305+
readServer = null;
306+
writeServer = null;
307+
}
308+
}
309+
194310
}

0 commit comments

Comments
 (0)