-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore.py
More file actions
126 lines (116 loc) · 4.97 KB
/
core.py
File metadata and controls
126 lines (116 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from __future__ import annotations
import math, random, statistics
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
# Small positive tolerance used in this module as comparison slack and to keep
# divisions and logarithms away from zero.
EPS = 1e-9
@dataclass
class Task:
    """A unit of work with a CPU demand and a lifetime measured in steps.

    ``remaining`` is not a constructor argument; it is initialised to
    ``duration`` right after construction.
    """

    cpu: float  # CPU demand of the task
    duration: int  # total number of steps the task runs
    remaining: int = field(init=False)  # steps left; filled in by __post_init__

    def __post_init__(self) -> None:
        # The countdown starts at the full duration.
        self.remaining = self.duration
@dataclass
class Node:
    """A compute node with a fixed CPU capacity that hosts running tasks.

    ``used`` is maintained incrementally: ``place`` adds a task's CPU and
    ``step`` subtracts it again when the task finishes.
    """

    capacity: float  # total CPU available on the node
    used: float = 0.0  # CPU currently allocated
    tasks: List[Task] = field(default_factory=list)  # tasks currently hosted

    @property
    def util(self) -> float:
        """Return ``used / capacity`` clamped to ``[0.0, 1.0]``.

        A node whose capacity is zero or negative reports ``0.0``.
        """
        if self.capacity <= 0:
            return 0.0
        # The lower clamp keeps a slightly negative ``used`` from producing a
        # negative utilisation.
        return max(0.0, min(1.0, self.used / self.capacity))

    def can_fit(self, t: Task) -> bool:
        """Return True if adding ``t.cpu`` stays within capacity (plus ``EPS`` slack)."""
        return self.used + t.cpu <= self.capacity + EPS

    def place(self, t: Task) -> bool:
        """Host ``t`` on this node if it fits.

        Returns:
            True if the task was appended and its CPU accounted for,
            False if it did not fit (the node is left unchanged).
        """
        if not self.can_fit(t):
            return False
        self.tasks.append(t)
        self.used += t.cpu
        return True

    def step(self) -> None:
        """Advance one time step.

        Every hosted task has ``remaining`` decremented; tasks that reach
        zero (or below) are dropped and their CPU is released.
        """
        still = []
        for t in self.tasks:
            t.remaining -= 1
            if t.remaining > 0:
                still.append(t)
            else:
                # Repeated float add/subtract does not cancel exactly, so the
                # running total can end up a hair below zero; clamp it.
                self.used = max(0.0, self.used - t.cpu)
        self.tasks = still
class Workload:
    """Seeded generator of random tasks with Poisson-distributed arrival counts."""

    # Largest rate handed to a single Knuth draw. exp(-700) is still a normal
    # double, whereas exp(-lam) underflows to 0.0 once lam reaches roughly 745,
    # which would make the draw independent of lam.
    _MAX_KNUTH_RATE = 700.0

    def __init__(self, seed=0, arrival_rate=1.0, cpu_shape=(0.05, 0.2), dur_range=(5, 50)):
        """Create a workload source.

        Args:
            seed: seed for the private ``random.Random`` instance.
            arrival_rate: Poisson rate used by ``poisson`` for each call.
            cpu_shape: ``(low, high)`` bounds for the per-task CPU draw.
            dur_range: ``(min, max)`` inclusive bounds for the task duration.
        """
        self.rng = random.Random(seed)
        self.arrival_rate = arrival_rate
        self.cpu_shape = cpu_shape
        self.dur_range = dur_range

    def _knuth(self, lam: float) -> int:
        """Draw one Poisson(lam) sample by multiplying uniforms (Knuth's method)."""
        threshold = math.exp(-lam)
        k = 0
        p = 1.0
        while p > threshold:
            k += 1
            p *= self.rng.random()
        return max(0, k - 1)

    def poisson(self) -> int:
        """Return a Poisson-distributed count with rate ``self.arrival_rate``.

        Rates up to ``_MAX_KNUTH_RATE`` use a single Knuth draw. Larger rates
        are split into chunks and the independent draws are summed, which is
        still Poisson with the full rate but avoids ``exp(-lam)`` underflow.
        Rates of zero or below yield 0.

        Raises:
            ValueError: if the rate is infinite.
        """
        lam = self.arrival_rate
        if math.isinf(lam) and lam > 0:
            # Chunking below would never terminate for an infinite rate.
            raise ValueError("arrival_rate must be finite")
        cap = self._MAX_KNUTH_RATE
        total = 0
        while lam > cap:
            total += self._knuth(cap)
            lam -= cap
        return total + self._knuth(lam)

    def new_tasks(self) -> List[Task]:
        """Generate the tasks arriving in one step.

        The number of tasks comes from ``poisson``. Each task's CPU is
        ``low + (high - low) * u`` for a uniform ``u``, and its duration is an
        integer drawn with ``randint`` over the inclusive ``dur_range``.
        """
        k = self.poisson()
        out = []
        a, b = self.cpu_shape
        for _ in range(k):
            # Draw order (cpu, then duration) is kept fixed so a given seed
            # always yields the same task sequence.
            cpu = a + (b - a) * self.rng.random()
            dur = self.rng.randint(self.dur_range[0], self.dur_range[1])
            out.append(Task(cpu=cpu, duration=dur))
        return out
class Scheduler:
    """Places tasks on nodes in either a spreading or a packing order.

    ``p`` is clamped to ``[0, 1]``. Values below 0.5 try the least-utilised
    nodes first (spread); values of 0.5 and above try the most-utilised nodes
    first (pack).
    """

    def __init__(self, nodes: List[Node], p: float = 0.5, rng: Optional[random.Random] = None):
        """Store the node list, the clamped ``p`` and the tie-breaking RNG.

        When no ``rng`` is given, a ``random.Random(0)`` is created.
        """
        self.nodes = nodes
        self.p = max(0.0, min(1.0, p))
        self.rng = rng or random.Random(0)

    def place_task(self, task: Task) -> bool:
        """Offer ``task`` to the nodes in preference order.

        Returns:
            True as soon as one node accepts the task, False if none does.
        """
        jitter = 1e-6  # tiny random offset so equal utilisations are ordered randomly

        if self.p < 0.5:
            def rank(node):
                return node.util + jitter * self.rng.random()
        else:
            def rank(node):
                return -(node.util) + jitter * self.rng.random()

        candidates = sorted(self.nodes, key=rank)
        return any(node.place(task) for node in candidates)

    def step(self):
        """Advance every node by one time step."""
        for node in self.nodes:
            node.step()
def sigma_coherence(nodes: List[Node]) -> float:
    """Return ``1 - stdev/mean`` of node utilisations, floored at 0.

    Yields 1.0 when the mean utilisation is at or below ``EPS`` (which
    includes an empty node list). With a single node the deviation is taken
    as 0.
    """
    utilisations = [node.util for node in nodes]
    count = len(utilisations)
    mean = sum(utilisations) / max(1, count)
    if mean <= EPS:
        return 1.0
    # Population standard deviation; undefined for fewer than two values here.
    spread = 0.0 if count <= 1 else statistics.pstdev(utilisations)
    return max(0.0, 1.0 - spread / (mean + EPS))
def entropy_H(nodes: List[Node]) -> float:
    """Return the entropy (natural log) of how tasks are spread over nodes.

    Each node's share is its task count divided by the total task count;
    nodes with no tasks contribute nothing. Returns 0.0 when there are no
    tasks at all.
    """
    per_node = [len(node.tasks) for node in nodes]
    total = sum(per_node)
    if total <= 0:
        return 0.0
    entropy = 0.0
    for share in (c / total for c in per_node if c > 0):
        # EPS keeps the logarithm's argument strictly positive.
        entropy -= share * math.log(share + EPS)
    return entropy
class HeuristicCtrl:
    """Adjusts ``p`` by a weighted sum of incoherence, queue drift and entropy gap."""

    def __init__(self, lr=0.06, w_sigma=1.0, w_drift=0.7, w_H=0.3):
        """Store the step size ``lr`` and the three term weights."""
        self.lr = lr
        self.w_sigma = w_sigma
        self.w_drift = w_drift
        self.w_H = w_H

    def update(self, p: float, nodes: List[Node], qlen: int, prev_qlen: int):
        """Compute the next ``p`` from the current cluster state.

        Args:
            p: current control value.
            nodes: nodes to measure.
            qlen: current queue length.
            prev_qlen: queue length at the previous step.

        Returns:
            ``(p_new, sig, H, drift, Phi)`` where ``p_new`` is clamped to
            ``[0, 1]``, ``sig`` and ``H`` are the coherence and entropy of
            ``nodes``, ``drift`` is ``qlen - prev_qlen`` and ``Phi`` is a
            weighted sum of squared deviations.
        """
        sig = sigma_coherence(nodes)
        H = entropy_H(nodes)
        drift = qlen - prev_qlen
        Hopt = math.log(max(1, len(nodes)))  # log(N): the uniform-spread entropy

        incoherence = 1.0 - sig
        entropy_gap = Hopt - H
        # Signed square root dampens large queue swings while keeping direction.
        damped_drift = math.copysign(math.sqrt(abs(drift)), drift)

        ctrl = (
            self.w_sigma * incoherence
            - self.w_drift * damped_drift
            + self.w_H * entropy_gap
        )
        p_new = max(0.0, min(1.0, p + self.lr * ctrl))
        Phi = incoherence ** 2 + 0.4 * (drift ** 2) + 0.6 * entropy_gap ** 2
        return p_new, sig, H, drift, Phi
class PIDCtrl:
    """PID-style controller on queue length, with an extra incoherence term."""

    def __init__(self, target_q=0.0, kp=0.005, ki=0.0005, kd=0.01, ks=0.02):
        """Store the target queue length and gains; reset integral and last error."""
        self.tq = target_q
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.ks = ks
        self.integral = 0.0
        self.prev = None  # previous error; None until the first update

    def update(self, p: float, nodes: List[Node], qlen: int, prev_qlen: int):
        """Compute the next ``p`` and update the controller's internal state.

        The error is ``qlen - target``. It is added to the running integral,
        and the derivative is the change in error since the previous call
        (0 on the first call). The control signal is subtracted from ``p``.

        Returns:
            ``(p_new, sig, H, drift, Phi)`` where ``p_new`` is clamped to
            ``[0, 1]`` and ``drift`` is ``qlen - prev_qlen``. Note that
            ``Phi`` here uses the squared error rather than the squared drift.
        """
        sig = sigma_coherence(nodes)
        H = entropy_H(nodes)

        err = qlen - self.tq
        self.integral += err  # NOTE(review): unbounded; no anti-windup
        if self.prev is None:
            deriv = 0.0
        else:
            deriv = err - self.prev
        self.prev = err

        ctrl = (
            self.kp * err
            + self.ki * self.integral
            + self.kd * deriv
            + self.ks * (1.0 - sig)
        )
        p_new = max(0.0, min(1.0, p - ctrl))

        Hopt = math.log(max(1, len(nodes)))  # log(N): the uniform-spread entropy
        Phi = (1.0 - sig) ** 2 + 0.4 * (err ** 2) + 0.6 * (Hopt - H) ** 2
        return p_new, sig, H, qlen - prev_qlen, Phi
class BayesCtrl:
    """Adjusts ``p`` from a product of likelihood-like scores for the cluster state."""

    def __init__(self):
        """No configuration or state."""
        pass

    def update(self, p: float, nodes: List[Node], qlen: int, prev_qlen: int):
        """Compute the next ``p`` from coherence, queue drift and entropy gap.

        Three scores are multiplied: the coherence (floored at ``EPS``),
        ``exp(-|drift|)`` and ``exp(-|H - Hopt|)``. The product ``x`` is
        squashed with ``x / (1 + x)`` and ``p`` moves by ``0.2 * (post - 0.5)``.

        Returns:
            ``(p_new, sig, H, drift, Phi)`` where ``p_new`` is clamped to
            ``[0, 1]`` and ``drift`` is ``qlen - prev_qlen``.
        """
        sig = sigma_coherence(nodes)
        H = entropy_H(nodes)
        drift = qlen - prev_qlen
        Hopt = math.log(max(1, len(nodes)))  # log(N): the uniform-spread entropy

        coherence_score = max(EPS, sig)
        drift_score = math.exp(-abs(drift))
        entropy_score = math.exp(-abs(H - Hopt))

        joint = coherence_score * drift_score * entropy_score
        post = joint / (1.0 + joint)
        # NOTE(review): both exp() scores are at most 1, so whenever the
        # coherence is also at most 1 (true of sigma_coherence as written in
        # this file) ``post`` cannot exceed 0.5 and this step never raises p.
        # Confirm that is intended.
        p_new = max(0.0, min(1.0, p + 0.2 * (post - 0.5)))

        Phi = (1.0 - sig) ** 2 + 0.4 * (drift ** 2) + 0.6 * (Hopt - H) ** 2
        return p_new, sig, H, drift, Phi