46
46
import java .util .PriorityQueue ;
47
47
import java .util .Set ;
48
48
import java .util .UUID ;
49
+ import java .util .concurrent .ThreadFactory ;
49
50
import java .util .concurrent .TimeUnit ;
50
51
import java .util .concurrent .atomic .AtomicLong ;
51
52
import java .util .concurrent .atomic .AtomicReference ;
52
53
import java .util .function .Supplier ;
53
54
import java .util .zip .GZIPOutputStream ;
54
55
import javax .annotation .Nullable ;
56
+ import javax .annotation .concurrent .GuardedBy ;
55
57
56
58
/**
57
59
* Blaze internal profiler. Provides facility to report various Blaze tasks and store them
@@ -274,8 +276,7 @@ private static final class SlowestTaskAggregator {
274
276
275
277
// @ThreadSafe
276
278
void add (TaskData taskData ) {
277
- Extrema <SlowTask > extrema =
278
- extremaAggregators [(int ) (Thread .currentThread ().getId () % SHARDS )];
279
+ Extrema <SlowTask > extrema = extremaAggregators [(int ) (taskData .threadId % SHARDS )];
279
280
synchronized (extrema ) {
280
281
extrema .aggregate (new SlowTask (taskData ));
281
282
}
@@ -659,12 +660,7 @@ private void logTask(
659
660
*/
660
661
public void logSimpleTask (long startTimeNanos , ProfilerTask type , String description ) {
661
662
if (clock != null ) {
662
- logTask (
663
- Thread .currentThread ().getId (),
664
- startTimeNanos ,
665
- clock .nanoTime () - startTimeNanos ,
666
- type ,
667
- description );
663
+ logTask (getLaneId (), startTimeNanos , clock .nanoTime () - startTimeNanos , type , description );
668
664
}
669
665
}
670
666
@@ -682,12 +678,7 @@ public void logSimpleTask(long startTimeNanos, ProfilerTask type, String descrip
682
678
*/
683
679
public void logSimpleTask (
684
680
long startTimeNanos , long stopTimeNanos , ProfilerTask type , String description ) {
685
- logTask (
686
- Thread .currentThread ().getId (),
687
- startTimeNanos ,
688
- stopTimeNanos - startTimeNanos ,
689
- type ,
690
- description );
681
+ logTask (getLaneId (), startTimeNanos , stopTimeNanos - startTimeNanos , type , description );
691
682
}
692
683
693
684
/**
@@ -702,12 +693,12 @@ public void logSimpleTask(
702
693
*/
703
694
public void logSimpleTaskDuration (
704
695
long startTimeNanos , Duration duration , ProfilerTask type , String description ) {
705
- logTask (Thread . currentThread (). getId (), startTimeNanos , duration .toNanos (), type , description );
696
+ logTask (getLaneId (), startTimeNanos , duration .toNanos (), type , description );
706
697
}
707
698
708
699
/** Used to log "events" happening at a specific time - tasks with zero duration. */
709
700
public void logEventAtTime (long atTimeNanos , ProfilerTask type , String description ) {
710
- logTask (Thread . currentThread (). getId (), atTimeNanos , 0 , type , description );
701
+ logTask (getLaneId (), atTimeNanos , 0 , type , description );
711
702
}
712
703
713
704
/** Used to log "events" - tasks with zero duration. */
@@ -741,7 +732,7 @@ private SilentCloseable reallyProfile(long laneId, ProfilerTask type, String des
741
732
* @param description task description. May be stored until the end of the build.
742
733
*/
743
734
public SilentCloseable profile (ProfilerTask type , String description ) {
744
- return profile (Thread . currentThread (). getId (), type , description );
735
+ return profile (getLaneId (), type , description );
745
736
}
746
737
747
738
private SilentCloseable profile (long laneId , ProfilerTask type , String description ) {
@@ -753,7 +744,7 @@ private SilentCloseable profile(long laneId, ProfilerTask type, String descripti
753
744
* profiling.
754
745
*/
755
746
public SilentCloseable profile (ProfilerTask type , Supplier <String > description ) {
756
- return profile (Thread . currentThread (). getId (), type , description );
747
+ return profile (getLaneId (), type , description );
757
748
}
758
749
759
750
private SilentCloseable profile (long laneId , ProfilerTask type , Supplier <String > description ) {
@@ -799,7 +790,7 @@ public SilentCloseable profileAction(
799
790
final long startTimeNanos = clock .nanoTime ();
800
791
return () ->
801
792
completeAction (
802
- Thread . currentThread (). getId (),
793
+ getLaneId (),
803
794
startTimeNanos ,
804
795
type ,
805
796
description ,
@@ -824,7 +815,7 @@ private boolean countAction(ProfilerTask type, TaskData taskData) {
824
815
}
825
816
826
817
public void completeTask (long startTimeNanos , ProfilerTask type , String description ) {
827
- completeTask (Thread . currentThread (). getId (), startTimeNanos , type , description );
818
+ completeTask (getLaneId (), startTimeNanos , type , description );
828
819
}
829
820
830
821
/** Records the end of the task. */
@@ -969,72 +960,145 @@ public interface FutureSupplier<T> {
969
960
}
970
961
971
962
public <T > ListenableFuture <T > profileAsync (
972
- ProfilerTaskType type , String description , FutureSupplier <T > futureSupplier ) {
963
+ String prefix , String description , FutureSupplier <T > futureSupplier ) {
973
964
if (!(isActive () && isProfiling (ProfilerTask .INFO ))) {
974
965
return futureSupplier .get (new ScopedProfiler (/* active= */ false , 0 ));
975
966
}
976
967
977
- long laneId = laneIdGenerator .acquire (type );
968
+ var lane = multiLaneGenerator .acquire (prefix );
978
969
final long startTimeNanos = clock .nanoTime ();
979
- var scopedProfiler = new ScopedProfiler (/* active= */ true , laneId );
970
+ var scopedProfiler = new ScopedProfiler (/* active= */ true , lane . id () );
980
971
var future = futureSupplier .get (scopedProfiler );
981
972
future .addListener (
982
973
() -> {
983
974
long endTimeNanos = clock .nanoTime ();
984
975
long duration = endTimeNanos - startTimeNanos ;
985
976
recordTask (
986
- new TaskData (laneId , startTimeNanos , duration , ProfilerTask .INFO , description ));
987
- laneIdGenerator .release (type , laneId );
977
+ new TaskData (lane . id () , startTimeNanos , duration , ProfilerTask .INFO , description ));
978
+ multiLaneGenerator .release (prefix , lane );
988
979
},
989
980
MoreExecutors .directExecutor ());
990
981
return future ;
991
982
}
992
983
993
- private static final long LANE_ID_BASE = 1_000_000 ;
994
- private final AtomicLong nextLaneId = new AtomicLong (LANE_ID_BASE );
995
- private final TaskTypeLaneIdGenerator laneIdGenerator = new TaskTypeLaneIdGenerator ();
984
+ private final ThreadLocal <String > virtualThreadPrefix = ThreadLocal .withInitial (() -> null );
985
+ private final ThreadLocal <Lane > borrowedLane = ThreadLocal .withInitial (() -> null );
996
986
997
- private class TaskTypeLaneIdGenerator {
998
- private final Map <ProfilerTaskType , LaneIdGenerator > typeToLaneIdGenerator =
999
- Maps .newConcurrentMap ();
987
+ private void registerVirtualThread (String prefix ) {
988
+ var thread = Thread .currentThread ();
989
+ var threadId = thread .threadId ();
990
+ virtualThreadPrefix .set (prefix );
991
+ thread .setName (prefix + threadId );
992
+ }
993
+
994
+ private void deregisterVirtualThread () {
995
+ var prefix = checkNotNull (virtualThreadPrefix .get ());
996
+ virtualThreadPrefix .remove ();
997
+ var lane = borrowedLane .get ();
998
+ if (lane != null ) {
999
+ borrowedLane .remove ();
1000
+ multiLaneGenerator .release (prefix , lane );
1001
+ }
1002
+ }
1003
+
1004
+ private final AtomicLong nextLaneId = new AtomicLong (1_000_000 );
1005
+ private final MultiLaneGenerator multiLaneGenerator = new MultiLaneGenerator ();
1006
+
1007
+ private class MultiLaneGenerator {
1008
+ private final Map <String , LaneGenerator > laneGenerators = Maps .newConcurrentMap ();
1000
1009
1001
- public long acquire (ProfilerTaskType type ) {
1002
- var laneIdGenerator =
1003
- typeToLaneIdGenerator .computeIfAbsent (type , unused -> new LaneIdGenerator (type ));
1004
- return laneIdGenerator .acquire ();
1010
+ private Lane acquire (String prefix ) {
1011
+ checkState (isActive ());
1012
+ var laneGenerator =
1013
+ laneGenerators .computeIfAbsent (prefix , unused -> new LaneGenerator (prefix ));
1014
+ return laneGenerator .acquire ();
1005
1015
}
1006
1016
1007
- public void release (ProfilerTaskType type , long laneId ) {
1008
- var laneIdGenerator = checkNotNull (typeToLaneIdGenerator .get (type ));
1009
- laneIdGenerator .release (laneId );
1017
+ private void release (String prefix , Lane lane ) {
1018
+ checkState (isActive ());
1019
+ var laneGenerator = checkNotNull (laneGenerators .get (prefix ));
1020
+ laneGenerator .release (lane );
1010
1021
}
1011
1022
}
1012
1023
1013
- private class LaneIdGenerator {
1014
- private final ProfilerTaskType type ;
1015
- private final PriorityQueue <Long > availableLaneIds = new PriorityQueue <>();
1024
+ private record Lane (long id ) implements Comparable <Lane > {
1025
+ @ Override
1026
+ public int compareTo (Lane o ) {
1027
+ return Long .compare (id , o .id );
1028
+ }
1029
+ }
1030
+
1031
+ private class LaneGenerator {
1032
+ private final String prefix ;
1033
+
1034
+ @ GuardedBy ("this" )
1035
+ private final PriorityQueue <Lane > availableLanes = new PriorityQueue <>();
1016
1036
1037
+ @ GuardedBy ("this" )
1017
1038
private int count = 0 ;
1018
1039
1019
- private LaneIdGenerator ( ProfilerTaskType type ) {
1020
- this .type = type ;
1040
+ private LaneGenerator ( String prefix ) {
1041
+ this .prefix = prefix ;
1021
1042
}
1022
1043
1023
- public synchronized long acquire () {
1024
- if (!availableLaneIds .isEmpty ()) {
1025
- return availableLaneIds .poll ();
1044
+ public Lane acquire () {
1045
+ long newLaneId ;
1046
+ String newLaneName ;
1047
+ synchronized (this ) {
1048
+ if (!availableLanes .isEmpty ()) {
1049
+ return availableLanes .poll ();
1050
+ }
1051
+
1052
+ newLaneId = nextLaneId .getAndIncrement ();
1053
+ int newLaneIndex = count ++;
1054
+ newLaneName = prefix + newLaneIndex + " (Virtual)" ;
1026
1055
}
1027
- var newLaneId = Profiler . this . nextLaneId . getAndIncrement ();
1028
- var threadMetadata = new ThreadMetadata (type . getName ( count ++) , newLaneId , LANE_ID_BASE );
1056
+
1057
+ var threadMetadata = new ThreadMetadata (newLaneName , newLaneId );
1029
1058
var writer = Profiler .this .writerRef .get ();
1030
1059
if (writer != null ) {
1031
1060
writer .enqueue (threadMetadata );
1032
1061
}
1033
- return newLaneId ;
1062
+ return new Lane ( newLaneId ) ;
1034
1063
}
1035
1064
1036
- public synchronized void release (long laneId ) {
1037
- availableLaneIds .add (laneId );
1065
+ public synchronized void release (Lane lane ) {
1066
+ availableLanes .add (lane );
1038
1067
}
1039
1068
}
1069
+
1070
+ private long getLaneId () {
1071
+ var currentThread = Thread .currentThread ();
1072
+ var threadId = currentThread .threadId ();
1073
+ if (!currentThread .isVirtual ()) {
1074
+ return threadId ;
1075
+ }
1076
+
1077
+ var lane = borrowedLane .get ();
1078
+ if (lane == null ) {
1079
+ var prefix = virtualThreadPrefix .get ();
1080
+ checkNotNull (
1081
+ prefix ,
1082
+ "Current virtual thread is not registered. Did you use"
1083
+ + " Profiler#profileableVirtualThreadFactor to create a VirtualThread?" );
1084
+ lane = multiLaneGenerator .acquire (prefix );
1085
+ borrowedLane .set (lane );
1086
+ }
1087
+ return lane .id ();
1088
+ }
1089
+
1090
+ public ThreadFactory profileableVirtualThreadFactory (String prefix ) {
1091
+ return r ->
1092
+ Thread .ofVirtual ()
1093
+ .unstarted (
1094
+ () -> {
1095
+ var profiler = Profiler .instance ();
1096
+ profiler .registerVirtualThread (prefix );
1097
+ try {
1098
+ r .run ();
1099
+ } finally {
1100
+ profiler .deregisterVirtualThread ();
1101
+ }
1102
+ });
1103
+ }
1040
1104
}
0 commit comments