Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions crates/execution/assets/runners/python-runner.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,29 @@ function createPythonBridgeRpcBridge() {
maxBuffer,
}));
},
socketConnectSync(host, port) {
return JSON.stringify(requestSync('socketConnect', { hostname: host, port }));
},
socketSendSync(socketId, dataBase64) {
return JSON.stringify(requestSync('socketSend', { socketId, bodyBase64: dataBase64 }));
},
socketRecvSync(socketId, maxBuffer) {
return JSON.stringify(requestSync('socketRecv', { socketId, maxBuffer }));
},
socketCloseSync(socketId) {
return JSON.stringify(requestSync('socketClose', { socketId }));
},
udpCreateSync() {
return JSON.stringify(requestSync('udpCreate', {}));
},
udpSendtoSync(socketId, host, port, dataBase64) {
return JSON.stringify(
requestSync('udpSendto', { socketId, hostname: host, port, bodyBase64: dataBase64 }),
);
},
udpRecvfromSync(socketId, maxBuffer) {
return JSON.stringify(requestSync('udpRecvfrom', { socketId, maxBuffer }));
},
dispose() {},
};
}
Expand Down Expand Up @@ -1094,6 +1117,159 @@ def _agentos_gethostbyname(host):
_agentos_socket.getaddrinfo = _agentos_getaddrinfo
_agentos_socket.gethostbyname = _agentos_gethostbyname

# Raw socket bridge: back socket.socket() with the host (outbound TCP + UDP).
# Reads poll (the host uses a short read timeout) so the synchronous RPC never
# stalls the sidecar; the loop below re-polls to emulate blocking semantics.
import base64 as _agentos_base64
import time as _agentos_time

_agentos_original_socket_class = _agentos_socket.socket

class _SecureExecSocket:
def __init__(self, family=None, type=None, proto=0, fileno=None):
self.family = family if family is not None else _agentos_socket.AF_INET
self.type = type if type is not None else _agentos_socket.SOCK_STREAM
self.proto = proto
self._timeout = None # None blocks; 0 is non-blocking; >0 is a deadline
self._id = None
self._closed = False
self._is_udp = self.type == _agentos_socket.SOCK_DGRAM
if self._is_udp:
resp = _agentos_json.loads(_agentos_rpc.udpCreateSync())
self._id = int(resp["socketId"])

def connect(self, address):
host, port = address[0], address[1]
resp = _agentos_json.loads(_agentos_rpc.socketConnectSync(str(host), int(port)))
self._id = int(resp["socketId"])

def connect_ex(self, address):
try:
self.connect(address)
return 0
except OSError as exc:
return exc.errno or 1

def _ensure_id(self):
if self._id is None:
raise OSError(9, "Bad file descriptor")
return self._id

def send(self, data, flags=0):
sid = self._ensure_id()
b64 = _agentos_base64.b64encode(bytes(data)).decode("ascii")
resp = _agentos_json.loads(_agentos_rpc.socketSendSync(sid, b64))
return int(resp.get("bytesSent", len(data)))

def sendall(self, data, flags=0):
payload = bytes(data)
total = 0
while total < len(payload):
total += self.send(payload[total:], flags)
return None

def _poll(self, bufsize, recv_fn):
deadline = None
if self._timeout is not None and self._timeout > 0:
deadline = _agentos_time.monotonic() + self._timeout
while True:
resp = _agentos_json.loads(recv_fn(int(bufsize)))
if resp.get("closed"):
return b"", resp
data = resp.get("dataBase64") or ""
if data:
return _agentos_base64.b64decode(data), resp
if resp.get("timedOut"):
if self._timeout == 0:
raise BlockingIOError(11, "Resource temporarily unavailable")
if deadline is not None and _agentos_time.monotonic() >= deadline:
raise _agentos_socket.timeout("timed out")
continue
return b"", resp

def recv(self, bufsize, flags=0):
sid = self._ensure_id()
data, _ = self._poll(bufsize, lambda n: _agentos_rpc.socketRecvSync(sid, n))
return data

def sendto(self, data, *args):
address = args[-1]
host, port = address[0], address[1]
if self._id is None:
resp = _agentos_json.loads(_agentos_rpc.udpCreateSync())
self._id = int(resp["socketId"])
b64 = _agentos_base64.b64encode(bytes(data)).decode("ascii")
resp = _agentos_json.loads(
_agentos_rpc.udpSendtoSync(self._id, str(host), int(port), b64)
)
return int(resp.get("bytesSent", len(data)))

def recvfrom(self, bufsize, flags=0):
sid = self._ensure_id()
data, resp = self._poll(bufsize, lambda n: _agentos_rpc.udpRecvfromSync(sid, n))
addr = (resp.get("host", ""), int(resp.get("port", 0))) if resp else ("", 0)
return data, addr

def settimeout(self, value):
self._timeout = value

def gettimeout(self):
return self._timeout

