Skip to content

Commit dba1165

Browse files
committed
Automatically close on session.reset()
In the event of a reset the transaction should be completely closed and the session should be ready to create new transactions without explicitly closing any transactions.
1 parent 9858f73 commit dba1165

File tree

2 files changed

+131
-25
lines changed

2 files changed

+131
-25
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,13 @@ public class NetworkSession implements Session
4949
@Override
5050
public void run()
5151
{
52-
if ( currentTransaction != null )
52+
synchronized ( NetworkSession.this )
5353
{
54-
lastBookmark = currentTransaction.bookmark();
55-
currentTransaction = null;
54+
if ( currentTransaction != null )
55+
{
56+
lastBookmark = currentTransaction.bookmark();
57+
currentTransaction = null;
58+
}
5659
}
5760
}
5861
};
@@ -73,9 +76,9 @@ public StatementResult run( String statementText )
7376
}
7477

7578
@Override
76-
public StatementResult run( String statementText, Map<String, Object> statementParameters )
79+
public StatementResult run( String statementText, Map<String,Object> statementParameters )
7780
{
78-
Value params = statementParameters == null ? Values.EmptyMap : value(statementParameters);
81+
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
7982
return run( statementText, params );
8083
}
8184

@@ -97,21 +100,24 @@ public StatementResult run( Statement statement )
97100
{
98101
ensureConnectionIsValidBeforeRunningSession();
99102
InternalStatementResult cursor = new InternalStatementResult( connection, null, statement );
100-
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), cursor.runResponseCollector() );
103+
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ),
104+
cursor.runResponseCollector() );
101105
connection.pullAll( cursor.pullAllResponseCollector() );
102106
connection.flush();
103107
return cursor;
104108
}
105109

