Skip to content

Commit d106218

Browse files
committed
Added basic acquisition for ClusterDriver
By specifying `READ` or `WRITE` ClusterDriver will call procedure to get endpoints and creates a session using that connection.
1 parent 1bed415 commit d106218

File tree

10 files changed

+192
-13
lines changed

10 files changed

+192
-13
lines changed

driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
package org.neo4j.driver.internal;
2121

2222
import java.util.Collections;
23-
import java.util.LinkedList;
24-
import java.util.List;
23+
import java.util.HashSet;
24+
import java.util.Set;
2525
import java.util.concurrent.ThreadLocalRandom;
2626

2727
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -35,7 +35,7 @@ abstract class BaseDriver implements Driver
3535
{
3636
private final SecurityPlan securityPlan;
3737
protected final Logger log;
38-
protected final List<BoltServerAddress> servers = new LinkedList<>();
38+
protected final Set<BoltServerAddress> servers = new HashSet<>();
3939

4040
BaseDriver( BoltServerAddress address, SecurityPlan securityPlan, Logging logging )
4141
{
@@ -50,14 +50,26 @@ public boolean isEncrypted()
5050
return securityPlan.requiresEncryption();
5151
}
5252

53-
List<BoltServerAddress> servers()
53+
Set<BoltServerAddress> servers()
5454
{
55-
return Collections.unmodifiableList( servers );
55+
return Collections.unmodifiableSet( servers );
5656
}
5757

58+
//This is somewhat silly and has O(n) complexity
5859
protected BoltServerAddress randomServer()
5960
{
60-
return servers.get( ThreadLocalRandom.current().nextInt( 0, servers.size() ) );
61+
ThreadLocalRandom random = ThreadLocalRandom.current();
62+
int item = random.nextInt(servers.size());
63+
int i = 0;
64+
for ( BoltServerAddress server : servers )
65+
{
66+
if (i == item)
67+
{
68+
return server;
69+
}
70+
}
71+
72+
throw new IllegalStateException( "This cannot happen" );
6173
}
6274

6375
}

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
import org.neo4j.driver.internal.spi.Connection;
3030
import org.neo4j.driver.internal.spi.ConnectionPool;
3131
import org.neo4j.driver.internal.util.Consumer;
32+
import org.neo4j.driver.internal.util.Supplier;
3233
import org.neo4j.driver.v1.Logging;
3334
import org.neo4j.driver.v1.Record;
3435
import org.neo4j.driver.v1.Session;
36+
import org.neo4j.driver.v1.SessionMode;
3537
import org.neo4j.driver.v1.StatementResult;
3638
import org.neo4j.driver.v1.exceptions.ClientException;
3739
import org.neo4j.driver.v1.exceptions.ClusterUnavailableException;
@@ -42,6 +44,8 @@
4244
public class ClusterDriver extends BaseDriver
4345
{
4446
private static final String DISCOVER_MEMBERS = "dbms.cluster.discoverMembers";
47+
private static final String ACQUIRE_ENDPOINTS = "dbms.cluster.acquireEndpoints";
48+
private static final int MINIMUM_NUMBER_OF_SERVERS = 3;
4549

4650
private final ConnectionPool connections;
4751

@@ -122,12 +126,58 @@ private void forget(BoltServerAddress address)
122126
connections.purge(address);
123127
}
124128

129+
//TODO this could return a WRITE session but that may lead to users using the LEADER too much
130+
//a `ClientException` may be what we want
125131
@Override
126132
public Session session()
127133
{
128134
throw new UnsupportedOperationException();
129135
}
130136