def setblocking(self, flag):
self._timeout = None if flag else 0.0

def setsockopt(self, *args, **kwargs):
return None

def getsockopt(self, *args, **kwargs):
return 0

def fileno(self):
return self._id if self._id is not None else -1

def getpeername(self):
return ("", 0)

def getsockname(self):
return ("0.0.0.0", 0)

def close(self):
if self._closed:
return
self._closed = True
if self._id is not None:
try:
_agentos_rpc.socketCloseSync(self._id)
except Exception:
pass
self._id = None

def __enter__(self):
return self

def __exit__(self, *exc):
self.close()

def __del__(self):
try:
self.close()
except Exception:
pass

def _agentos_socket_factory(family=-1, type=-1, proto=0, fileno=None):
fam = family if family != -1 else _agentos_socket.AF_INET
typ = type if type != -1 else _agentos_socket.SOCK_STREAM
if (
fileno is None
and fam in (_agentos_socket.AF_INET, _agentos_socket.AF_INET6)
and typ in (_agentos_socket.SOCK_STREAM, _agentos_socket.SOCK_DGRAM)
):
return _SecureExecSocket(fam, typ, proto)
return _agentos_original_socket_class(family, type, proto, fileno)

_agentos_socket.socket = _agentos_socket_factory

class _SecureExecRequestsResponse:
def __init__(self, payload):
self.status_code = int(payload.get("status", 0))
Expand Down
67 changes: 67 additions & 0 deletions crates/execution/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ pub enum PythonVfsRpcMethod {
HttpRequest,
DnsLookup,
SubprocessRun,
SocketConnect,
SocketSend,
SocketRecv,
SocketClose,
UdpCreate,
UdpSendto,
UdpRecvfrom,
}

impl PythonVfsRpcMethod {
Expand All @@ -72,6 +79,13 @@ impl PythonVfsRpcMethod {
"httpRequest" => Some(Self::HttpRequest),
"dnsLookup" => Some(Self::DnsLookup),
"subprocessRun" => Some(Self::SubprocessRun),
"socketConnect" => Some(Self::SocketConnect),
"socketSend" => Some(Self::SocketSend),
"socketRecv" => Some(Self::SocketRecv),
"socketClose" => Some(Self::SocketClose),
"udpCreate" => Some(Self::UdpCreate),
"udpSendto" => Some(Self::UdpSendto),
"udpRecvfrom" => Some(Self::UdpRecvfrom),
_ => None,
}
}
Expand All @@ -92,6 +106,10 @@ pub struct PythonVfsRpcRequest {
pub body_base64: Option<String>,
pub hostname: Option<String>,
pub family: Option<u8>,
/// Port for socket connect/sendto.
pub port: Option<u16>,
/// Socket handle for send/recv/close/sendto/recvfrom.
pub socket_id: Option<u64>,
pub command: Option<String>,
pub args: Vec<String>,
pub cwd: Option<String>,
Expand Down Expand Up @@ -136,6 +154,23 @@ pub enum PythonVfsRpcResponsePayload {
stderr: String,
max_buffer_exceeded: bool,
},
SocketCreated {
socket_id: u64,
},
SocketSent {
bytes_sent: usize,
},
SocketReceived {
data_base64: String,
closed: bool,
timed_out: bool,
},
UdpReceived {
data_base64: String,
host: String,
port: u16,
timed_out: bool,
},
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -163,6 +198,10 @@ struct PythonVfsBridgeRequestWire {
#[serde(default)]
family: Option<u8>,
#[serde(default)]
port: Option<u16>,
#[serde(default, rename = "socketId")]
socket_id: Option<u64>,
#[serde(default)]
command: Option<String>,
#[serde(default)]
args: Vec<String>,
Expand Down Expand Up @@ -477,6 +516,32 @@ impl PythonExecution {
"stderr": stderr,
"maxBufferExceeded": max_buffer_exceeded,
}),
PythonVfsRpcResponsePayload::SocketCreated { socket_id } => json!({
"socketId": socket_id,
}),
PythonVfsRpcResponsePayload::SocketSent { bytes_sent } => json!({
"bytesSent": bytes_sent,
}),
PythonVfsRpcResponsePayload::SocketReceived {
data_base64,
closed,
timed_out,
} => json!({
"dataBase64": data_base64,
"closed": closed,
"timedOut": timed_out,
}),
PythonVfsRpcResponsePayload::UdpReceived {
data_base64,
host,
port,
timed_out,
} => json!({
"dataBase64": data_base64,
"host": host,
"port": port,
"timedOut": timed_out,
}),
};

self.inner
Expand Down Expand Up @@ -1195,6 +1260,8 @@ fn parse_python_bridge_sync_rpc_request(
body_base64: wire.body_base64,
hostname: wire.hostname,
family: wire.family,
port: wire.port,
socket_id: wire.socket_id,
command: wire.command,
args: wire.args,
cwd: wire.cwd,
Expand Down
Loading
Loading