Skip to content

Commit 1368146

Browse files
CMLDEV-575 PCAP Support (#186)
1 parent 4f2e7bb commit 1368146

File tree

2 files changed

+317
-0
lines changed

2 files changed

+317
-0
lines changed

tests/test_pcap.py

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#
2+
# This file is part of VIRL 2
3+
# Copyright (c) 2019-2025, Cisco Systems, Inc.
4+
# All rights reserved.
5+
#
6+
# Python bindings for the Cisco VIRL 2 Network Simulation Platform
7+
#
8+
# Licensed under the Apache License, Version 2.0 (the "License");
9+
# you may not use this file except in compliance with the License.
10+
# You may obtain a copy of the License at
11+
#
12+
# http://www.apache.org/licenses/LICENSE-2.0
13+
#
14+
# Unless required by applicable law or agreed to in writing, software
15+
# distributed under the License is distributed on an "AS IS" BASIS,
16+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
# See the License for the specific language governing permissions and
18+
# limitations under the License.
19+
#
20+
21+
from unittest.mock import Mock
22+
23+
import pytest
24+
import respx
25+
26+
from virl2_client.models.link import Link
27+
28+
29+
@pytest.fixture
30+
def mock_link():
31+
"""Create a mock link with mocked session for testing."""
32+
mock_session = Mock()
33+
mock_lab = Mock()
34+
mock_lab._url_for.return_value = "labs/test-lab"
35+
mock_interface_a = Mock()
36+
mock_interface_b = Mock()
37+
38+
link = Link(mock_lab, "test-link", mock_interface_a, mock_interface_b)
39+
link._session = mock_session
40+
return link
41+
42+
43+
def test_url_templates_exist():
44+
"""Test that all required URL templates are defined."""
45+
required_templates = [
46+
"capture_start",
47+
"capture_stop",
48+
"capture_status",
49+
"capture_key",
50+
]
51+
52+
for template in required_templates:
53+
assert template in Link._URL_TEMPLATES
54+
assert "{lab}/links/{id}/capture/" in Link._URL_TEMPLATES[template]
55+
56+
57+
@respx.mock
58+
def test_start_capture_with_params(mock_link):
59+
"""Test start_capture with explicit parameters."""
60+
expected_response = {
61+
"config": {
62+
"link_capture_key": "test-key-123",
63+
"maxpackets": 100,
64+
"encap": "ethernet",
65+
},
66+
"starttime": "2026-01-12T10:00:00Z",
67+
"packetscaptured": 0,
68+
}
69+
70+
mock_link._session.put.return_value.json.return_value = expected_response
71+
72+
result = mock_link.start_capture(maxpackets=100)
73+
74+
assert result == expected_response
75+
assert result["config"]["maxpackets"] == 100
76+
assert "link_capture_key" in result["config"]
77+
78+
79+
@respx.mock
80+
def test_start_capture_defaults(mock_link):
81+
"""Test start_capture without parameters uses server defaults."""
82+
expected_response = {
83+
"config": {
84+
"link_capture_key": "default-key-456",
85+
"maxpackets": 1000000,
86+
"maxtime": 86400,
87+
"encap": "ethernet",
88+
},
89+
"starttime": "2026-01-12T10:00:00Z",
90+
"packetscaptured": 0,
91+
}
92+
93+
mock_link._session.put.return_value.json.return_value = expected_response
94+
95+
result = mock_link.start_capture()
96+
97+
assert result == expected_response
98+
assert result["config"]["maxpackets"] == 1000000
99+
assert result["config"]["maxtime"] == 86400
100+
101+
102+
@respx.mock
103+
def test_capture_status(mock_link):
104+
"""Test capture_status with mocked HTTP call."""
105+
expected_status = {
106+
"config": {
107+
"link_capture_key": "status-key-456",
108+
"maxpackets": 200,
109+
"encap": "ethernet",
110+
},
111+
"starttime": "2026-01-12T09:30:00Z",
112+
"packetscaptured": 15,
113+
}
114+
115+
mock_link._session.get.return_value.json.return_value = expected_status
116+
117+
result = mock_link.capture_status()
118+
119+
assert result == expected_status
120+
assert result["packetscaptured"] == 15
121+
122+
123+
@respx.mock
124+
def test_capture_key(mock_link):
125+
"""Test capture_key with mocked HTTP call."""
126+
expected_key = "capture-key-789"
127+
mock_link._session.get.return_value.json.return_value = expected_key
128+
129+
result = mock_link.capture_key()
130+
131+
assert result == expected_key
132+
133+
134+
@respx.mock
135+
def test_stop_capture(mock_link):
136+
"""Test stop_capture with mocked HTTP call."""
137+
mock_link._session.put.return_value = Mock()
138+
139+
result = mock_link.stop_capture()
140+
141+
mock_link._session.put.assert_called_once()
142+
assert result is None
143+
144+
145+
@respx.mock
146+
def test_download_capture_auto_key(mock_link):
147+
"""Test download_capture with automatic key retrieval."""
148+
149+
def mock_get_side_effect(url):
150+
mock_response = Mock()
151+
if "capture/key" in url:
152+
mock_response.json.return_value = "auto-retrieved-key"
153+
else:
154+
mock_response.content = b"PCAP file content"
155+
return mock_response
156+
157+
mock_link._session.get.side_effect = mock_get_side_effect
158+
159+
result = mock_link.download_capture()
160+
161+
assert result == b"PCAP file content"
162+
assert mock_link._session.get.call_count == 2
163+
164+
165+
@respx.mock
166+
def test_get_capture_packets(mock_link):
167+
"""Test get_capture_packets with mocked HTTP call."""
168+
expected_packets = [
169+
{"packet": {"timestamp": "2026-01-12T10:00:01Z", "size": 64}},
170+
{"packet": {"timestamp": "2026-01-12T10:00:02Z", "size": 128}},
171+
]
172+
173+
def mock_get_side_effect(url):
174+
mock_response = Mock()
175+
if "capture/key" in url:
176+
mock_response.json.return_value = "packet-key-123"
177+
else:
178+
mock_response.json.return_value = expected_packets
179+
return mock_response
180+
181+
mock_link._session.get.side_effect = mock_get_side_effect
182+
183+
result = mock_link.get_capture_packets()
184+
185+
assert result == expected_packets
186+
assert len(result) == 2
187+
188+
189+
@respx.mock
190+
def test_download_capture_packet(mock_link):
191+
"""Test download_capture_packet with mocked HTTP call."""
192+
expected_packet_data = {
193+
"packet": {"timestamp": "2026-01-12T10:00:05Z", "size": 256}
194+
}
195+
196+
def mock_get_side_effect(url):
197+
mock_response = Mock()
198+
if "capture/key" in url:
199+
mock_response.json.return_value = "packet-download-key"
200+
else:
201+
mock_response.json.return_value = expected_packet_data
202+
return mock_response
203+
204+
mock_link._session.get.side_effect = mock_get_side_effect
205+
206+
result = mock_link.download_capture_packet(packet_id=5)
207+
208+
assert result == expected_packet_data

virl2_client/models/link.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ class Link:
4545
"start": "{lab}/links/{id}/state/start",
4646
"stop": "{lab}/links/{id}/state/stop",
4747
"condition": "{lab}/links/{id}/condition",
48+
"capture_start": "{lab}/links/{id}/capture/start",
49+
"capture_stop": "{lab}/links/{id}/capture/stop",
50+
"capture_status": "{lab}/links/{id}/capture/status",
51+
"capture_key": "{lab}/links/{id}/capture/key",
4852
}
4953

