Skip to content

Commit 1bccf56

Browse files
committed
balloons: add PreferSpreadOnPhysicalCores
- Enable using different CPU allocators in different balloon types. So far the same CPU allocator was used with the same options for all CPU allocations in the policy. - Add new PreferSpreadOnPhysicalCores option to toggle CPU allocations packing or spreading on physical CPU cores. This option is specific to balloon type with a policy level default. - Make existing AllocatorTopologyBalancing option specific to balloon type, too.
1 parent 2c11d0a commit 1bccf56

File tree

7 files changed

+434
-31
lines changed

7 files changed

+434
-31
lines changed

docs/policy/balloons.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,16 @@ Balloons policy parameters:
8585
pack new balloons tightly into the same NUMAs/dies/packages. This
8686
helps keeping large portions of hardware idle and entering into deep
8787
power saving states.
88+
- `PreferSpreadOnPhysicalCores` prefers allocating logical CPUs
89+
(possibly hyperthreads) for a balloon from separate physical CPU
90+
cores. This prevents workloads in the balloon from interfering with
91+
themselves as they do not compete on the resources of the same CPU
92+
cores. On the other hand, it allows more interference between
93+
workloads in different balloons. The default is `false`: balloons
94+
are packed tightly to a minimum number of physical CPU cores. The
95+
value set here is the default for all balloon types, but it can be
96+
overridden with the balloon type specific setting with the same
97+
name.
8898
- `BalloonTypes` is a list of balloon type definitions. Each type can
8999
be configured with the following parameters:
90100
- `Name` of the balloon type. This is used in pod annotations to
@@ -135,6 +145,8 @@ Balloons policy parameters:
135145
- `numa`: ...in the same numa node(s) as the balloon.
136146
- `core`: ...allowed to use idle CPU threads in the same cores with
137147
the balloon.
148+
- `PreferSpreadOnPhysicalCores` overrides the policy level option
149+
with the same name in the scope of this balloon type.
138150
- `AllocatorPriority` (0: High, 1: Normal, 2: Low, 3: None). CPU
139151
allocator parameter, used when creating new or resizing existing
140152
balloons. If there are balloon types with pre-created balloons

pkg/cri/resource-manager/policy/builtin/balloons/balloons-policy.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ type Balloon struct {
9292
// - len(PodIDs) is the number of pods in the balloon.
9393
// - len(PodIDs[podID]) is the number of containers of podID
9494
// currently assigned to the balloon.
95-
PodIDs map[string][]string
95+
PodIDs map[string][]string
96+
cpuTreeAllocator *cpuTreeAllocator
9697
}
9798

