Skip to content

Commit 1600196

Browse files
authored
Merge pull request #211 from zhenlineo/1.1-thread-safe-reset
Make sessioin.reset thread-safe
2 parents eef96fb + 58177f4 commit 1600196

File tree

5 files changed

+101
-16
lines changed

5 files changed

+101
-16
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public StatementResult run( String statementTemplate, Record statementParameters
170170
}
171171

172172
@Override
173-
public StatementResult run( Statement statement )
173+
public synchronized StatementResult run( Statement statement )
174174
{
175175
ensureNotFailed();
176176

@@ -217,7 +217,7 @@ public TypeSystem typeSystem()
217217
return InternalTypeSystem.TYPE_SYSTEM;
218218
}
219219

220-
public void markToClose()
220+
public synchronized void markToClose()
221221
{
222222
state = State.FAILED;
223223
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,14 @@ public StatementResult run( Statement statement )
105105

106106
public void reset()
107107
{
108+
ensureSessionIsOpen();
108109
ensureNoUnrecoverableError();
109110
ensureConnectionIsOpen();
110111

112+
if( currentTransaction != null )
113+
{
114+
currentTransaction.markToClose();
115+
}
111116
connection.resetAsync();
112117
}
113118

@@ -202,13 +207,15 @@ public TypeSystem typeSystem()
202207

203208
private void ensureConnectionIsValidBeforeRunningSession()
204209
{
210+
ensureSessionIsOpen();
205211
ensureNoUnrecoverableError();
206212
ensureNoOpenTransactionBeforeRunningSession();
207213
ensureConnectionIsOpen();
208214
}
209215

210216
private void ensureConnectionIsValidBeforeOpeningTransaction()
211217
{
218+
ensureSessionIsOpen();
212219
ensureNoUnrecoverableError();
213220
ensureNoOpenTransactionBeforeOpeningTransaction();
214221
ensureConnectionIsOpen();
@@ -263,4 +270,16 @@ private void ensureConnectionIsOpen()
263270
"Please close this session and retry your statement in another new session." );
264271
}
265272
}
273+
274+
private void ensureSessionIsOpen()
275+
{
276+
if( !isOpen() )
277+
{
278+
throw new ClientException(
279+
"No more interaction with this session is allowed " +
280+
"as the current session is already closed or marked as closed. " +
281+
"You get this error either because you have a bad reference to a session that has already be closed " +
282+
"or you are trying to reuse a session that you have called `reset` on it." );
283+
}
284+
}
266285
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void sync()
113113
}
114114

115115
@Override
116-
public void flush()
116+
public synchronized void flush()
117117
{
118118
try
119119
{
@@ -180,7 +180,7 @@ else if ( e instanceof SocketTimeoutException )
180180
}
181181
}
182182

