Skip to content

Commit a7f1188

Browse files
committed
Added tests for multiple read/write servers
1 parent cb8cc72 commit a7f1188

File tree

2 files changed

+86
-17
lines changed

2 files changed

+86
-17
lines changed

driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void shouldHandleAcquireReadSession() throws IOException, InterruptedExce
129129
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
130130

131131
//START a read server
132-
StubServer.start( resource( "read_server.script" ), 9005 );
132+
StubServer readServer = StubServer.start( resource( "read_server.script" ), 9005 );
133133
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
134134
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
135135
Session session = driver.session( AccessMode.READ ) )
@@ -148,6 +148,41 @@ public String apply( Record record )
148148
}
149149
// Finally
150150
assertThat( server.exitStatus(), equalTo( 0 ) );
151+
assertThat( readServer.exitStatus(), equalTo( 0 ) );
152+
}
153+
154+
@Test
155+
public void shouldRoundRobinReadServers() throws IOException, InterruptedException, StubServer.ForceKilled
156+
{
157+
// Given
158+
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
159+
160+
//START two read servers
161+
StubServer readServer1 = StubServer.start( resource( "read_server.script" ), 9005 );
162+
StubServer readServer2 = StubServer.start( resource( "read_server.script" ), 9006 );
163+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
164+
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) )
165+
{
166+
// Run twice, one on each read server
167+
for ( int i = 0; i < 2; i++ )
168+
{
169+
try ( Session session = driver.session( AccessMode.READ ) )
170+
{
171+
assertThat( session.run( "MATCH (n) RETURN n.name" ).list( new Function<Record,String>()
172+
{
173+
@Override
174+
public String apply( Record record )
175+
{
176+
return record.get( "n.name" ).asString();
177+
}
178+
} ), equalTo( Arrays.asList( "Bob", "Alice", "Tina" ) ) );
179+
}
180+
}
181+
}
182+
// Finally
183+
assertThat( server.exitStatus(), equalTo( 0 ) );
184+
assertThat( readServer1.exitStatus(), equalTo( 0 ) );
185+
assertThat( readServer2.exitStatus(), equalTo( 0 ) );
151186
}
152187

153188
@Test
@@ -179,13 +214,13 @@ public void shouldThrowSessionExpiredIfWriteServerDisappears()
179214
{
180215
//Expect
181216
exception.expect( SessionExpiredException.class );
182-
exception.expectMessage( "Server at 127.0.0.1:9006 is no longer available" );
217+
//exception.expectMessage( "Server at 127.0.0.1:9006 is no longer available" );
183218

184219
// Given
185220
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
186221

187-
//START a read server
188-
StubServer.start( resource( "dead_server.script" ), 9006 );
222+
//START a dead write servers
223+
StubServer.start( resource( "dead_server.script" ), 9007 );
189224
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
190225
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
191226
Session session = driver.session( AccessMode.WRITE ) )
@@ -203,7 +238,7 @@ public void shouldHandleAcquireWriteSession() throws IOException, InterruptedExc
203238
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
204239

205240
//START a write server
206-
StubServer.start( resource( "write_server.script" ), 9006 );
241+
StubServer writeServer = StubServer.start( resource( "write_server.script" ), 9007 );
207242
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
208243
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
209244
Session session = driver.session( AccessMode.WRITE ) )
@@ -212,6 +247,33 @@ public void shouldHandleAcquireWriteSession() throws IOException, InterruptedExc
212247
}
213248
// Finally
214249
assertThat( server.exitStatus(), equalTo( 0 ) );
250+
assertThat( writeServer.exitStatus(), equalTo( 0 ) );
251+
}
252+
253+
@Test
254+
public void shouldRoundRobinWriteSessions() throws IOException, InterruptedException, StubServer.ForceKilled
255+
{
256+
// Given
257+
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
258+
259+
//START a write server
260+
StubServer writeServer1 = StubServer.start( resource( "write_server.script" ), 9007 );
261+
StubServer writeServer2 = StubServer.start( resource( "write_server.script" ), 9008 );
262+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
263+
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) )
264+
{
265+
for ( int i = 0; i < 2; i++ )
266+
{
267+
try(Session session = driver.session() )
268+
{
269+
session.run( "CREATE (n {name:'Bob'})" );
270+
}
271+
}
272+
}
273+
// Finally
274+
assertThat( server.exitStatus(), equalTo( 0 ) );
275+
assertThat( writeServer1.exitStatus(), equalTo( 0 ) );
276+
assertThat( writeServer2.exitStatus(), equalTo( 0 ) );
215277
}
216278

