Skip to content

Commit 737384f

Browse files
author
Zhen
committed
Made reset method sync so that once isInterrupted atomic filed get set, no more messages will be enqueued or flushed until the current sent ones has been synced.
Always ackFail
1 parent dba1165 commit 737384f

File tree

5 files changed

+93
-52
lines changed

5 files changed

+93
-52
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,6 @@ public void resetAsync()
207207
delegate.resetAsync();
208208
}
209209

210-
@Override
211-
public boolean isInterrupted()
212-
{
213-
return delegate.isInterrupted();
214-
}
215-
216210
private void markAsAvailable()
217211
{
218212
inUse.set( false );

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class SocketConnection implements Connection
4646
{
4747
private final Queue<Message> pendingMessages = new LinkedList<>();
4848
private final SocketResponseHandler responseHandler;
49-
private AtomicBoolean interrupted = new AtomicBoolean( false );
49+
private AtomicBoolean isInterrupted = new AtomicBoolean( false );
5050
private final Collector.InitCollector initCollector = new Collector.InitCollector();
5151

5252
private final SocketClient socket;
@@ -115,6 +115,8 @@ public void sync()
115115
@Override
116116
public synchronized void flush()
117117
{
118+
ensureNotInterrupted();
119+
118120
try
119121
{
120122
socket.send( pendingMessages );
@@ -126,6 +128,26 @@ public synchronized void flush()
126128
}
127129
}
128130

131+
private void ensureNotInterrupted()
132+
{
133+
try
134+
{
135+
if( isInterrupted.get() )
136+
{
137+
receiveAll();
138+
}
139+
}
140+
catch ( Neo4jException e )
141+
{
142+
throw new ClientException(
143+
"Failed to execute more statements as the session has been reset " +
144+
"and an error has occurred due to the cancellation of executing the previous statement. " +
145+
"You received this error probably because you did not consume the result immediately after " +
146+
"running the statement which get cancelled in this session.", e );
147+
}
148+
149+
}
150+
129151
private void receiveAll()
130152
{
131153
try
@@ -159,6 +181,7 @@ private void assertNoServerFailure()
159181
{
160182
Neo4jException exception = responseHandler.serverFailure();
161183
responseHandler.clearError();
184+
isInterrupted.set( false );
162185
throw exception;
163186
}
164187
}
@@ -182,6 +205,8 @@ else if ( e instanceof SocketTimeoutException )
182205

183206
private synchronized void queueMessage( Message msg, Collector collector )
184207
{
208+
ensureNotInterrupted();
209+
185210
pendingMessages.add( msg );
186211
responseHandler.appendResultCollector( collector );
187212
}
@@ -211,26 +236,18 @@ public boolean hasUnrecoverableErrors()
211236
}
212237

213238
@Override
214-
public void resetAsync()
239+
public synchronized void resetAsync()
215240
{
216-
if( interrupted.compareAndSet( false, true ) )
241+
queueMessage( RESET, new Collector.ResetCollector( new Runnable()
217242
{
218-
queueMessage( RESET, new Collector.ResetCollector( new Runnable()
243+
@Override
244+
public void run()
219245
{
220-
@Override
221-
public void run()
222-
{
223-
interrupted.set( false );
224-
}
225-
} ) );
226-
flush();
227-
}
228-
}
229-
230-
@Override
231-
public boolean isInterrupted()
232-
{
233-
return interrupted.get();
246+
isInterrupted.set( false );
247+
}
248+
} ) );
249+
flush();
250+
isInterrupted.set( true );
234251
}
235252

236253
@Override

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,6 @@ public void resetAsync()
225225
}
226226
}
227227