183-
private void queueMessage( Message msg, Collector collector )
183+
private synchronized void queueMessage( Message msg, Collector collector )
184184
{
185185
pendingMessages.add( msg );
186186
responseHandler.appendResultCollector( collector );

driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.net;
2020

21-
import java.util.LinkedList;
2221
import java.util.Map;
2322
import java.util.Queue;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
2424

2525
import org.neo4j.driver.internal.messaging.MessageHandler;
2626
import org.neo4j.driver.internal.spi.Collector;
@@ -39,7 +39,7 @@
3939

4040
public class SocketResponseHandler implements MessageHandler
4141
{
42-
private final Queue<Collector> collectors = new LinkedList<>();
42+
private final Queue<Collector> collectors = new ConcurrentLinkedQueue<>();
4343

4444
/** If a failure occurs, the error gets stored here */
4545
private Neo4jException error;

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,13 @@
2121
import org.junit.Rule;
2222
import org.junit.Test;
2323

24-
import org.neo4j.driver.v1.AuthToken;
25-
import org.neo4j.driver.v1.AuthTokens;
26-
import org.neo4j.driver.v1.Driver;
27-
import org.neo4j.driver.v1.GraphDatabase;
28-
import org.neo4j.driver.v1.Session;
29-
import org.neo4j.driver.v1.StatementResult;
24+
import org.neo4j.driver.v1.*;
3025
import org.neo4j.driver.v1.exceptions.ClientException;
3126
import org.neo4j.driver.v1.exceptions.Neo4jException;
3227
import org.neo4j.driver.v1.util.TestNeo4j;
3328

3429
import static org.hamcrest.CoreMatchers.equalTo;
30+
import static org.hamcrest.CoreMatchers.startsWith;
3531
import static org.hamcrest.Matchers.greaterThan;
3632
import static org.junit.Assert.assertFalse;
3733
import static org.junit.Assert.assertThat;
@@ -105,7 +101,8 @@ public void shouldKillLongRunningStatement() throws Throwable
105101
final int killTimeout = 1; // 1s
106102
long startTime = -1, endTime;
107103

108-
try( final Session session = driver.session() )
104+
final Session session = driver.session();
105+
try
109106
{
110107
StatementResult result =
111108
session.run( "CALL test.driver.longRunningStatement({seconds})",
@@ -123,13 +120,17 @@ public void shouldKillLongRunningStatement() throws Throwable
123120
{
124121
endTime = System.currentTimeMillis();
125122
assertTrue( startTime > 0 );
126-
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
123+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
127124
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
128125
}
129126
catch ( Exception e )
130127
{
131128
fail( "Should be a Neo4jException" );
132129
}
130+
finally
131+
{
132+
session.close();
133+
}
133134
}
134135

135136
@Test
@@ -168,7 +169,7 @@ public void shouldKillLongStreamingResult() throws Throwable
168169
assertThat( recordCount, greaterThan(1) );
169170

170171
assertTrue( startTime > 0 );
171-
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
172+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
172173
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
173174
}
174175
}
@@ -190,9 +191,74 @@ public void run()
190191
}
191192
finally
192193
{
193-
session.reset(); // kill the session after timeout
194+
session.reset(); // reset the session after timeout
194195
}
195196
}
196197
} ).start();
197198
}
199+
200+
@Test
201+
public void shouldAllowMoreStatementAfterSessionReset()
202+
{
203+
// Given
204+
try( Driver driver = GraphDatabase.driver( neo4j.uri() );
205+
Session session = driver.session() )
206+
{
207+
208+
session.run( "Return 1" ).consume();
209+
210+
// When reset the state of this session
211+
session.reset();
212+
213+
// Then can run successfully more statements without any error
214+
session.run( "Return 2" ).consume();
215+
}
216+
}
217+
218+
@Test
219+
public void shouldAllowMoreTxAfterSessionReset()
220+
{
221+
// Given
222+
try( Driver driver = GraphDatabase.driver( neo4j.uri() );
223+
Session session = driver.session() )
224+
{
225+
try( Transaction tx = session.beginTransaction() )
226+
{
227+
tx.run("Return 1");
228+
tx.success();
229+
}
230+
231+
// When reset the state of this session
232+
session.reset();
233+
234+
// Then can run more Tx
235+
try( Transaction tx = session.beginTransaction() )
236+
{
237+
tx.run("Return 2");
238+
tx.success();
239+
}
240+
}
241+
}
242+
243+
@Test
244+
public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset()
245+
{
246+
// Given
247+
try( Driver driver = GraphDatabase.driver( neo4j.uri() );
248+
Session session = driver.session() )
249+
{
250+
try( Transaction tx = session.beginTransaction() )
251+
{
252+
// When reset the state of this session
253+
session.reset();
254+
// Then
255+
tx.run( "Return 1" );
256+
fail( "Should not allow tx run as tx is already failed." );
257+
}
258+
catch( Exception e )
259+
{
260+
assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) );
261+
}
262+
}
263+
}
198264
}

0 commit comments

Comments
 (0)