Skip to content

Commit 51248ce

Browse files
author
Zhen
committed
Muted ack_failure while a reset is sent and no reply has been recived
Given the following message senquence between server and client C S RUN -> FAIL, error e PULL_ALL -> IGNORED RESET -> SUCC In the previous PR, the error e could be thrown either after RESET or before RESET depending on if we called sync (receiveAll) after RESET or before RESET. In this PR, by replacing receiveAll with receiveOne multiple times, we ensure that if the sesssion get interrupted, the error e is always immediately thrown before reset. Therefore we know we could mute akc_failure after a reset is sent, before a reply to this reset is received.
1 parent 737384f commit 51248ce

File tree

6 files changed

+134
-51
lines changed

6 files changed

+134
-51
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ public void resetAsync()
207207
delegate.resetAsync();
208208
}
209209

210+
@Override
211+
public boolean isAckFailureMuted()
212+
{
213+
return delegate.isAckFailureMuted();
214+
}
215+
210216
private void markAsAvailable()
211217
{
212218
inUse.set( false );

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class SocketConnection implements Connection
4747
private final Queue<Message> pendingMessages = new LinkedList<>();
4848
private final SocketResponseHandler responseHandler;
4949
private AtomicBoolean isInterrupted = new AtomicBoolean( false );
50+
private AtomicBoolean isAckFailureMuted = new AtomicBoolean( false );
5051
private final Collector.InitCollector initCollector = new Collector.InitCollector();
5152

5253
private final SocketClient socket;
@@ -134,16 +135,19 @@ private void ensureNotInterrupted()
134135
{
135136
if( isInterrupted.get() )
136137
{
137-
receiveAll();
138+
// receive each of it and throw error immediately
139+
while ( responseHandler.collectorsWaiting() > 0 )
140+
{
141+
receiveOne();
142+
}
138143
}
139144
}
140145
catch ( Neo4jException e )
141146
{
142147
throw new ClientException(
143-
"Failed to execute more statements as the session has been reset " +
144-
"and an error has occurred due to the cancellation of executing the previous statement. " +
148+
"An error has occurred due to the cancellation of executing a previous statement. " +
145149
"You received this error probably because you did not consume the result immediately after " +
146-
"running the statement which get cancelled in this session.", e );
150+
"running the statement which get reset in this session.", e );
147151
}
148152

149153
}
@@ -244,10 +248,18 @@ public synchronized void resetAsync()
244248
public void run()
245249
{
246250
isInterrupted.set( false );
251+
isAckFailureMuted.set( false );
247252
}
248253
} ) );
249254
flush();
250255
isInterrupted.set( true );
256+
isAckFailureMuted.set( true );
257+
}
258+
259+
@Override
260+
public boolean isAckFailureMuted()
261+
{
262+
return isAckFailureMuted.get();
251263
}
252264

253265
@Override

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,12 @@ public void resetAsync()
225225
}
226226
}
227227