137+
@Override
138+
public Session session( final SessionMode mode )
139+
{
140+
return new ClusteredSession( new Supplier<Connection>()
141+
{
142+
@Override
143+
public Connection get()
144+
{
145+
return acquireConnection( mode );
146+
}
147+
}, log );
148+
}
149+
150+
private Connection acquireConnection( SessionMode mode )
151+
{
152+
//if we are short on servers, find new ones
153+
if ( servers.size() < MINIMUM_NUMBER_OF_SERVERS )
154+
{
155+
discover();
156+
}
157+
158+
final BoltServerAddress[] addresses = new BoltServerAddress[2];
159+
call( ACQUIRE_ENDPOINTS, new Consumer<Record>()
160+
{
161+
@Override
162+
public void accept( Record record )
163+
{
164+
addresses[0] = new BoltServerAddress( record.get( "READ" ).asString() );
165+
addresses[1] = new BoltServerAddress( record.get( "WRITE" ).asString() );
166+
}
167+
} );
168+
169+
170+
switch ( mode )
171+
{
172+
case READ:
173+
return connections.acquire( addresses[0] );
174+
case WRITE:
175+
return connections.acquire( addresses[0] );
176+
default:
177+
throw new ClientException( mode + " is not supported for creating new sessions" );
178+
}
179+
}
180+
131181
@Override
132182
public void close()
133183
{
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
22+
import org.neo4j.driver.internal.spi.Connection;
23+
import org.neo4j.driver.internal.util.Supplier;
24+
import org.neo4j.driver.v1.Logger;
25+
import org.neo4j.driver.v1.Statement;
26+
import org.neo4j.driver.v1.StatementResult;
27+
import org.neo4j.driver.v1.exceptions.ClusterUnavailableException;
28+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
29+
30+
public class ClusteredSession extends NetworkSession
31+
{
32+
private static final int RETRIES = 3;
33+
private final Supplier<Connection> connectionSupplier;
34+
35+
ClusteredSession(Supplier<Connection> connectionSupplier, Logger logger )
36+
{
37+
super(connectionSupplier.get(), logger);
38+
this.connectionSupplier = connectionSupplier;
39+
}
40+
41+
@Override
42+
public StatementResult run( Statement statement )
43+
{
44+
for ( int i = 0; i < RETRIES; i++ )
45+
{
46+
try
47+
{
48+
return super.run( statement );
49+
}
50+
catch ( ConnectionFailureException e )
51+
{
52+
//connection
53+
connection = connectionSupplier.get();
54+
}
55+
}
56+
57+
throw new ClusterUnavailableException( "Not able to connect to any members of the cluster" );
58+
}
59+
}

driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.neo4j.driver.internal.spi.ConnectionPool;
2727
import org.neo4j.driver.v1.Logging;
2828
import org.neo4j.driver.v1.Session;
29+
import org.neo4j.driver.v1.SessionMode;
2930

3031
import static java.lang.String.format;
3132

@@ -46,6 +47,12 @@ public Session session()
4647
return new NetworkSession( connections.acquire( randomServer() ), log );
4748
}
4849

50+
@Override
51+
public Session session( SessionMode ignore )
52+
{
53+
return session();
54+
}
55+
4956
@Override
5057
public void close()
5158
{

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
public class NetworkSession implements Session
4040
{
41-
private final Connection connection;
41+
protected Connection connection;
4242
private final Logger logger;
4343

4444
private String lastBookmark = null;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.util;
20+
21+
public interface Supplier<T> {
22+
T get();
23+
}

driver/src/main/java/org/neo4j/driver/v1/Driver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ public interface Driver extends AutoCloseable
8686
*/
8787
Session session();
8888

89+
Session session(SessionMode mode);
90+
8991
/**
9092
* Close all the resources assigned to this driver
9193
*/

driver/src/main/java/org/neo4j/driver/v1/Session.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public interface Session extends Resource, StatementRunner
8585
* or if this transaction was rolled back, the bookmark value will
8686
* be null.
8787
*
88-
* @return a reference to a previous transaction
88+
* @return a reference to a previous transac'tion
8989
*/
9090
String lastBookmark();
9191

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.v1;
20+
21+
public enum SessionMode
22+
{
23+
READ,
24+
WRITE
25+
}

driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import java.io.IOException;
2525
import java.net.URI;
26-
import java.util.List;
26+
import java.util.Set;
2727
import java.util.logging.Level;
2828

2929
import org.neo4j.driver.internal.logging.ConsoleLogging;
@@ -32,6 +32,7 @@
3232
import org.neo4j.driver.v1.GraphDatabase;
3333
import org.neo4j.driver.v1.util.StubServer;
3434

35+
import static org.hamcrest.Matchers.hasItem;
3536
import static org.hamcrest.core.IsEqual.equalTo;
3637
import static org.junit.Assert.assertThat;
3738

@@ -50,11 +51,11 @@ public void shouldDiscoverServers() throws IOException, InterruptedException, St
5051
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) )
5152
{
5253
// Then
53-
List<BoltServerAddress> addresses = driver.servers();
54+
Set<BoltServerAddress> addresses = driver.servers();
5455
assertThat( addresses.size(), equalTo( 3 ) );
55-
assertThat( addresses.get( 0 ), equalTo( new BoltServerAddress( "127.0.0.1", 9001 ) ) );
56-
assertThat( addresses.get( 1 ), equalTo( new BoltServerAddress( "127.0.0.1", 9002 ) ) );
57-
assertThat( addresses.get( 2 ), equalTo( new BoltServerAddress( "127.0.0.1", 9003 ) ) );
56+
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) );
57+
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) );
58+
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) );
5859
}
5960

6061
// Finally

0 commit comments

Comments
 (0)