Commit 31cf724

autoscaler support
Signed-off-by: Jonathan Nitisastro <[email protected]>
1 parent fc76d06 commit 31cf724

8 files changed (+75 −30 lines)

python/ray/autoscaler/_private/fake_multi_node/node_provider.py

Lines changed: 1 addition & 0 deletions

@@ -316,6 +316,7 @@ def create_node_with_resources_and_labels(
             num_cpus=resources.pop("CPU", 0),
             num_gpus=resources.pop("GPU", 0),
             object_store_memory=resources.pop("object_store_memory", None),
+            _gpu_memory=resources.pop("gpu_memory", 0),
             resources=resources,
             labels=labels,
             redis_address="{}:6379".format(
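To make the pop semantics concrete: "gpu_memory" is stripped out of the incoming resources dict and forwarded as a dedicated node parameter, so it never lands in the generic resources= mapping. A minimal sketch (values hypothetical):

    resources = {"CPU": 4, "GPU": 1, "gpu_memory": 16 * 1024**3, "accelerator_type": 1}
    gpu_memory = resources.pop("gpu_memory", 0)  # 17179869184 bytes
    # Only the leftover entries reach the generic resources= argument:
    print(resources)  # {'CPU': 4, 'GPU': 1, 'accelerator_type': 1}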

python/ray/autoscaler/_private/resource_demand_scheduler.py

Lines changed: 50 additions & 5 deletions
@@ -109,7 +109,7 @@ def __init__(
         upscaling_speed: float,
     ) -> None:
         self.provider = provider
-        self.node_types = copy.deepcopy(node_types)
+        self.node_types = self._adjust_node_types(copy.deepcopy(node_types))
         self.node_resource_updated = set()
         self.max_workers = max_workers
         self.head_node_type = head_node_type
@@ -151,12 +151,27 @@ def reset_config(
         inferered resources are not lost.
         """
         self.provider = provider
-        self.node_types = copy.deepcopy(node_types)
+        self.node_types = self._adjust_node_types(copy.deepcopy(node_types))
         self.node_resource_updated = set()
         self.max_workers = max_workers
         self.head_node_type = head_node_type
         self.upscaling_speed = upscaling_speed
 
+    def _adjust_node_types(self, node_types):
+        # update available_node_types gpu_memory to gpu_memory_per_gpu
+        for node_type, node_config in node_types.items():
+            resources = node_config["resources"]
+            if "gpu_memory" in resources:
+                if "GPU" in resources and resources["GPU"] > 0:
+                    resources["node:gpu_memory_per_gpu"] = (
+                        resources["gpu_memory"] / resources["GPU"]
+                    )
+                else:
+                    resources["node:gpu_memory_per_gpu"] = 0
+                del resources["gpu_memory"]
+                node_types[node_type] = node_config
+        return node_types
+
     def is_feasible(self, bundle: ResourceDict) -> bool:
         for node_type, config in self.node_types.items():
             max_of_type = config.get("max_workers", 0)
@@ -372,6 +387,10 @@ def _update_node_resources_from_runtime(
         for key in ["CPU", "GPU", "memory", "object_store_memory"]:
             if key in runtime_resources:
                 resources[key] = runtime_resources[key]
+        if "gpu_memory" in runtime_resources and "GPU" in runtime_resources:
+            resources["node:gpu_memory_per_gpu"] = int(
+                runtime_resources["gpu_memory"]
+            ) / int(runtime_resources["GPU"])
         self.node_types[node_type]["resources"] = resources
 
         node_kind = tags[TAG_RAY_NODE_KIND]
@@ -823,7 +842,7 @@ def _resource_based_utilization_scorer(
     num_matching_resource_types = 0
     for k, v in node_resources.items():
         # Don't divide by zero.
-        if v < 1:
+        if v < 1 or k == "node:gpu_memory_per_gpu":
            # Could test v == 0 on the nose, but v < 1 feels safer.
            # (Note that node resources are integers.)
            continue
@@ -931,8 +950,31 @@ def get_bin_pack_residual(
     return unfulfilled, nodes + used
 
 
+def _convert_relative_resources(
+    node: ResourceDict, resources: ResourceDict
+) -> Optional[ResourceDict]:
+    # return None if relative resources can't be converted
+    adjusted_resources = resources.copy()
+    if "gpu_memory" in resources:
+        if (
+            "node:gpu_memory_per_gpu" not in node
+            or node["node:gpu_memory_per_gpu"] == 0
+        ):
+            return None
+        adjusted_resources["GPU"] = (
+            resources["gpu_memory"] / node["node:gpu_memory_per_gpu"]
+        )
+        if adjusted_resources["GPU"] > 1.0:
+            return None
+        del adjusted_resources["gpu_memory"]
+    return adjusted_resources
+
+
 def _fits(node: ResourceDict, resources: ResourceDict) -> bool:
-    for k, v in resources.items():
+    adjusted_resources = _convert_relative_resources(node, resources)
+    if adjusted_resources is None:
+        return False
+    for k, v in adjusted_resources.items():
         # TODO(jjyao): Change ResourceDict to a class so we can
         # hide the implicit resource handling.
         if v > node.get(
@@ -943,7 +985,10 @@ def _fits(node: ResourceDict, resources: ResourceDict) -> bool:
 
 
 def _inplace_subtract(node: ResourceDict, resources: ResourceDict) -> None:
-    for k, v in resources.items():
+    adjusted_resources = _convert_relative_resources(node, resources)
+    if adjusted_resources is None:
+        return
+    for k, v in adjusted_resources.items():
         if v == 0:
             # This is an edge case since some reasonable programs/computers can
             # do `ray.autoscaler.sdk.request_resources({"GPU": 0}"})`.
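To see what the new helper does during bin-packing, here is a standalone re-statement of _convert_relative_resources with a worked example (plain dicts stand in for ResourceDict; the node values are hypothetical):

    from typing import Dict, Optional

    ResourceDict = Dict[str, float]

    def convert_relative_resources(
        node: ResourceDict, resources: ResourceDict
    ) -> Optional[ResourceDict]:
        # Mirror of the helper above: rewrite a gpu_memory demand as a GPU fraction.
        adjusted = resources.copy()
        if "gpu_memory" in resources:
            per_gpu = node.get("node:gpu_memory_per_gpu", 0)
            if per_gpu == 0:
                return None  # node advertises no GPU memory; demand can't be placed
            adjusted["GPU"] = resources["gpu_memory"] / per_gpu
            if adjusted["GPU"] > 1.0:
                return None  # a single demand may not span multiple GPUs
            del adjusted["gpu_memory"]
        return adjusted

    node = {"CPU": 8, "GPU": 2, "node:gpu_memory_per_gpu": 16 * 1024**3}
    demand = {"CPU": 1, "gpu_memory": 8 * 1024**3}
    print(convert_relative_resources(node, demand))  # {'CPU': 1, 'GPU': 0.5}

With this conversion applied first, _fits and _inplace_subtract treat a gpu_memory demand exactly like a fractional GPU demand on whichever node type it is scored against.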

python/ray/cluster_utils.py

Lines changed: 2 additions & 0 deletions

@@ -85,6 +85,8 @@ def start(self, _system_config=None, override_env: Optional[Dict] = None):
                 self._head_resources.pop("object_store_memory")
             )
         )
+        if "gpu_memory" in self._head_resources:
+            cmd.append("--gpu-memory={}".format(self._head_resources.pop("gpu_memory")))
        if self._head_resources:
            cmd.append("--resources='{}'".format(json.dumps(self._head_resources)))
        if _system_config is not None:
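For context, a sketch of how the new flag might be exercised from the autoscaling test harness (the head_resources and worker_node_types values are hypothetical; only the --gpu-memory forwarding comes from this commit):

    from ray.cluster_utils import AutoscalingCluster

    cluster = AutoscalingCluster(
        head_resources={"CPU": 2, "GPU": 1, "gpu_memory": 16 * 1024**3},
        worker_node_types={
            "gpu_worker": {
                "resources": {"CPU": 4, "GPU": 1, "gpu_memory": 16 * 1024**3},
                "node_config": {},
                "min_workers": 0,
                "max_workers": 2,
            },
        },
    )
    cluster.start()  # the head's `ray start` command now includes --gpu-memory=17179869184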

src/ray/common/scheduling/cluster_resource_data.cc

Lines changed: 17 additions & 17 deletions

@@ -59,12 +59,12 @@ NodeResources ResourceMapToNodeResources(
   auto node_labels_copy = node_labels;
 
   if (resource_map_total.find("gpu_memory") != resource_map_total.end()) {
-    node_labels_copy["gpu_memory"] = std::to_string(resource_map_total.at("gpu_memory") /
-                                                    resource_map_total.at("GPU"));
+    node_labels_copy["_gpu_memory_per_gpu"] = std::to_string(
+        resource_map_total.at("gpu_memory") / resource_map_total.at("GPU"));
     resource_map_total_copy.erase("gpu_memory");
     resource_map_available_copy.erase("gpu_memory");
   } else {
-    node_labels_copy["gpu_memory"] = "0";
+    node_labels_copy["_gpu_memory_per_gpu"] = "0";
   }
 
   node_resources.total = NodeResourceSet(resource_map_total_copy);
@@ -107,7 +107,7 @@
     return false;
   }
   const ResourceSet resource_request_adjusted =
-      this->ConvertRelativeResource(resource_request.GetResourceSet());
+      this->ConvertRelativeResources(resource_request.GetResourceSet());
   if (!this->normal_task_resources.IsEmpty()) {
     auto available_resources = this->available;
     available_resources -= this->normal_task_resources;
@@ -118,7 +118,7 @@
 
 bool NodeResources::IsFeasible(const ResourceRequest &resource_request) const {
   const ResourceSet resource_request_adjusted =
-      this->ConvertRelativeResource(resource_request.GetResourceSet());
+      this->ConvertRelativeResources(resource_request.GetResourceSet());
   return this->total >= resource_request_adjusted;
 }
 
@@ -143,21 +143,21 @@ std::string NodeResources::DebugString() const {
   return buffer.str();
 }
 
-const ResourceSet NodeResources::ConvertRelativeResource(
+const ResourceSet NodeResources::ConvertRelativeResources(
     const ResourceSet &resource) const {
   ResourceSet adjusted_resource = resource;
   // convert gpu_memory to GPU
   if (resource.Has(ResourceID::GPU_Memory())) {
-    double total_gpu_memory = 0;
-    if (this->labels.find("gpu_memory") != this->labels.end()) {
+    double total_gpu_memory_per_gpu = 0;
+    if (this->labels.find("_gpu_memory_per_gpu") != this->labels.end()) {
       // TODO: raise exception if this is not true
-      total_gpu_memory = std::stod(this->labels.at("gpu_memory"));
+      total_gpu_memory_per_gpu = std::stod(this->labels.at("_gpu_memory_per_gpu"));
     }
     double num_gpus_request = 0;
-    if (total_gpu_memory > 0) {
+    if (total_gpu_memory_per_gpu > 0) {
       // round up to closes kResourceUnitScaling
       num_gpus_request =
-          (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory) +
+          (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory_per_gpu) +
           1 / static_cast<double>(2 * kResourceUnitScaling);
     }
     adjusted_resource.Set(ResourceID::GPU(), num_gpus_request);
@@ -193,20 +193,20 @@ const NodeResourceInstanceSet &NodeResourceInstances::GetTotalResourceInstances(
   return this->total;
 };
 
-const ResourceSet NodeResourceInstances::ConvertRelativeResource(
+const ResourceSet NodeResourceInstances::ConvertRelativeResources(
    const ResourceSet &resource) const {
   ResourceSet adjusted_resource = resource;
   // convert gpu_memory to GPU
   if (resource.Has(ResourceID::GPU_Memory())) {
-    double total_gpu_memory = 0;
-    if (this->labels.find("gpu_memory") != this->labels.end()) {
-      total_gpu_memory = std::stod(this->labels.at("gpu_memory"));
+    double total_gpu_memory_per_gpu = 0;
+    if (this->labels.find("_gpu_memory_per_gpu") != this->labels.end()) {
+      total_gpu_memory_per_gpu = std::stod(this->labels.at("_gpu_memory_per_gpu"));
     }
     double num_gpus_request = 0;
-    if (total_gpu_memory > 0) {
+    if (total_gpu_memory_per_gpu > 0) {
       // round up to closes kResourceUnitScaling
       num_gpus_request =
-          (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory) +
+          (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory_per_gpu) +
           1 / static_cast<double>(2 * kResourceUnitScaling);
     }
     adjusted_resource.Set(ResourceID::GPU(), num_gpus_request);
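The arithmetic above rewrites a gpu_memory request as a (possibly fractional) GPU request, nudged up by half a fixed-point unit so the subsequent truncation rounds to the nearest resource unit. A worked example of the same formula (Python sketch; assumes kResourceUnitScaling = 10000, its value in Ray's fixed-point arithmetic, and hypothetical byte counts):

    K_RESOURCE_UNIT_SCALING = 10000  # fixed-point granularity used by the scheduler

    def gpu_memory_to_gpus(requested_gpu_memory: float, gpu_memory_per_gpu: float) -> float:
        # Same shape as ConvertRelativeResources: fraction of one GPU, plus half
        # a resource unit so fixed-point truncation rounds to nearest.
        if gpu_memory_per_gpu <= 0:
            return 0.0
        return (requested_gpu_memory / gpu_memory_per_gpu
                + 1 / (2 * K_RESOURCE_UNIT_SCALING))

    # Requesting 4 GiB on a node whose GPUs hold 16 GiB each:
    print(gpu_memory_to_gpus(4 * 1024**3, 16 * 1024**3))  # 0.25005 -> 0.25 GPU after scaling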

src/ray/common/scheduling/cluster_resource_data.h

Lines changed: 2 additions & 2 deletions

@@ -335,7 +335,7 @@ class NodeResources {
   std::string DictString() const;
   // Returns adjusted ResourceSet after converting resource relative to others.
   // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory.
-  const ResourceSet ConvertRelativeResource(const ResourceSet &resource) const;
+  const ResourceSet ConvertRelativeResources(const ResourceSet &resource) const;
 };
 
 /// Total and available capacities of each resource instance.
@@ -357,7 +357,7 @@ class NodeResourceInstances {
 
   // Returns adjusted ResourceSet after converting resource relative to others.
   // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory.
-  const ResourceSet ConvertRelativeResource(const ResourceSet &resource) const;
+  const ResourceSet ConvertRelativeResources(const ResourceSet &resource) const;
 };
 
 struct Node {

src/ray/common/scheduling/resource_instance_set.cc

Lines changed: 0 additions & 3 deletions

@@ -93,7 +93,6 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c
 std::optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>
 NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) {
   absl::flat_hash_map<ResourceID, std::vector<FixedPoint>> allocations;
-  // update this to TryAllocateBundle
   for (const auto &[resource_id, demand] : resource_demands.Resources()) {
     auto allocation = TryAllocate(resource_id, demand);
     if (allocation) {
@@ -135,8 +134,6 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
       return std::nullopt;
     }
   }
-  // need to update this to support instance > 1
-  // still unit tho, might need to create different TryAllocate function
 
   // If resources has multiple instances, each instance has total capacity of 1.
   //

src/ray/raylet/scheduling/cluster_resource_manager.cc

Lines changed: 2 additions & 2 deletions

@@ -188,7 +188,7 @@ bool ClusterResourceManager::SubtractNodeAvailableResources(
 
   NodeResources *resources = it->second.GetMutableLocalView();
   const ResourceSet adjusted_resource_request =
-      resources->ConvertRelativeResource(resource_request.GetResourceSet());
+      resources->ConvertRelativeResources(resource_request.GetResourceSet());
 
   resources->available -= adjusted_resource_request;
   resources->available.RemoveNegative();
@@ -217,7 +217,7 @@ bool ClusterResourceManager::HasSufficientResource(
   }
 
   const ResourceSet adjusted_resource_request =
-      resources.ConvertRelativeResource(resource_request.GetResourceSet());
+      resources.ConvertRelativeResources(resource_request.GetResourceSet());
 
   return resources.available >= adjusted_resource_request;
 }

src/ray/raylet/scheduling/local_resource_manager.cc

Lines changed: 1 addition & 1 deletion

@@ -81,7 +81,7 @@ bool LocalResourceManager::AllocateTaskResourceInstances(
     std::shared_ptr<TaskResourceInstances> task_allocation) {
   RAY_CHECK(task_allocation != nullptr);
   const ResourceSet adjusted_resource_request =
-      local_resources_.ConvertRelativeResource(resource_request.GetResourceSet());
+      local_resources_.ConvertRelativeResources(resource_request.GetResourceSet());
   if (resource_request.GetResourceSet().Has(ResourceID::GPU_Memory()) &&
       adjusted_resource_request.Get(ResourceID::GPU()) > 1) {
     return false;
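The guard above caps a relative request at a single device: once converted, a gpu_memory demand larger than one GPU's memory yields an adjusted GPU value above 1, and local allocation is refused. A small sketch of the check (values hypothetical):

    gpu_memory_per_gpu = 16 * 1024**3    # each GPU on this node has 16 GiB
    requested_gpu_memory = 20 * 1024**3  # task asks for 20 GiB in one piece

    adjusted_gpu = requested_gpu_memory / gpu_memory_per_gpu  # 1.25
    assert adjusted_gpu > 1  # AllocateTaskResourceInstances returns false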