106-
public void reset()
110+
public synchronized void reset()
107111
{
108112
ensureSessionIsOpen();
109113
ensureNoUnrecoverableError();
110114
ensureConnectionIsOpen();
111115

112-
if( currentTransaction != null )
116+
if ( currentTransaction != null )
113117
{
114118
currentTransaction.markToClose();
119+
lastBookmark = currentTransaction.bookmark();
120+
currentTransaction = null;
115121
}
116122
connection.resetAsync();
117123
}
@@ -126,21 +132,24 @@ public boolean isOpen()
126132
public void close()
127133
{
128134
// Use atomic operation to protect from closing the connection twice (putting back to the pool twice).
129-
if( !isOpen.compareAndSet( true, false ) )
135+
if ( !isOpen.compareAndSet( true, false ) )
130136
{
131137
throw new ClientException( "This session has already been closed." );
132138
}
133139
else
134140
{
135-
if ( currentTransaction != null )
141+
synchronized ( this )
136142
{
137-
try
138-
{
139-
currentTransaction.close();
140-
}
141-
catch ( Throwable e )
143+
if ( currentTransaction != null )
142144
{
143-
// Best-effort
145+
try
146+
{
147+
currentTransaction.close();
148+
}
149+
catch ( Throwable e )
150+
{
151+
// Best-effort
152+
}
144153
}
145154
}
146155
try
@@ -167,7 +176,7 @@ public Transaction beginTransaction()
167176
}
168177

169178
@Override
170-
public Transaction beginTransaction( String bookmark )
179+
public synchronized Transaction beginTransaction( String bookmark )
171180
{
172181
ensureConnectionIsValidBeforeOpeningTransaction();
173182
currentTransaction = new ExplicitTransaction( connection, txCleanup, bookmark );
@@ -224,7 +233,7 @@ private void ensureConnectionIsValidBeforeOpeningTransaction()
224233
@Override
225234
protected void finalize() throws Throwable
226235
{
227-
if( isOpen.compareAndSet( true, false ) )
236+
if ( isOpen.compareAndSet( true, false ) )
228237
{
229238
logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " +
230239
"method on Sessions before disposing of the objects.", null );
@@ -235,14 +244,15 @@ protected void finalize() throws Throwable
235244

236245
private void ensureNoUnrecoverableError()
237246
{
238-
if( connection.hasUnrecoverableErrors() )
247+
if ( connection.hasUnrecoverableErrors() )
239248
{
240249
throw new ClientException( "Cannot run more statements in the current session as an unrecoverable error " +
241250
"has happened. Please close the current session and re-run your statement in a" +
242251
" new session." );
243252
}
244253
}
245254

255+
//should be called from a synchronized block
246256
private void ensureNoOpenTransactionBeforeRunningSession()
247257
{
248258
if ( currentTransaction != null )
@@ -252,6 +262,7 @@ private void ensureNoOpenTransactionBeforeRunningSession()
252262
}
253263
}
254264

265+
//should be called from a synchronized block
255266
private void ensureNoOpenTransactionBeforeOpeningTransaction()
256267
{
257268
if ( currentTransaction != null )
@@ -273,12 +284,13 @@ private void ensureConnectionIsOpen()
273284

274285
private void ensureSessionIsOpen()
275286
{
276-
if( !isOpen() )
287+
if ( !isOpen() )
277288
{
278289
throw new ClientException(
279290
"No more interaction with this session is allowed " +
280291
"as the current session is already closed or marked as closed. " +
281-
"You get this error either because you have a bad reference to a session that has already be closed " +
292+
"You get this error either because you have a bad reference to a session that has already be " +
293+
"closed " +
282294
"or you are trying to reuse a session that you have called `reset` on it." );
283295
}
284296
}

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import org.junit.rules.ExpectedException;
2424

2525
import java.util.Map;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.TimeUnit;
2629

2730
import org.neo4j.driver.v1.Record;
2831
import org.neo4j.driver.v1.StatementResult;
@@ -142,7 +145,7 @@ public void shouldBeOpenBeforeCommit() throws Throwable
142145
public void shouldHandleNullParametersGracefully()
143146
{
144147
// When
145-
session.run("match (n) return count(n)", (Value)null);
148+
session.run( "match (n) return count(n)", (Value) null );
146149

147150
// Then
148151
// pass - no exception thrown
@@ -155,7 +158,7 @@ public void shouldHandleFailureAfterClosingTransaction()
155158
{
156159
// GIVEN a successful query in a transaction
157160
Transaction tx = session.beginTransaction();
158-
StatementResult result = tx.run("CREATE (n) RETURN n");
161+
StatementResult result = tx.run( "CREATE (n) RETURN n" );
159162
result.consume();
160163
tx.success();
161164
tx.close();
@@ -164,7 +167,7 @@ public void shouldHandleFailureAfterClosingTransaction()
164167
exception.expect( ClientException.class );
165168

166169
//WHEN running a malformed query in the original session
167-
session.run("CREAT (n) RETURN n").consume();
170+
session.run( "CREAT (n) RETURN n" ).consume();
168171
}
169172

170173
@SuppressWarnings( "ConstantConditions" )
@@ -204,11 +207,102 @@ public void shouldHandleNullMapParameters() throws Throwable
204207
// When
205208
try ( Transaction tx = session.beginTransaction() )
206209
{
207-
Map<String, Object> params = null;
210+
Map<String,Object> params = null;
208211
tx.run( "CREATE (n:FirstNode)", params );
209212
tx.success();
210213
}
211214

212215
// Then it wasn't the end of the world as we know it
213216
}
217+
218+
@Test
219+
public void shouldHandleReset() throws Throwable
220+
{
221+
Transaction tx = session.beginTransaction();
222+
tx.run( "UNWIND range(1,1) AS n RETURN n AS number" );
223+
//TODO I believe there is a bug server side, if the RESET comes in
224+
//too fast, stuff breaks
225+
Thread.sleep( 100 );
226+
session.reset();
227+
tx = session.beginTransaction();
228+
tx.run( "CREATE (n:FirstNode)" );
229+
tx.success();
230+
tx.close();
231+
232+
// Then the outcome of both statements should be visible
233+
StatementResult result = session.run( "MATCH (n) RETURN count(n)" );
234+
long nodes = result.single().get( "count(n)" ).asLong();
235+
assertThat( nodes, equalTo( 1L ) );
236+
}
237+
238+
@Test
239+
public void shouldHandleResetBeforeRun() throws Throwable
240+
{
241+
// Expect
242+
exception.expect( ClientException.class );
243+
exception.expectMessage( "Cannot run more statements in this transaction, because previous statements in the " +
244+
"transaction has failed and the transaction has been rolled back. Please start a new" +
245+
" transaction to run another statement." );
246+
// When
247+
Transaction tx = session.beginTransaction();
248+
session.reset();
249+
tx.run( "CREATE (n:FirstNode)" );
250+
}
251+
252+
private Transaction globalTx = null;
253+
@Test
254+
public void shouldHandleResetFromMultipleThreads() throws Throwable
255+
{
256+
// When
257+
ExecutorService runner = Executors.newFixedThreadPool( 2 );
258+
runner.execute( new Runnable()
259+
260+
{
261+
@Override
262+
public void run()
263+
{
264+
globalTx = session.beginTransaction();
265+
globalTx.run( "CREATE (n:FirstNode)" );
266+
try
267+
{
268+
Thread.sleep( 1000 );
269+
}
270+
catch ( InterruptedException e )
271+
{
272+
new AssertionError( e );
273+
}
274+
275+
globalTx = session.beginTransaction();
276+
globalTx.run( "CREATE (n:FirstNode)" );
277+
globalTx.success();
278+
globalTx.close();
279+
280+
}
281+
} );
282+
runner.execute( new Runnable()
283+
284+
{
285+
@Override
286+
public void run()
287+
{
288+
try
289+
{
290+
Thread.sleep( 500 );
291+
}
292+
catch ( InterruptedException e )
293+
{
294+
new AssertionError( e );
295+
}
296+
297+
session.reset();
298+
}
299+
} );
300+
301+
runner.awaitTermination( 5, TimeUnit.SECONDS );
302+
303+
// Then the outcome of both statements should be visible
304+
StatementResult result = session.run( "MATCH (n) RETURN count(n)" );
305+
long nodes = result.single().get( "count(n)" ).asLong();
306+
assertThat( nodes, equalTo( 1L ) );
307+
}
214308
}

0 commit comments

Comments
 (0)