diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/StepContribution.java b/spring-batch-core/src/main/java/org/springframework/batch/core/StepContribution.java index fcbeaa9284..ab3b6ab52b 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/StepContribution.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/StepContribution.java @@ -155,6 +155,15 @@ public void incrementWriteSkipCount() { writeSkipCount++; } + /** + * Increment the write skip count for this contribution. + * @param count The {@code long} amount to increment by. + * @since 6.0.0 + */ + public void incrementWriteSkipCount(long count) { + writeSkipCount += count; + } + /** * */ diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java index ecb797111c..2b2cf75ea0 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java @@ -347,6 +347,7 @@ protected void write(final StepContribution contribution, final Chunk inputs, stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing"); } contribution.incrementWriteCount(outputs.size()); + contribution.incrementWriteSkipCount(outputs.getSkipsSize()); } else { scan(contribution, inputs, outputs, chunkMonitor, false); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java index 101945bb22..e3eed2d040 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java @@ -309,6 +309,7 @@ protected void write(StepContribution contribution, Chunk inputs, Chunk ou stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing"); } contribution.incrementWriteCount(outputs.size()); + contribution.incrementWriteSkipCount(outputs.getSkipsSize()); } protected Chunk transform(StepContribution contribution, Chunk inputs) throws Exception { diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java index d30e06917e..4a1dc3149c 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java @@ -230,6 +230,25 @@ void testWriteSkipOnException() throws Exception { assertEquals(0, contribution.getFilterCount()); } + @Test + void testWriteSkipOnIteratorRemove() throws Exception { + processor.setItemWriter(chunk -> { + Chunk.ChunkIterator iterator = chunk.iterator(); + while (iterator.hasNext()) { + String item = iterator.next(); + if (item.equals("skip")) { + iterator.remove((Exception) null); + } + } + }); + Chunk inputs = new Chunk<>(Arrays.asList("3", "skip", "2")); + processor.process(contribution, inputs); + assertEquals(1, contribution.getSkipCount()); + assertEquals(2, contribution.getWriteCount()); + assertEquals(1, contribution.getWriteSkipCount()); + assertEquals(0, contribution.getFilterCount()); + } + @Test void testWriteSkipOnExceptionWithTrivialChunk() throws Exception { processor.setWriteSkipPolicy(new AlwaysSkipItemSkipPolicy()); diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java index 5ebcb49ced..54cf58f8aa 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java @@ -51,7 +51,15 @@ public void write(Chunk chunk) throws Exception { if (chunk.getItems().contains("fail")) { throw new RuntimeException("Planned failure!"); } - list.addAll(chunk.getItems()); + Chunk.ChunkIterator iterator = chunk.iterator(); + while (iterator.hasNext()) { + String item = iterator.next(); + if (item.equals("skip")) { + iterator.remove((Exception) null); + } else { + list.add(item); + } + } } }); @@ -88,4 +96,15 @@ void testTransform() throws Exception { assertTrue(outputs.isEnd()); } + @Test + void testWriteWithSkip() throws Exception { + Chunk inputs = new Chunk<>(); + inputs.add("foo"); + inputs.add("skip"); + inputs.add("bar"); + processor.process(contribution, inputs); + assertEquals(2, contribution.getWriteCount()); + assertEquals(1, contribution.getWriteSkipCount()); + } + } diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/Chunk.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/Chunk.java index 52895ca79d..f5e381e881 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/Chunk.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/Chunk.java @@ -152,6 +152,14 @@ public int size() { return items.size(); } + /** + * @return the number of skipped items + * @since 6.0.0 + */ + public int getSkipsSize() { + return skips.size(); + } + /** * Flag to indicate if the source data is exhausted. *