26
26
import com .google .devtools .build .lib .profiler .AutoProfiler ;
27
27
import com .google .devtools .build .lib .profiler .Profiler ;
28
28
import com .google .devtools .build .lib .profiler .ProfilerTask ;
29
- import com .google .devtools .build .lib .util .Pair ;
30
29
import com .google .devtools .build .lib .worker .Worker ;
31
30
import com .google .devtools .build .lib .worker .WorkerKey ;
32
31
import com .google .devtools .build .lib .worker .WorkerPool ;
@@ -200,20 +199,22 @@ public double getUsedCPU() {
200
199
ImmutableMap .of (ResourceSet .CPU , 0.6 );
201
200
private static final int MAX_ACTIONS_PER_CPU = 3 ;
202
201
202
+ // Pair of requested resources and latch represented it for waiting.
203
+ record WaitingRequest (ResourceRequest getResourceRequest , ResourceLatch getResourceLatch ) {}
204
+ ;
205
+
203
206
// Lists of blocked threads. Associated CountDownLatch object will always
204
207
// be initialized to 1 during creation in the acquire() method.
205
208
// We use LinkedList because we will need to remove elements from the middle frequently in the
206
209
// middle of iterating through the list.
207
210
@ SuppressWarnings ("JdkObsolete" )
208
- private final Deque <Pair < ResourceRequest , LatchWithWorker > > localRequests = new LinkedList <>();
211
+ private final Deque <WaitingRequest > localRequests = new LinkedList <>();
209
212
210
213
@ SuppressWarnings ("JdkObsolete" )
211
- private final Deque <Pair <ResourceRequest , LatchWithWorker >> dynamicWorkerRequests =
212
- new LinkedList <>();
214
+ private final Deque <WaitingRequest > dynamicWorkerRequests = new LinkedList <>();
213
215
214
216
@ SuppressWarnings ("JdkObsolete" )
215
- private final Deque <Pair <ResourceRequest , LatchWithWorker >> dynamicStandaloneRequests =
216
- new LinkedList <>();
217
+ private final Deque <WaitingRequest > dynamicStandaloneRequests = new LinkedList <>();
217
218
218
219
private WorkerPool workerPool ;
219
220
@@ -290,7 +291,7 @@ public void run() {
290
291
synchronized void windowUpdate () throws IOException , InterruptedException {
291
292
windowRequestIds .clear ();
292
293
windowEstimationCpu = 0.0 ;
293
- processAllWaitingThreads ();
294
+ processAllWaitingRequests ();
294
295
}
295
296
296
297
@ VisibleForTesting
@@ -306,14 +307,14 @@ public static ResourceManager instanceForTestingOnly() {
306
307
public synchronized void resetResourceUsage () {
307
308
usedResources = new HashMap <>();
308
309
usedLocalTestCount = 0 ;
309
- for (Pair < ResourceRequest , LatchWithWorker > request : localRequests ) {
310
- request .second . latch .countDown ();
310
+ for (WaitingRequest request : localRequests ) {
311
+ request .getResourceLatch (). getLatch () .countDown ();
311
312
}
312
- for (Pair < ResourceRequest , LatchWithWorker > request : dynamicWorkerRequests ) {
313
- request .second . latch .countDown ();
313
+ for (WaitingRequest request : dynamicWorkerRequests ) {
314
+ request .getResourceLatch (). getLatch () .countDown ();
314
315
}
315
- for (Pair < ResourceRequest , LatchWithWorker > request : dynamicStandaloneRequests ) {
316
- request .second . latch .countDown ();
316
+ for (WaitingRequest request : dynamicStandaloneRequests ) {
317
+ request .getResourceLatch (). getLatch () .countDown ();
317
318
}
318
319
localRequests .clear ();
319
320
dynamicWorkerRequests .clear ();
@@ -375,29 +376,29 @@ public ResourceHandle acquireResources(
375
376
Preconditions .checkState (
376
377
!threadHasResources (), "acquireResources with existing resource lock during %s" , owner );
377
378
378
- LatchWithWorker latchWithWorker = null ;
379
+ ResourceLatch resourceLatch = null ;
379
380
380
381
ResourceRequest request =
381
382
new ResourceRequest (owner , resources , priority , requestIdGenerator .getAndIncrement ());
382
383
383
384
AutoProfiler p =
384
385
profiled ("Acquiring resources for: " + owner .describe (), ProfilerTask .ACTION_LOCK );
385
386
try {
386
- latchWithWorker = acquire (request );
387
- if (latchWithWorker . latch != null ) {
388
- latchWithWorker . latch .await ();
387
+ resourceLatch = acquire (request );
388
+ if (resourceLatch . getLatch () != null ) {
389
+ resourceLatch . getLatch () .await ();
389
390
}
390
391
} catch (InterruptedException e ) {
391
- // Synchronize on this to avoid any racing with #processWaitingThreads
392
+ // Synchronize on this to avoid any racing with #processWaitingRequests
392
393
synchronized (this ) {
393
- if (latchWithWorker != null ) {
394
- if (latchWithWorker . latch == null || latchWithWorker . latch .getCount () == 0 ) {
394
+ if (resourceLatch != null ) {
395
+ if (resourceLatch . getLatch () == null || resourceLatch . getLatch () .getCount () == 0 ) {
395
396
// Resources already acquired by other side. Release them, but not inside this
396
397
// synchronized block to avoid deadlock.
397
- release (request , latchWithWorker . worker );
398
+ release (request , resourceLatch . getWorker () );
398
399
} else {
399
400
// Inform other side that resources shouldn't be acquired.
400
- latchWithWorker . latch .countDown ();
401
+ resourceLatch . getLatch () .countDown ();
401
402
}
402
403
}
403
404
}
@@ -409,8 +410,8 @@ public ResourceHandle acquireResources(
409
410
CountDownLatch latch ;
410
411
Worker worker ;
411
412
synchronized (this ) {
412
- latch = latchWithWorker . latch ;
413
- worker = latchWithWorker . worker ;
413
+ latch = resourceLatch . getLatch () ;
414
+ worker = resourceLatch . getWorker () ;
414
415
}
415
416
416
417
// Profile acquisition only if it waited for resource to become available.
@@ -510,30 +511,30 @@ public void acquireResourceOwnership() {
510
511
* resources. The latch isn't null if we could not acquire the resources right now and need to
511
512
* wait.
512
513
*/
513
- private synchronized LatchWithWorker acquire (ResourceRequest request )
514
+ private synchronized ResourceLatch acquire (ResourceRequest request )
514
515
throws IOException , InterruptedException {
515
516
if (areResourcesAvailable (request .getResourceSet ())) {
516
517
Worker worker = incrementResources (request );
517
- return new LatchWithWorker (/* latch= */ null , worker );
518
+ return new ResourceLatch (/* latch= */ null , worker );
518
519
}
519
- Pair < ResourceRequest , LatchWithWorker > requestWithLatch =
520
- new Pair <> (request , new LatchWithWorker (new CountDownLatch (1 ), /* worker= */ null ));
520
+ WaitingRequest waitingRequest =
521
+ new WaitingRequest (request , new ResourceLatch (new CountDownLatch (1 ), /* worker= */ null ));
521
522
switch (request .getPriority ()) {
522
523
case LOCAL :
523
- localRequests .addLast (requestWithLatch );
524
+ localRequests .addLast (waitingRequest );
524
525
break ;
525
526
case DYNAMIC_WORKER :
526
527
// Dynamic requests should be LIFO, because we are more likely to win the race on newer
527
528
// actions.
528
- dynamicWorkerRequests .addFirst (requestWithLatch );
529
+ dynamicWorkerRequests .addFirst (waitingRequest );
529
530
break ;
530
531
case DYNAMIC_STANDALONE :
531
532
// Dynamic requests should be LIFO, because we are more likely to win the race on newer
532
533
// actions.
533
- dynamicStandaloneRequests .addFirst (requestWithLatch );
534
+ dynamicStandaloneRequests .addFirst (waitingRequest );
534
535
break ;
535
536
}
536
- return requestWithLatch . second ;
537
+ return waitingRequest . getResourceLatch () ;
537
538
}
538
539
539
540
/**
@@ -571,38 +572,38 @@ private synchronized boolean release(ResourceRequest request, @Nullable Worker w
571
572
}
572
573
runningActions --;
573
574
574
- return processAllWaitingThreads ();
575
+ return processAllWaitingRequests ();
575
576
}
576
577
577
578
@ CanIgnoreReturnValue
578
- private synchronized boolean processAllWaitingThreads () throws IOException , InterruptedException {
579
+ private synchronized boolean processAllWaitingRequests ()
580
+ throws IOException , InterruptedException {
579
581
boolean anyProcessed = false ;
580
582
if (!localRequests .isEmpty ()) {
581
- processWaitingThreads (localRequests );
583
+ processWaitingRequests (localRequests );
582
584
anyProcessed = true ;
583
585
}
584
586
if (!dynamicWorkerRequests .isEmpty ()) {
585
- processWaitingThreads (dynamicWorkerRequests );
587
+ processWaitingRequests (dynamicWorkerRequests );
586
588
anyProcessed = true ;
587
589
}
588
590
if (!dynamicStandaloneRequests .isEmpty ()) {
589
- processWaitingThreads (dynamicStandaloneRequests );
591
+ processWaitingRequests (dynamicStandaloneRequests );
590
592
anyProcessed = true ;
591
593
}
592
594
return anyProcessed ;
593
595
}
594
596
595
- private synchronized void processWaitingThreads (
596
- Deque <Pair <ResourceRequest , LatchWithWorker >> requests )
597
+ private synchronized void processWaitingRequests (Deque <WaitingRequest > requests )
597
598
throws IOException , InterruptedException {
598
- Iterator <Pair < ResourceRequest , LatchWithWorker > > iterator = requests .iterator ();
599
+ Iterator <WaitingRequest > iterator = requests .iterator ();
599
600
while (iterator .hasNext ()) {
600
- Pair < ResourceRequest , LatchWithWorker > request = iterator .next ();
601
- if (request .second . latch .getCount () != 0 ) {
602
- if (areResourcesAvailable (request .first .getResourceSet ())) {
603
- Worker worker = incrementResources (request .first );
604
- request .second . worker = worker ;
605
- request .second . latch .countDown ();
601
+ WaitingRequest request = iterator .next ();
602
+ if (request .getResourceLatch (). getLatch () .getCount () != 0 ) {
603
+ if (areResourcesAvailable (request .getResourceRequest () .getResourceSet ())) {
604
+ Worker worker = incrementResources (request .getResourceRequest () );
605
+ request .getResourceLatch (). setWorker ( worker ) ;
606
+ request .getResourceLatch (). getLatch () .countDown ();
606
607
iterator .remove ();
607
608
}
608
609
} else {
@@ -716,13 +717,27 @@ synchronized int getWaitCount() {
716
717
return localRequests .size () + dynamicStandaloneRequests .size () + dynamicWorkerRequests .size ();
717
718
}
718
719
719
- private static class LatchWithWorker {
720
- public final CountDownLatch latch ;
721
- public Worker worker ;
720
+ // Latch which indicates the availability of resources. Also via this latch worker could be passed
721
+ // when it's ready.
722
+ private static class ResourceLatch {
723
+ private final CountDownLatch latch ;
724
+ private Worker worker ;
722
725
723
- public LatchWithWorker (CountDownLatch latch , Worker worker ) {
726
+ public ResourceLatch (CountDownLatch latch , Worker worker ) {
724
727
this .latch = latch ;
725
728
this .worker = worker ;
726
729
}
730
+
731
+ public CountDownLatch getLatch () {
732
+ return latch ;
733
+ }
734
+
735
+ public Worker getWorker () {
736
+ return worker ;
737
+ }
738
+
739
+ public void setWorker (Worker worker ) {
740
+ this .worker = worker ;
741
+ }
727
742
}
728
743
}
0 commit comments