This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 2028e5a

Eyal Farago authored and hvanhovell committed
[SPARK-21907][CORE] oom during spill
## What changes were proposed in this pull request?

1. A test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).
2. A fix for the root cause of the issue.

`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which may trigger another spill. When this happens, the `array` member is already de-allocated but still referenced by the code, so the nested spill fails with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`. This patch introduces a reproduction in a test case and a fix: it nulls out the in-memory sorter's `array` member (and zeroes its size bookkeeping) before actually performing the allocation, which prevents the spilling code path from touching the de-allocated array.

## How was this patch tested?

Introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.

Author: Eyal Farago <[email protected]>

Closes apache#19181 from eyalfa/SPARK-21907__oom_during_spill.
1 parent 633ffd8 commit 2028e5a
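To make the failure mode concrete, here is a minimal, self-contained sketch of the re-entrancy pattern the commit message describes. It is not Spark code: `ReentrantSpillSketch`, its `allocate` method, and the `memoryPressure` flag are hypothetical stand-ins for `TaskMemoryManager.allocateArray` asking its consumer to spill under memory pressure.

```java
// Hypothetical stand-ins for the Spark classes named above; only the callback shape matters.
public class ReentrantSpillSketch {
  private long[] array = new long[]{1, 2, 3};
  private boolean memoryPressure = true;

  // Plays the role of consumer.allocateArray: under pressure it first asks the
  // consumer (this same object) to spill, re-entering it while reset() is mid-flight.
  private long[] allocate(int size) {
    if (memoryPressure) {
      memoryPressure = false;
      spill(); // nested call back into the same instance
    }
    return new long[size];
  }

  // Plays the role of the spill draining the in-memory sorter's records.
  private void spill() {
    if (array == null) {
      return; // with the fix, a nested spill sees no array and becomes a no-op
    }
    for (long record : array) {
      // pre-fix behavior: 'array' could already have been handed back to the memory
      // manager at this point, and dereferencing it blew up (the NPE in getPage)
    }
  }

  public void reset() {
    // the fix: clear the reference (and, in Spark, the size bookkeeping)
    // *before* the allocation that may trigger a nested spill
    array = null;
    array = allocate(3);
  }

  public static void main(String[] args) {
    new ReentrantSpillSketch().reset(); // completes; the pre-fix ordering would not
  }
}
```

The patch applies exactly this ordering inside `UnsafeInMemorySorter.reset`, as the second diff below shows.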

File tree: 5 files changed, +102 -5 lines

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 4 additions & 0 deletions
@@ -480,6 +480,10 @@ public UnsafeSorterIterator getSortedIterator() throws IOException {
     }
   }

+  @VisibleForTesting boolean hasSpaceForAnotherRecord() {
+    return inMemSorter.hasSpaceForAnotherRecord();
+  }
+
   private static void spillIterator(UnsafeSorterIterator inMemIterator,
                                     UnsafeSorterSpillWriter spillWriter) throws IOException {
     while (inMemIterator.hasNext()) {

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 11 additions & 1 deletion
@@ -162,14 +162,24 @@ private int getUsableCapacity() {
    */
  public void free() {
    if (consumer != null) {
-      consumer.freeArray(array);
+      if (array != null) {
+        consumer.freeArray(array);
+      }
      array = null;
    }
  }

  public void reset() {
    if (consumer != null) {
      consumer.freeArray(array);
+      // The call to consumer.allocateArray may trigger a spill, which in turn accesses this
+      // instance and eventually re-enters this method, trying to free the array again. By setting
+      // the array to null and its length to 0 we effectively make the spill code path a no-op.
+      // Nulling the array also marks it as de-allocated, preventing a double free in free().
+      array = null;
+      usableCapacity = 0;
+      pos = 0;
+      nullBoundaryPos = 0;
      array = consumer.allocateArray(initialSize);
      usableCapacity = getUsableCapacity();
    }

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java

Lines changed: 33 additions & 0 deletions
@@ -23,6 +23,7 @@
 import java.util.LinkedList;
 import java.util.UUID;

+import org.hamcrest.Matchers;
 import scala.Tuple2$;

 import org.junit.After;
@@ -503,6 +504,38 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }

+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    // We assume that, given the default configuration,
+    // the size of the records we insert into the sorter (ints),
+    // and that no spill happens before the pointer array is exhausted
+    // (the memory manager is not configured to throw at this point),
+    // this loop runs a reasonable number of iterations (<2000).
+    // The test indeed completed in under 30ms (on a quad-core i7 laptop).
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // We expect the next insert to attempt growing the pointer array.
+    // The first allocation is expected to fail, then a spill is triggered, which attempts
+    // another allocation; that one also fails, and we expect to see the OOM here.
+    // The original code touched a released array inside the spill code
+    // and ended up with a failed assertion.
+    // We also expect the OOM to originate in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset.
+    memoryManager.markconsequentOOM(2);
+    try {
+      insertNumber(sorter, 1024);
+      fail("expected OutOfMemoryError but the operation surprisingly succeeded");
+    }
+    // We expect an OutOfMemoryError here; anything else (e.g. the original NPE) is a failure.
+    catch (OutOfMemoryError oom) {
+      String oomStackTrace = Utils.exceptionString(oom);
+      assertThat("expected OutOfMemoryError in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
+        oomStackTrace,
+        Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
+    }
+  }
+
   private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end)
       throws IOException {
     for (int i = start; i < end; i++) {

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java

Lines changed: 46 additions & 0 deletions
@@ -35,6 +35,7 @@
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;

 public class UnsafeInMemorySorterSuite {
@@ -139,4 +140,49 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final SparkConf sparkConf = new SparkConf();
+    sparkConf.set("spark.memory.offHeap.enabled", "false");
+
+    final TestMemoryManager testMemoryManager =
+      new TestMemoryManager(sparkConf);
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+      testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+        Object leftBaseObject,
+        long leftBaseOffset,
+        Object rightBaseObject,
+        long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+      recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+      fail("expected OutOfMemoryError but the operation surprisingly succeeded");
+    } catch (OutOfMemoryError oom) {
+      // as expected
+    }
+    // [SPARK-21907] this failed with an NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
+    sorter.free();
+    // simulate a 'back to back' free.
+    sorter.free();
+  }
 }
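As a side note on the `free()` guard this test exercises: nulling the reference on the first call makes de-allocation idempotent. A small sketch of the pattern (hypothetical class, not the Spark code):

```java
final class IdempotentFree {
  private long[] array = new long[8];

  void free() {
    if (array != null) { // tolerate a prior failed reset() or an earlier free()
      // the real code hands the array back via consumer.freeArray(array) here
      array = null;
    }
  }

  public static void main(String[] args) {
    IdempotentFree f = new IdempotentFree();
    f.free();
    f.free(); // the 'back to back' free above: a no-op rather than a double free
  }
}
```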

core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala

Lines changed: 8 additions & 4 deletions
@@ -27,8 +27,8 @@ class TestMemoryManager(conf: SparkConf)
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = {
-    if (oomOnce) {
-      oomOnce = false
+    if (consequentOOM > 0) {
+      consequentOOM -= 1
      0
    } else if (available >= numBytes) {
      available -= numBytes
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)

  override def maxOffHeapStorageMemory: Long = 0L

-  private var oomOnce = false
+  private var consequentOOM = 0
  private var available = Long.MaxValue

  def markExecutionAsOutOfMemoryOnce(): Unit = {
-    oomOnce = true
+    markconsequentOOM(1)
+  }
+
+  def markconsequentOOM(n: Int): Unit = {
+    consequentOOM += n
  }

  def limit(avail: Long): Unit = {
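This counter is what lets `testOOMDuringSpill` arm two consecutive failures: the first refused grant makes the pointer-array growth trigger a spill, and the second fails the re-allocation inside `reset`, surfacing the OOM. A stripped-down Java analogue of the mechanism (hypothetical class, for illustration only):

```java
// Mirrors TestMemoryManager's acquireExecutionMemory contract: returning 0 simulates an OOM.
final class FailingAllocator {
  private int consequentOOM = 0;
  private long available = Long.MAX_VALUE;

  void markconsequentOOM(int n) {
    consequentOOM += n; // the next n acquire() calls will refuse memory
  }

  long acquire(long numBytes) {
    if (consequentOOM > 0) {
      consequentOOM -= 1;
      return 0; // refused: the caller treats this as an allocation failure
    }
    available -= numBytes;
    return numBytes;
  }
}
```

For example, `allocator.markconsequentOOM(2)` makes exactly the next two `acquire()` calls fail, matching the `memoryManager.markconsequentOOM(2)` call in `testOOMDuringSpill`.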