228+
@Override
229+
public boolean isAckFailureMuted()
230+
{
231+
return delegate.isAckFailureMuted();
232+
}
233+
228234
@Override
229235
public String server()
230236
{
@@ -254,7 +260,7 @@ private void onDelegateException( RuntimeException e )
254260
{
255261
unrecoverableErrorsOccurred = true;
256262
}
257-
else
263+
else if( !isAckFailureMuted() )
258264
{
259265
ackFailure();
260266
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ public interface Connection extends AutoCloseable
113113
*/
114114
void resetAsync();
115115

116+
/**
117+
* Return true if ack_failure message is temporarily muted as the failure message will be acked using reset instead
118+
* @return true if no ack_failre message should be sent when ackable failures are received.
119+
*/
120+
boolean isAckFailureMuted();
121+
116122
/**
117123
* Returns the version of the server connected to.
118124
* @return The version of the server connected to.

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.neo4j.driver.v1.integration;
2020

21+
import org.hamcrest.MatcherAssert;
2122
import org.junit.Rule;
2223
import org.junit.Test;
24+
import org.junit.rules.ExpectedException;
2325

2426
import org.neo4j.driver.v1.AuthToken;
2527
import org.neo4j.driver.v1.AuthTokens;
@@ -30,9 +32,11 @@
3032
import org.neo4j.driver.v1.Transaction;
3133
import org.neo4j.driver.v1.exceptions.ClientException;
3234
import org.neo4j.driver.v1.exceptions.Neo4jException;
35+
import org.neo4j.driver.v1.exceptions.TransientException;
3336
import org.neo4j.driver.v1.util.TestNeo4j;
3437

3538
import static org.hamcrest.CoreMatchers.equalTo;
39+
import static org.hamcrest.CoreMatchers.notNullValue;
3640
import static org.hamcrest.CoreMatchers.startsWith;
3741
import static org.hamcrest.Matchers.greaterThan;
3842
import static org.junit.Assert.assertFalse;
@@ -46,6 +50,9 @@ public class SessionIT
4650
@Rule
4751
public TestNeo4j neo4j = new TestNeo4j();
4852

53+
@Rule
54+
public ExpectedException exception = ExpectedException.none();
55+
4956
@Test
5057
public void shouldKnowSessionIsClosed() throws Throwable
5158
{
@@ -161,7 +168,7 @@ public void shouldKillLongStreamingResult() throws Throwable
161168
recordCount++;
162169
}
163170

164-
fail("Should have got an exception about statement get killed.");
171+
fail("Should have got an exception about streaming get killed.");
165172
}
166173
catch( ClientException e )
167174
{
@@ -175,6 +182,96 @@ public void shouldKillLongStreamingResult() throws Throwable
175182
}
176183
}
177184

185+
@Test
186+
public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable
187+
{
188+
// Given
189+
neo4j.ensureProcedures( "longRunningStatement.jar" );
190+
Driver driver = GraphDatabase.driver( neo4j.uri() );
191+
192+
try( Session session = driver.session() )
193+
{
194+
Transaction tx = session.beginTransaction();
195+
196+
tx.run("CALL test.driver.longRunningStatement({seconds})",
197+
parameters( "seconds", 10 ) );
198+
Thread.sleep( 1* 1000 );
199+
session.reset();
200+
201+
exception.expect( ClientException.class );
202+
exception.expectMessage( startsWith(
203+
"An error has occurred due to the cancellation of executing a previous statement." ) );
204+
205+
// When & Then
206+
tx = session.beginTransaction();
207+
assertThat( tx, notNullValue() );
208+
}
209+
}
210+
211+
@Test
212+
public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Throwable
213+
{
214+
// Given
215+
neo4j.ensureProcedures( "longRunningStatement.jar" );
216+
Driver driver = GraphDatabase.driver( neo4j.uri() );
217+
218+
Session session = driver.session();
219+
session.run( "CALL test.driver.longRunningStatement({seconds})",
220+
parameters( "seconds", 10 ) );
221+
Thread.sleep( 1 * 1000 );
222+
session.reset();
223+
224+
exception.expect( ClientException.class );
225+
exception.expectMessage( startsWith(
226+
"An error has occurred due to the cancellation of executing a previous statement." ) );
227+
228+
// When & Then
229+
session.close();
230+
}
231+
232+
@Test
233+
public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable
234+
{
235+
// Given
236+
neo4j.ensureProcedures( "longRunningStatement.jar" );
237+
Driver driver = GraphDatabase.driver( neo4j.uri() );
238+
239+
try( Session session = driver.session() )
240+
{
241+
Transaction tx = session.beginTransaction();
242+
243+
StatementResult procedureResult = tx.run("CALL test.driver.longRunningStatement({seconds})",
244+
parameters( "seconds", 10 ) );
245+
Thread.sleep( 1* 1000 );
246+
session.reset();
247+
248+
try
249+
{
250+
procedureResult.consume();
251+
fail( "Should procedure call with an exception as we interrupted procedure call" );
252+
}
253+
catch ( TransientException e )
254+
{
255+
MatcherAssert.assertThat( e.getMessage(), startsWith( "The transaction has been terminated." ) );
256+
}
257+
catch ( Throwable e )
258+
{
259+
fail( "Expected exception is different from what we've received: " + e.getMessage() );
260+
}
261+
262+
// When
263+
tx = session.beginTransaction();
264+
tx.run( "CREATE (n:FirstNode)" );
265+
tx.success();
266+
tx.close();
267+
268+
// Then
269+
StatementResult result = session.run( "MATCH (n) RETURN count(n)" );
270+
long nodes = result.single().get( "count(n)" ).asLong();
271+
MatcherAssert.assertThat( nodes, equalTo( 1L ) );
272+
}
273+
}
274+
178275
private void resetSessionAfterTimeout( final Session session, final int timeout )
179276
{
180277
new Thread( new Runnable()

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java

Lines changed: 1 addition & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,9 @@
3232
import org.neo4j.driver.v1.Transaction;
3333
import org.neo4j.driver.v1.Value;
3434
import org.neo4j.driver.v1.exceptions.ClientException;
35-
import org.neo4j.driver.v1.exceptions.TransientException;
3635
import org.neo4j.driver.v1.util.TestNeo4jSession;
3736

3837
import static org.hamcrest.CoreMatchers.equalTo;
39-
import static org.hamcrest.CoreMatchers.startsWith;
4038
import static org.hamcrest.MatcherAssert.assertThat;
4139
import static org.junit.Assert.assertFalse;
4240
import static org.junit.Assert.assertTrue;
@@ -217,50 +215,8 @@ public void shouldHandleNullMapParameters() throws Throwable
217215
// Then it wasn't the end of the world as we know it
218216
}
219217

220-
221-
@Test
222-
public void shouldThrowExceptionDueToNotConsumingResetFailure() throws Throwable
223-
{
224-
// Given
225-
Transaction tx = session.beginTransaction();
226-
tx.run( "UNWIND range(1,1) AS n RETURN n AS number" );
227-
session.reset();
228-
229-
exception.expect( ClientException.class );
230-
exception.expectMessage( startsWith(
231-
"Failed to execute more statements as the session has been reset " +
232-
"and an error has occurred due to the cancellation of executing the previous statement." ) );
233-
234-
// When & Then
235-
tx = session.beginTransaction();
236-
}
237-
238-
@Test
239-
public void shouldBeAbleToRunMoreStatementAfterConsumingResetFailure() throws Throwable
240-
{
241-
Transaction tx = session.beginTransaction();
242-
StatementResult failingResult = tx.run( "UNWIND range(1,1) AS n RETURN n AS number" );
243-
session.reset();
244-
245-
exception.expect( TransientException.class );
246-
exception.expectMessage( startsWith(
247-
"The transaction has been terminated." ) );
248-
249-
failingResult.consume(); // fail with errors
250-
251-
tx = session.beginTransaction();
252-
tx.run( "CREATE (n:FirstNode)" );
253-
tx.success();
254-
tx.close();
255-
256-
// Then the outcome of both statements should be visible
257-
StatementResult result = session.run( "MATCH (n) RETURN count(n)" );
258-
long nodes = result.single().get( "count(n)" ).asLong();
259-
assertThat( nodes, equalTo( 1L ) );
260-
}
261-
262218
@Test
263-
public void shouldBeAbleToRunMoreStatementAfterEmptyReset() throws Throwable
219+
public void shouldBeAbleToRunMoreStatementsAfterResetOnNoErrorState() throws Throwable
264220
{
265221
// Given
266222
session.reset();

0 commit comments

Comments
 (0)