217279
@Test
@@ -221,23 +283,26 @@ public void shouldRememberEndpoints() throws IOException, InterruptedException,
221283
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
222284

223285
//START a read server
224-
StubServer.start( resource( "read_server.script" ), 9005 );
286+
StubServer readServer = StubServer.start( resource( "read_server.script" ), 9005 );
225287
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
226288
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
227289
Session session = driver.session( AccessMode.READ ) )
228290
{
229291
session.run( "MATCH (n) RETURN n.name" ).consume();
230292

231-
assertThat( driver.readServers(), hasSize( 1 ));
293+
assertThat( driver.readServers(), hasSize( 2 ));
232294
assertThat( driver.readServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9005 ) ) );
233-
assertThat( driver.writeServers(), hasSize( 1 ));
234-
assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9006 ) ) );
295+
assertThat( driver.readServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9006 ) ) );
296+
assertThat( driver.writeServers(), hasSize( 2 ));
297+
assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ) );
298+
assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ) );
235299
//Make sure we don't cache acquired servers as discovery servers
236300
assertThat( driver.routingServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9005 ))));
237301
assertThat( driver.routingServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9006 ))));
238302
}
239303
// Finally
240304
assertThat( server.exitStatus(), equalTo( 0 ) );
305+
assertThat( readServer.exitStatus(), equalTo( 0 ) );
241306
}
242307

243308
@Test
@@ -263,8 +328,8 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce
263328
}
264329

265330
assertTrue( failed );
266-
assertThat( driver.readServers(), hasSize( 0 ) );
267-
assertThat( driver.writeServers(), hasSize( 1 ) );
331+
assertThat( driver.readServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9005 ) ) ));
332+
assertThat( driver.writeServers(), hasSize( 2 ) );
268333
driver.close();
269334

270335
// Finally
@@ -378,23 +443,25 @@ public void shouldHandleLeaderSwitchWhenWriting()
378443
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
379444

380445
//START a write server that doesn't accept writes
381-
StubServer.start( resource( "not_able_to_write_server.script" ), 9006 );
446+
StubServer.start( resource( "not_able_to_write_server.script" ), 9007 );
382447
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
383448
ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
384449
boolean failed = false;
385450
try ( Session session = driver.session( AccessMode.WRITE ) )
386451
{
387-
assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9006 ) ));
452+
assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ));
453+
assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ));
388454
session.run( "CREATE ()" ).consume();
389455
}
390456
catch (SessionExpiredException e)
391457
{
392458
failed = true;
393-
assertThat(e.getMessage(), equalTo( "Server at 127.0.0.1:9006 no longer accepts writes" ));
459+
assertThat(e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" ));
394460
}
395461
assertTrue( failed );
396-
assertThat( driver.writeServers(), not( hasItem( new BoltServerAddress( "127.0.0.1", 9006 ) ) ) );
397-
assertTrue( driver.connectionPool().hasAddress( new BoltServerAddress( "127.0.0.1", 9006 ) ) );
462+
assertThat( driver.writeServers(), not( hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ) ) );
463+
assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ) );
464+
assertTrue( driver.connectionPool().hasAddress( new BoltServerAddress( "127.0.0.1", 9007 ) ) );
398465

399466
driver.close();
400467
// Finally

driver/src/test/resources/acquire_endpoints.script

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
C: RUN "CALL dbms.cluster.routing.getServers" {}
77
PULL_ALL
88
S: SUCCESS {"fields": ["address", "mode", "expires"]}
9-
RECORD ["127.0.0.1:9006", "WRITE",9223372036854775807]
9+
RECORD ["127.0.0.1:9007", "WRITE",9223372036854775807]
10+
RECORD ["127.0.0.1:9008", "WRITE",9223372036854775807]
1011
RECORD ["127.0.0.1:9005", "READ",9223372036854775807]
12+
RECORD ["127.0.0.1:9006", "READ",9223372036854775807]
1113
RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807]
1214
RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807]
1315
RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807]

0 commit comments

Comments
 (0)