5054
def __init__(
@@ -410,3 +414,108 @@ def set_condition_by_name(self, name: str) -> None:
410414

411415
latency, bandwidth, loss = options[name]
412416
self.set_condition(bandwidth=bandwidth, latency=latency, loss=loss)
417+
418+
@check_stale
419+
def start_capture(
420+
self,
421+
maxpackets: int | None = None,
422+
maxtime: int | None = None,
423+
bpfilter: str | None = None,
424+
encap: str = "ethernet",
425+
) -> dict:
426+
"""
427+
Start a packet capture on this link.
428+
429+
:param maxpackets: Maximum number of packets to capture (1-1000000). If None, server sets default.
430+
:param maxtime: Maximum time in seconds to capture (1-86400). If None, server sets default.
431+
:param bpfilter: Berkeley packet filter string (1-128 chars).
432+
:param encap: Link encapsulation type.
433+
:returns: Dictionary containing the capture status and configuration.
434+
"""
435+
url = self._url_for("capture_start")
436+
data: dict[str, str | int] = {"encap": encap}
437+
438+
if maxpackets is not None:
439+
data["maxpackets"] = maxpackets
440+
if maxtime is not None:
441+
data["maxtime"] = maxtime
442+
if bpfilter is not None:
443+
data["bpfilter"] = bpfilter
444+
445+
_LOGGER.info(f"Starting packet capture on link {self.id}")
446+
return self._session.put(url, json=data).json()
447+
448+
@check_stale
449+
def stop_capture(self) -> None:
450+
"""
451+
Stop the packet capture on this link.
452+
"""
453+
url = self._url_for("capture_stop")
454+
_LOGGER.info(f"Stopping packet capture on link {self.id}")
455+
self._session.put(url)
456+
457+
@check_stale
458+
def capture_status(self) -> dict:
459+
"""
460+
Get the current packet capture status for this link.
461+
462+
:returns: Dictionary containing capture configuration, start time, and packet count.
463+
"""
464+
url = self._url_for("capture_status")
465+
return self._session.get(url).json()
466+
467+
@check_stale
468+
def capture_key(self) -> str:
469+
"""
470+
Get the capture key (UUID) for the packet capture on this link.
471+
472+
:returns: The capture key as a string.
473+
:raises: HTTP exception if no capture is running on this link.
474+
"""
475+
url = self._url_for("capture_key")
476+
return self._session.get(url).json()
477+
478+
def download_capture(self, capture_key: str | None = None) -> bytes:
479+
"""
480+
Download the PCAP file for this link's capture.
481+
482+
:param capture_key: The capture key. If None, will fetch it automatically.
483+
:returns: The PCAP file content as bytes.
484+
"""
485+
if capture_key is None:
486+
capture_key = self.capture_key()
487+
488+
url = f"{self._lab._session.base_url}/api/v0/pcap/{capture_key}"
489+
_LOGGER.info(f"Downloading PCAP for capture key {capture_key}")
490+
return self._session.get(url).content
491+
492+
def get_capture_packets(self, capture_key: str | None = None) -> list[dict]:
493+
"""
494+
Get a list of all captured packets in decoded format.
495+
496+
:param capture_key: The capture key. If None, will fetch it automatically.
497+
:returns: List of packet dictionaries with decoded packet information.
498+
"""
499+
if capture_key is None:
500+
capture_key = self.capture_key()
501+
502+
url = f"{self._lab._session.base_url}/api/v0/pcap/{capture_key}/packets"
503+
_LOGGER.info(f"Getting packet list for capture key {capture_key}")
504+
return self._session.get(url).json()
505+
506+
def download_capture_packet(
507+
self, packet_id: int, capture_key: str | None = None
508+
) -> dict:
509+
"""
510+
Download a specific packet from the capture in decoded format.
511+
512+
:param packet_id: The ID of the packet to download (1-based).
513+
:param capture_key: The capture key. If None, will fetch it automatically.
514+
:returns: Dictionary containing the decoded packet information.
515+
"""
516+
if capture_key is None:
517+
capture_key = self.capture_key()
518+
519+
url = f"{self._lab._session.base_url}/api/v0/pcap/{capture_key}/packet/{packet_id}"
520+
_LOGGER.info(f"Downloading packet {packet_id} for capture key {capture_key}")
521+
return self._session.get(url).json()

0 commit comments

Comments
 (0)