9899
var log logger.Logger = logger.NewLogger("policy")
@@ -544,14 +545,32 @@ func (p *balloons) newBalloon(blnDef *BalloonDef, confCpus bool) (*Balloon, erro
544545
break
545546
}
546547
}
548+
// Configure new cpuTreeAllocator for this balloon if there
549+
// are type specific allocator options, otherwise use policy
550+
// default allocator.
551+
cpuTreeAllocator := p.cpuTreeAllocator
552+
if blnDef.AllocatorTopologyBalancing != nil || blnDef.PreferSpreadOnPhysicalCores != nil {
553+
allocatorOptions := cpuTreeAllocatorOptions{
554+
topologyBalancing: p.bpoptions.AllocatorTopologyBalancing,
555+
preferSpreadOnPhysicalCores: p.bpoptions.PreferSpreadOnPhysicalCores,
556+
}
557+
if blnDef.AllocatorTopologyBalancing != nil {
558+
allocatorOptions.topologyBalancing = *blnDef.AllocatorTopologyBalancing
559+
}
560+
if blnDef.PreferSpreadOnPhysicalCores != nil {
561+
allocatorOptions.preferSpreadOnPhysicalCores = *blnDef.PreferSpreadOnPhysicalCores
562+
}
563+
cpuTreeAllocator = p.cpuTree.NewAllocator(allocatorOptions)
564+
}
565+
547566
// Allocate CPUs
548567
if blnDef == p.reservedBalloonDef ||
549568
(blnDef == p.defaultBalloonDef && blnDef.MinCpus == 0 && blnDef.MaxCpus == 0) {
550569
// The reserved balloon uses ReservedResources CPUs.
551570
// So does the default balloon unless its CPU counts are tweaked.
552571
cpus = p.reserved
553572
} else {
554-
addFromCpus, _, err := p.cpuTreeAllocator.ResizeCpus(cpuset.New(), p.freeCpus, blnDef.MinCpus)
573+
addFromCpus, _, err := cpuTreeAllocator.ResizeCpus(cpuset.New(), p.freeCpus, blnDef.MinCpus)
555574
if err != nil {
556575
return nil, balloonsError("failed to choose a cpuset for allocating first %d CPUs from %#s", blnDef.MinCpus, p.freeCpus)
557576
}
@@ -562,12 +581,13 @@ func (p *balloons) newBalloon(blnDef *BalloonDef, confCpus bool) (*Balloon, erro
562581
p.freeCpus = p.freeCpus.Difference(cpus)
563582
}
564583
bln := &Balloon{
565-
Def: blnDef,
566-
Instance: freeInstance,
567-
PodIDs: make(map[string][]string),
568-
Cpus: cpus,
569-
SharedIdleCpus: cpuset.New(),
570-
Mems: p.closestMems(cpus),
584+
Def: blnDef,
585+
Instance: freeInstance,
586+
PodIDs: make(map[string][]string),
587+
Cpus: cpus,
588+
SharedIdleCpus: cpuset.New(),
589+
Mems: p.closestMems(cpus),
590+
cpuTreeAllocator: cpuTreeAllocator,
571591
}
572592
if confCpus {
573593
if err = p.useCpuClass(bln); err != nil {
@@ -1046,7 +1066,8 @@ func (p *balloons) setConfig(bpoptions *BalloonsOptions) error {
10461066
p.freeCpus = p.allowed.Clone()
10471067
p.freeCpus = p.freeCpus.Difference(p.reserved)
10481068
p.cpuTreeAllocator = p.cpuTree.NewAllocator(cpuTreeAllocatorOptions{
1049-
topologyBalancing: bpoptions.AllocatorTopologyBalancing,
1069+
topologyBalancing: bpoptions.AllocatorTopologyBalancing,
1070+
preferSpreadOnPhysicalCores: bpoptions.PreferSpreadOnPhysicalCores,
10501071
})
10511072
// Instantiate built-in reserved and default balloons.
10521073
reservedBalloon, err := p.newBalloon(p.reservedBalloonDef, false)
@@ -1153,7 +1174,7 @@ func (p *balloons) resizeBalloon(bln *Balloon, newMilliCpus int) error {
11531174
defer p.useCpuClass(bln)
11541175
if cpuCountDelta > 0 {
11551176
// Inflate the balloon.
1156-
addFromCpus, _, err := p.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
1177+
addFromCpus, _, err := bln.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
11571178
if err != nil {
11581179
return balloonsError("resize/inflate: failed to choose a cpuset for allocating additional %d CPUs: %w", cpuCountDelta, err)
11591180
}
@@ -1167,7 +1188,7 @@ func (p *balloons) resizeBalloon(bln *Balloon, newMilliCpus int) error {
11671188
p.updatePinning(p.shareIdleCpus(p.freeCpus, newCpus)...)
11681189
} else {
11691190
// Deflate the balloon.
1170-
_, removeFromCpus, err := p.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
1191+
_, removeFromCpus, err := bln.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
11711192
if err != nil {
11721193
return balloonsError("resize/deflate: failed to choose a cpuset for releasing %d CPUs: %w", -cpuCountDelta, err)
11731194
}

pkg/cri/resource-manager/policy/builtin/balloons/cputree.go

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ type cpuTreeAllocatorOptions struct {
7979
// topologyBalancing true prefers allocating from branches
8080
// with most free CPUs (spread allocations), while false is
8181
// the opposite (packed allocations).
82-
topologyBalancing bool
82+
topologyBalancing bool
83+
preferSpreadOnPhysicalCores bool
8384
}
8485

8586
// Strings returns topology level as a string
@@ -131,6 +132,19 @@ func (t *cpuTreeNode) String() string {
131132
return fmt.Sprintf("%s%v", t.name, t.children)
132133
}
133134

135+
func (t *cpuTreeNode) PrettyPrint() string {
136+
origDepth := t.Depth()
137+
lines := []string{}
138+
t.DepthFirstWalk(func(tn *cpuTreeNode) error {
139+
lines = append(lines,
140+
fmt.Sprintf("%s%s: %q cpus: %s",
141+
strings.Repeat(" ", (tn.Depth()-origDepth)*4),
142+
tn.level, tn.name, tn.cpus))
143+
return nil
144+
})
145+
return strings.Join(lines, "\n")
146+
}
147+
134148
// String returns cpuTreeNodeAttributes as a string.
135149
func (tna cpuTreeNodeAttributes) String() string {
136150
return fmt.Sprintf("%s{%d,%v,%d,%d}", tna.t.name, tna.depth,
@@ -146,6 +160,34 @@ func NewCpuTree(name string) *cpuTreeNode {
146160
}
147161
}
148162

163+
func (t *cpuTreeNode) CopyTree() *cpuTreeNode {
164+
newNode := t.CopyNode()
165+
newNode.children = make([]*cpuTreeNode, 0, len(t.children))
166+
for _, child := range t.children {
167+
newNode.AddChild(child.CopyTree())
168+
}
169+
return newNode
170+
}
171+
172+
func (t *cpuTreeNode) CopyNode() *cpuTreeNode {
173+
newNode := cpuTreeNode{
174+
name: t.name,
175+
level: t.level,
176+
parent: t.parent,
177+
children: t.children,
178+
cpus: t.cpus,
179+
}
180+
return &newNode
181+
}
182+
183+
// Depth returns the distance from the root node.
184+
func (t *cpuTreeNode) Depth() int {
185+
if t.parent == nil {
186+
return 0
187+
}
188+
return t.parent.Depth() + 1
189+
}
190+
149191
// AddChild adds new child node to a CPU tree node.
150192
func (t *cpuTreeNode) AddChild(child *cpuTreeNode) {
151193
child.parent = t
@@ -165,6 +207,38 @@ func (t *cpuTreeNode) Cpus() cpuset.CPUSet {
165207
return t.cpus
166208
}
167209

210+
// SiblingIndex returns the index of this node among its parents
211+
// children. Returns -1 for the root node, -2 if this node is not
212+
// listed among the children of its parent.
213+
func (t *cpuTreeNode) SiblingIndex() int {
214+
if t.parent == nil {
215+
return -1
216+
}
217+
for idx, child := range t.parent.children {
218+
if child == t {
219+
return idx
220+
}
221+
}
222+
return -2
223+
}
224+
225+
func (t *cpuTreeNode) FindLeafWithCpu(cpu int) *cpuTreeNode {
226+
var found *cpuTreeNode
227+
t.DepthFirstWalk(func(tn *cpuTreeNode) error {
228+
if len(tn.children) > 0 {
229+
return nil
230+
}
231+
for _, cpuHere := range tn.cpus.List() {
232+
if cpu == cpuHere {
233+
found = tn
234+
return WalkStop
235+
}
236+
}
237+
return nil // not found here, no more children to search
238+
})
239+
return found
240+
}
241+
168242
// WalkSkipChildren error returned from a DepthFirstWalk handler
169243
// prevents walking deeper in the tree. The caller of the
170244
// DepthFirstWalk will get no error.
@@ -236,13 +310,18 @@ func NewCpuTreeFromSystem() (*cpuTreeNode, error) {
236310
nodeTree.level = CPUTopologyLevelNuma
237311
dieTree.AddChild(nodeTree)
238312
node := sys.Node(nodeID)
313+
threadsSeen := map[int]struct{}{}
239314
for _, cpuID := range node.CPUSet().List() {
315+
if _, alreadySeen := threadsSeen[cpuID]; alreadySeen {
316+
continue
317+
}
240318
cpuTree := NewCpuTree(fmt.Sprintf("p%dd%dn%dcpu%d", packageID, dieID, nodeID, cpuID))
241319

242320
cpuTree.level = CPUTopologyLevelCore
243321
nodeTree.AddChild(cpuTree)
244322
cpu := sys.CPU(cpuID)
245323
for _, threadID := range cpu.ThreadCPUSet().List() {
324+
threadsSeen[threadID] = struct{}{}
246325
threadTree := NewCpuTree(fmt.Sprintf("p%dd%dn%dcpu%dt%d", packageID, dieID, nodeID, cpuID, threadID))
247326
threadTree.level = CPUTopologyLevelThread
248327
cpuTree.AddChild(threadTree)
@@ -312,13 +391,83 @@ func (t *cpuTreeNode) toAttributedSlice(
312391
}
313392
}
314393

394+
// SplitLevel returns the root node of a new CPU tree where all
395+
// branches of a topology level have been split into new classes.
396+
func (t *cpuTreeNode) SplitLevel(splitLevel CPUTopologyLevel, cpuClassifier func(int) int) *cpuTreeNode {
397+
newRoot := t.CopyTree()
398+
newRoot.DepthFirstWalk(func(tn *cpuTreeNode) error {
399+
// Dive into the level that will be split.
400+
if tn.level != splitLevel {
401+
return nil
402+
}
403+
// Classify CPUs to the map: class -> list of cpus
404+
classCpus := map[int][]int{}
405+
for _, cpu := range t.cpus.List() {
406+
class := cpuClassifier(cpu)
407+
classCpus[class] = append(classCpus[class], cpu)
408+
}
409+
// Clear existing children of this node. New children
410+
// will be classes whose children are masked versions
411+
// of original children of this node.
412+
origChildren := tn.children
413+
tn.children = make([]*cpuTreeNode, 0, len(classCpus))
414+
// Add new child corresponding each class.
415+
for class, cpus := range classCpus {
416+
cpuMask := cpuset.New(cpus...)
417+
newNode := NewCpuTree(fmt.Sprintf("%sclass%d", tn.name, class))
418+
tn.AddChild(newNode)
419+
newNode.cpus = tn.cpus.Intersection(cpuMask)
420+
newNode.level = tn.level
421+
newNode.parent = tn
422+
for _, child := range origChildren {
423+
newChild := child.CopyTree()
424+
newChild.DepthFirstWalk(func(cn *cpuTreeNode) error {
425+
cn.cpus = cn.cpus.Intersection(cpuMask)
426+
if cn.cpus.Size() == 0 && cn.parent != nil {
427+
// all cpus masked
428+
// out: cut out this
429+
// branch
430+
newSiblings := []*cpuTreeNode{}
431+
for _, child := range cn.parent.children {
432+
if child != cn {
433+
newSiblings = append(newSiblings, child)
434+
}
435+
}
436+
cn.parent.children = newSiblings
437+
return WalkSkipChildren
438+
}
439+
return nil
440+
})
441+
newNode.AddChild(newChild)
442+
}
443+
}
444+
return WalkSkipChildren
445+
})
446+
return newRoot
447+
}
448+
315449
// NewAllocator returns new CPU allocator for allocating CPUs from a
316450
// CPU tree branch.
317451
func (t *cpuTreeNode) NewAllocator(options cpuTreeAllocatorOptions) *cpuTreeAllocator {
318452
ta := &cpuTreeAllocator{
319453
root: t,
320454
options: options,
321455
}
456+
if options.preferSpreadOnPhysicalCores {
457+
newTree := t.SplitLevel(CPUTopologyLevelNuma,
458+
// CPU classifier: class of the CPU equals to
459+
// the index in the child list of its parent
460+
// node in the tree. Expect leaf node is a
461+
// hyperthread, parent a physical core.
462+
func(cpu int) int {
463+
leaf := t.FindLeafWithCpu(cpu)
464+
if leaf == nil {
465+
log.Fatalf("SplitLevel CPU classifier: cpu %d not in tree:\n%s\n\n", cpu, t.PrettyPrint())
466+
}
467+
return leaf.SiblingIndex()
468+
})
469+
ta.root = newTree
470+
}
322471
return ta
323472
}
324473

@@ -409,7 +558,36 @@ func (ta *cpuTreeAllocator) sorterRelease(tnas []cpuTreeNodeAttributes) func(int
409558
// abs(delta) CPUs can be freed.
410559
func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
411560
if delta > 0 {
412-
return ta.resizeCpus(currentCpus, freeCpus, delta)
561+
addFromSuperset, removeFromSuperset, err := ta.resizeCpus(currentCpus, freeCpus, delta)
562+
if !ta.options.preferSpreadOnPhysicalCores || addFromSuperset.Size() == delta {
563+
return addFromSuperset, removeFromSuperset, err
564+
}
565+
// addFromSuperset contains more CPUs (equally good
566+
// choices) than actually needed. In case of
567+
// preferSpreadOnPhysicalCores, however, selecting any
568+
// of these does not result in equally good
569+
// result. Therefore, in this case, construct addFrom
570+
// set by adding one CPU at a time.
571+
addFrom := cpuset.New()
572+
for n := 0; n < delta; n++ {
573+
addSingleFrom, _, err := ta.resizeCpus(currentCpus, freeCpus, 1)
574+
if err != nil {
575+
return addFromSuperset, removeFromSuperset, err
576+
}
577+
if addSingleFrom.Size() != 1 {
578+
return addFromSuperset, removeFromSuperset, fmt.Errorf("internal error: failed to find single CPU to allocate, "+
579+
"currentCpus=%s freeCpus=%s expectedSingle=%s",
580+
currentCpus, freeCpus, addSingleFrom)
581+
}
582+
addFrom = addFrom.Union(addSingleFrom)
583+
if addFrom.Size() != n+1 {
584+
return addFromSuperset, removeFromSuperset, fmt.Errorf("internal error: double add the same CPU (%s) to cpuset %s on round %d",
585+
addSingleFrom, addFrom, n+1)
586+
}
587+
currentCpus = currentCpus.Union(addSingleFrom)
588+
freeCpus = freeCpus.Difference(addSingleFrom)
589+
}
590+
return addFrom, removeFromSuperset, nil
413591
}
414592
// In multi-CPU removal, remove CPUs one by one instead of
415593
// trying to find a single topology element from which all of

0 commit comments

Comments
 (0)