228-
@Override
229-
public boolean isInterrupted()
230-
{
231-
return delegate.isInterrupted();
232-
}
233-
234228
@Override
235229
public String server()
236230
{
@@ -260,7 +254,7 @@ private void onDelegateException( RuntimeException e )
260254
{
261255
unrecoverableErrorsOccurred = true;
262256
}
263-
else if( !isInterrupted() )
257+
else
264258
{
265259
ackFailure();
266260
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,6 @@ public interface Connection extends AutoCloseable
113113
*/
114114
void resetAsync();
115115

116-
/**
117-
* Return true if the current session statement execution has been interrupted by another thread, otherwise false.
118-
* @return true if the current session statement execution has been interrupted by another thread, otherwise false
119-
*/
120-
boolean isInterrupted();
121-
122116
/**
123117
* Returns the version of the server connected to.
124118
* @return The version of the server connected to.

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
import org.neo4j.driver.v1.Transaction;
3333
import org.neo4j.driver.v1.Value;
3434
import org.neo4j.driver.v1.exceptions.ClientException;
35+
import org.neo4j.driver.v1.exceptions.TransientException;
3536
import org.neo4j.driver.v1.util.TestNeo4jSession;
3637

3738
import static org.hamcrest.CoreMatchers.equalTo;
39+
import static org.hamcrest.CoreMatchers.startsWith;
3840
import static org.hamcrest.MatcherAssert.assertThat;
3941
import static org.junit.Assert.assertFalse;
4042
import static org.junit.Assert.assertTrue;
@@ -215,24 +217,64 @@ public void shouldHandleNullMapParameters() throws Throwable
215217
// Then it wasn't the end of the world as we know it
216218
}
217219

220+
218221
@Test
219-
public void shouldHandleReset() throws Throwable
222+
public void shouldThrowExceptionDueToNotConsumingResetFailure() throws Throwable
220223
{
221-
Transaction tx = session.beginTransaction();
222-
tx.run( "UNWIND range(1,1) AS n RETURN n AS number" );
223-
//TODO I believe there is a bug server side, if the RESET comes in
224-
//too fast, stuff breaks
225-
Thread.sleep( 100 );
226-
session.reset();
227-
tx = session.beginTransaction();
228-
tx.run( "CREATE (n:FirstNode)" );
229-
tx.success();
230-
tx.close();
224+
// Given
225+
Transaction tx = session.beginTransaction();
226+
tx.run( "UNWIND range(1,1) AS n RETURN n AS number" );
227+
session.reset();
228+
229+
exception.expect( ClientException.class );
230+
exception.expectMessage( startsWith(
231+
"Failed to execute more statements as the session has been reset " +
232+
"and an error has occurred due to the cancellation of executing the previous statement." ) );
233+
234+
// When & Then
235+
tx = session.beginTransaction();
236+
}
237+
238+
@Test
239+
public void shouldBeAbleToRunMoreStatementAfterConsumingResetFailure() throws Throwable
240+
{
241+
Transaction tx = session.beginTransaction();
242+
StatementResult failingResult = tx.run( "UNWIND range(1,1) AS n RETURN n AS number" );
243+
session.reset();
244+
245+
exception.expect( TransientException.class );
246+
exception.expectMessage( startsWith(
247+
"The transaction has been terminated." ) );
248+
249+
failingResult.consume(); // fail with errors
250+
251+
tx = session.beginTransaction();
252+
tx.run( "CREATE (n:FirstNode)" );
253+
tx.success();
254+
tx.close();
231255

232-
// Then the outcome of both statements should be visible
233-
StatementResult result = session.run( "MATCH (n) RETURN count(n)" );
234-
long nodes = result.single().get( "count(n)" ).asLong();
235-
assertThat( nodes, equalTo( 1L ) );
256+
// Then the outcome of both statements should be visible
257+
StatementResult result = session.run( "MATCH (n) RETURN count(n)" );
258+
long nodes = result.single().get( "count(n)" ).asLong();
259+
assertThat( nodes, equalTo( 1L ) );
260+
}
261+
262+
@Test
263+
public void shouldBeAbleToRunMoreStatementAfterEmptyReset() throws Throwable
264+
{
265+
// Given
266+
session.reset();
267+
268+
// When
269+
Transaction tx = session.beginTransaction();
270+
tx.run( "CREATE (n:FirstNode)" );
271+
tx.success();
272+
tx.close();
273+
274+
// Then the outcome of both statements should be visible
275+
StatementResult result = session.run( "MATCH (n) RETURN count(n)" );
276+
long nodes = result.single().get( "count(n)" ).asLong();
277+
assertThat( nodes, equalTo( 1L ) );
236278
}
237279

238280
@Test

0 commit comments

Comments
 (0)