Skip to content

Commit bd55ea5

Browse files
committed
Fix bugs
Signed-off-by: Balaji Veeramani <[email protected]>
1 parent 46968e0 commit bd55ea5

File tree

1 file changed

+36
-27
lines changed

1 file changed

+36
-27
lines changed

python/ray/_private/test_utils.py

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
gcs_service_pb2,
4848
node_manager_pb2,
4949
)
50-
from ray.core.generated.autoscaler_pb2 import DrainNodeReason
5150
from ray.util.queue import Empty, Queue, _QueueActor
5251
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
5352

@@ -1576,35 +1575,15 @@ class EC2InstanceTerminator(NodeKillerBase):
15761575
def _kill_resource(self, node_id, node_to_kill_ip, _):
15771576
if node_to_kill_ip is not None:
15781577
try:
1579-
self._terminate_ec2_instance(node_to_kill_ip)
1578+
_terminate_ec2_instance(node_to_kill_ip)
15801579
except Exception:
15811580
pass
15821581
logging.info(f"Terminated instance, {node_id=}, address={node_to_kill_ip}")
15831582
self.killed.add(node_id)
15841583

1585-
def _terminate_ec2_instance(self, ip):
1586-
logging.info(f"Terminating instance, {ip=}")
1587-
# This command uses IMDSv2 to get the host instance id and region.
1588-
# After that it terminates itself using aws cli.
1589-
multi_line_command = (
1590-
'TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600");' # noqa: E501
1591-
'instanceId=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/instance-id/);' # noqa: E501
1592-
'region=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/placement/region);' # noqa: E501
1593-
"aws ec2 terminate-instances --region $region --instance-ids $instanceId" # noqa: E501
1594-
)
1595-
# This is a feature on Anyscale platform that enables
1596-
# easy ssh access to worker nodes.
1597-
ssh_command = f"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -p 2222 ray@{ip} '{multi_line_command}'" # noqa: E501
1598-
1599-
result = subprocess.run(
1600-
ssh_command, shell=True, capture_output=True, text=True, check=True
1601-
)
1602-
print(f"STDOUT:\n{result.stdout}\n")
1603-
print(f"STDERR:\n{result.stderr}\n")
1604-
16051584

16061585
@ray.remote(num_cpus=0)
1607-
class EC2InstanceTerminatorWithGracePeriod(EC2InstanceTerminator):
1586+
class EC2InstanceTerminatorWithGracePeriod(NodeKillerBase):
16081587
def __init__(self, *args, grace_period_s: int = 30, **kwargs):
16091588
super().__init__(*args, **kwargs)
16101589

@@ -1613,7 +1592,7 @@ def __init__(self, *args, grace_period_s: int = 30, **kwargs):
16131592

16141593
def _kill_resource(self, node_id, node_to_kill_ip, _):
16151594
# Clean up any completed threads.
1616-
for thread in self._kill_threads:
1595+
for thread in self._kill_threads.copy():
16171596
if not thread.is_alive():
16181597
thread.join()
16191598
self._kill_threads.remove(thread)
@@ -1624,7 +1603,7 @@ def _kill_node_with_grace_period(node_id, node_to_kill_ip):
16241603
# Wait for the grace period to finish.
16251604
time.sleep(self._grace_period_s)
16261605
# Kill the node.
1627-
super()._kill_resource(node_id, node_to_kill_ip, _)
1606+
_terminate_ec2_instance(node_id)
16281607

16291608
thread = threading.Thread(
16301609
target=_kill_node_with_grace_period,
@@ -1635,19 +1614,26 @@ def _kill_node_with_grace_period(node_id, node_to_kill_ip):
16351614
self._kill_threads.add(thread)
16361615

16371616
def _drain_node(self, node_id: str) -> None:
1617+
# We need to lazily import this object. Otherwise, Ray can't serialize the
1618+
# class.
1619+
from ray.core.generated import autoscaler_pb2
1620+
16381621
assert ray.NodeID.from_hex(node_id) != ray.NodeID.nil()
16391622

16401623
logging.info(f"Draining node {node_id}")
16411624
address = services.canonicalize_bootstrap_address_or_die(addr=None)
16421625
gcs_client = ray._raylet.GcsClient(address=address)
16431626
deadline_timestamp_ms = (time.time_ns() // 1e6) + (self._grace_period_s * 1e3)
16441627
is_accepted, _ = gcs_client.drain_node(
1645-
node_id, DrainNodeReason.PREEMPTION, "", deadline_timestamp_ms
1628+
node_id,
1629+
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
1630+
"",
1631+
deadline_timestamp_ms,
16461632
)
16471633
assert is_accepted, "Drain node request was rejected"
16481634

16491635
def _cleanup(self):
1650-
for thread in self._kill_threads:
1636+
for thread in self._kill_threads.copy():
16511637
thread.join()
16521638
self._kill_threads.remove(thread)
16531639

@@ -2244,3 +2230,26 @@ def f():
22442230
assert all(
22452231
[extra_usage_tags[k] == v for k, v in expected_extra_usage_tags.items()]
22462232
), extra_usage_tags
2233+
2234+
2235+
def _terminate_ec2_instance(ip):
2236+
logging.info(f"Terminating instance, {ip=}")
2237+
# This command uses IMDSv2 to get the host instance id and region.
2238+
# After that it terminates itself using aws cli.
2239+
multi_line_command = (
2240+
'TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600");' # noqa: E501
2241+
'instanceId=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/instance-id/);' # noqa: E501
2242+
'region=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/placement/region);' # noqa: E501
2243+
"aws ec2 terminate-instances --region $region --instance-ids $instanceId" # noqa: E501
2244+
)
2245+
# This is a feature on Anyscale platform that enables
2246+
# easy ssh access to worker nodes.
2247+
ssh_command = f"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -p 2222 ray@{ip} '{multi_line_command}'" # noqa: E501
2248+
2249+
try:
2250+
subprocess.run(
2251+
ssh_command, shell=True, capture_output=True, text=True, check=True
2252+
)
2253+
except subprocess.CalledProcessError as e:
2254+
print("Exit code:", e.returncode)
2255+
print("Stderr:", e.stderr)

0 commit comments

Comments
 (0)