Skip to content

Commit b5616ce

Browse files
authored
feat: introduce tokio compact StreamExt and remove AsyncOpShm (#4)
Given the strong coupling between `AsyncReadShm` & `AsyncWriteShm` and `Stream`, and the need to explicitly call related `Stream` interfaces (such as `close` and `reuse`) after using the interfaces of AsyncOpShm, abstracting those traits is unnecessary. However, for compatibility with `AsyncRead` and `AsyncWrite` of TokIO, we added `StreamExt` to provide implementations for `AsyncRead` and `AsyncWrite`. But since explicit calls to `Stream`'s interfaces are still required in this case, we added detailed documentation to `StreamExt` to remind users of this. So we removed `AsyncReadShm` and `AsyncWriteShm` and Introduced `StreamExt` which implements `AsyncRead` and `AsyncWrite` of TokIO. Signed-off-by: Yu Li <[email protected]>
1 parent 012e11a commit b5616ce

File tree

24 files changed

+692
-173
lines changed

24 files changed

+692
-173
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "shmipc"
3-
version = "0.0.1"
3+
version = "0.1.0"
44
edition = "2024"
55
authors = ["Volo Team <[email protected]>"]
66
license = "Apache-2.0"
@@ -35,20 +35,13 @@ rand = "0.9"
3535
tokio-scoped = "0.2"
3636
tracing-subscriber = "0.3"
3737

38-
39-
[profile.dev]
40-
overflow-checks = false
41-
4238
[profile.release]
4339
opt-level = 3
4440
debug = true
4541
rpath = false
4642
lto = true
47-
debug-assertions = false
4843
codegen-units = 1
4944
panic = 'unwind'
50-
incremental = false
51-
overflow-checks = false
5245

5346
[workspace]
5447
members = ["examples"]

benches/bench.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ use std::os::unix::net::SocketAddr;
1616

1717
use criterion::{Criterion, criterion_group, criterion_main};
1818
use shmipc::{
19-
AsyncReadShm, AsyncWriteShm, BufferReader, BufferSlice, Error, LinkedBuffer, Listener,
20-
SessionManager, SessionManagerConfig, Stream,
19+
Error, Listener,
20+
buffer::{BufferReader, BufferSlice, LinkedBuffer},
2121
config::SizePercentPair,
2222
consts::MemMapType,
23+
session::{SessionManager, SessionManagerConfig},
24+
stream::Stream,
2325
transport::{DefaultUnixConnect, DefaultUnixListen},
2426
};
2527
use tokio::{
@@ -93,7 +95,7 @@ fn criterion_benchmark(c: &mut Criterion) {
9395
loop {
9496
tokio::select! {
9597
r = server.accept() => {
96-
let mut stream = r.unwrap().unwrap();
98+
let mut stream = r.unwrap();
9799
tokio::spawn(async move {
98100
loop {
99101
if !must_read(&mut stream, size).await {

examples/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ tracing-subscriber = "0.3"
1616
name = "greeter_client"
1717
path = "src/hello_world/greeter_client.rs"
1818

19+
[[bin]]
20+
name = "greeter_client_tokio"
21+
path = "src/hello_world/greeter_client_tokio.rs"
22+
1923
[[bin]]
2024
name = "greeter_server"
2125
path = "src/hello_world/greeter_server.rs"
26+
27+
[[bin]]
28+
name = "greeter_server_tokio"
29+
path = "src/hello_world/greeter_server_tokio.rs"

examples/src/hello_world/greeter_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::os::unix::net::SocketAddr;
1616

1717
use shmipc::{
18-
AsyncReadShm, AsyncWriteShm, SessionManager, SessionManagerConfig,
18+
session::{SessionManager, SessionManagerConfig},
1919
transport::DefaultUnixConnect,
2020
};
2121

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2025 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::os::unix::net::SocketAddr;
16+
17+
use shmipc::{
18+
compact::StreamExt,
19+
session::{SessionManager, SessionManagerConfig},
20+
transport::DefaultUnixConnect,
21+
};
22+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
23+
24+
#[tokio::main]
25+
async fn main() {
26+
tracing_subscriber::fmt::init();
27+
let dir = std::env::current_dir().unwrap();
28+
let uds_path = SocketAddr::from_pathname(dir.join("../ipc_test.sock")).unwrap();
29+
30+
let mut conf = SessionManagerConfig::new();
31+
conf.config_mut().mem_map_type = shmipc::consts::MemMapType::MemMapTypeMemFd;
32+
conf.config_mut().share_memory_path_prefix = "/dev/shm/client.ipc.shm".to_string();
33+
#[cfg(target_os = "macos")]
34+
{
35+
conf.config.share_memory_path_prefix = "/tmp/client.ipc.shm".to_string();
36+
conf.config.queue_path = "/tmp/client.ipc.shm_queue".to_string();
37+
}
38+
39+
let sm = SessionManager::new(conf, DefaultUnixConnect, uds_path)
40+
.await
41+
.unwrap();
42+
let stream = sm.get_stream().unwrap();
43+
let request_msg = "client say hello world!!!";
44+
45+
let mut conn = StreamExt::new(stream);
46+
47+
println!(
48+
"size: {}",
49+
conn.write(request_msg.as_bytes()).await.unwrap(),
50+
);
51+
conn.flush().await.unwrap();
52+
53+
const EXPECTED_RESP: &str = "server hello world!!!";
54+
let mut buf = vec![0; 4096];
55+
56+
conn.read_exact(&mut buf[..EXPECTED_RESP.len()])
57+
.await
58+
.unwrap();
59+
println!(
60+
"client stream receive response {}",
61+
str::from_utf8(&buf[..EXPECTED_RESP.len()]).unwrap()
62+
);
63+
conn.inner().reuse().await;
64+
sm.close().await;
65+
}

examples/src/hello_world/greeter_server.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::os::unix::net::SocketAddr;
1616

1717
use nix::unistd::unlink;
18-
use shmipc::{AsyncReadShm, AsyncWriteShm, Listener, config::Config, transport::DefaultUnixListen};
18+
use shmipc::{Listener, config::Config, stream::Stream, transport::DefaultUnixListen};
1919

2020
#[tokio::main]
2121
async fn main() {
@@ -32,19 +32,37 @@ async fn main() {
3232
)
3333
.await
3434
.unwrap();
35-
let mut stream = ln.accept().await.unwrap().unwrap();
36-
let req_msg = stream
37-
.read_bytes("client say hello world!!!".len())
38-
.await
39-
.unwrap();
40-
println!(
41-
"server receive request {}",
42-
String::from_utf8(req_msg.to_vec()).unwrap()
43-
);
4435

45-
let resp_msg = "server hello world!!!";
46-
stream.write_bytes(resp_msg.as_bytes()).unwrap();
47-
stream.flush(true).await.unwrap();
36+
loop {
37+
let stream = match ln.accept().await {
38+
Ok(stream) => stream,
39+
Err(e) => {
40+
eprintln!("failed to accept conn, err: {e}");
41+
continue;
42+
}
43+
};
44+
tokio::spawn(handle_stream(stream));
45+
}
46+
}
47+
48+
async fn handle_stream(mut stream: Stream) {
49+
loop {
50+
let req_msg = match stream.read_bytes("client say hello world!!!".len()).await {
51+
Ok(msg) => msg,
52+
Err(e) => {
53+
eprintln!("failed to read msg, err: {e}");
54+
break;
55+
}
56+
};
57+
println!(
58+
"server receive request {}",
59+
String::from_utf8(req_msg.to_vec()).unwrap()
60+
);
61+
62+
let resp_msg = "server hello world!!!";
63+
stream.write_bytes(resp_msg.as_bytes()).unwrap();
64+
stream.flush(true).await.unwrap();
65+
}
4866
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
4967
stream.close().await.unwrap();
5068
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2025 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::os::unix::net::SocketAddr;
16+
17+
use nix::unistd::unlink;
18+
use shmipc::{
19+
Listener, compact::StreamExt, config::Config, stream::Stream, transport::DefaultUnixListen,
20+
};
21+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
22+
23+
#[tokio::main]
24+
async fn main() {
25+
tracing_subscriber::fmt::init();
26+
let dir = std::env::current_dir().unwrap();
27+
let binding = dir.join("../ipc_test.sock");
28+
let uds_path = binding.to_str().unwrap();
29+
_ = unlink(uds_path);
30+
31+
let mut ln = Listener::new(
32+
DefaultUnixListen,
33+
SocketAddr::from_pathname(binding).unwrap(),
34+
Config::default(),
35+
)
36+
.await
37+
.unwrap();
38+
39+
loop {
40+
let stream = match ln.accept().await {
41+
Ok(stream) => stream,
42+
Err(e) => {
43+
eprintln!("failed to accept conn, err: {e}");
44+
continue;
45+
}
46+
};
47+
tokio::spawn(handle_stream(stream));
48+
}
49+
}
50+
51+
async fn handle_stream(stream: Stream) {
52+
const EXPECTED_REQ: &str = "client say hello world!!!";
53+
let mut buf = vec![0; 4096];
54+
let mut conn = StreamExt::new(stream);
55+
56+
loop {
57+
match conn.read_exact(&mut buf[..EXPECTED_REQ.len()]).await {
58+
Ok(len) => println!("read {len}"),
59+
Err(e) => {
60+
eprintln!("failed to read msg, err: {e}");
61+
break;
62+
}
63+
}
64+
65+
println!(
66+
"server receive request {}",
67+
str::from_utf8(&buf[..EXPECTED_REQ.len()]).unwrap()
68+
);
69+
70+
let resp_msg = "server hello world!!!";
71+
conn.write_all(resp_msg.as_bytes()).await.unwrap();
72+
conn.flush().await.unwrap();
73+
}
74+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
75+
conn.shutdown().await.unwrap();
76+
}

src/buffer/buf.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,20 @@ pub enum Buf<'shm> {
2323
}
2424

2525
impl Buf<'_> {
26+
pub const fn len(&self) -> usize {
27+
match self {
28+
Self::Shm(s) => s.len(),
29+
Self::Exm(b) => b.len(),
30+
}
31+
}
32+
33+
pub const fn is_empty(&self) -> bool {
34+
match self {
35+
Self::Shm(s) => s.is_empty(),
36+
Self::Exm(b) => b.is_empty(),
37+
}
38+
}
39+
2640
/// # Safety
2741
///
2842
/// The caller must ensure that the shm buf is valid before the return value is out of usage.
@@ -217,3 +231,21 @@ impl PartialOrd<Buf<'_>> for &str {
217231
<[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
218232
}
219233
}
234+
235+
impl bytes::Buf for Buf<'_> {
236+
fn remaining(&self) -> usize {
237+
self.len()
238+
}
239+
240+
fn chunk(&self) -> &[u8] {
241+
self.as_ref()
242+
}
243+
244+
fn advance(&mut self, cnt: usize) {
245+
// no need to care cnt and len, `bytes::Buf` for `&[u8]` and `Bytes` will process it
246+
match self {
247+
Self::Shm(s) => bytes::Buf::advance(s, cnt),
248+
Self::Exm(b) => bytes::Buf::advance(b, cnt),
249+
}
250+
}
251+
}

src/buffer/list.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -438,15 +438,14 @@ mod tests {
438438
}
439439

440440
#[test]
441+
#[should_panic]
441442
fn test_create_free_buffer_list() {
442-
assert!(
443-
BufferList::create(
444-
4294967295,
445-
4294967295,
446-
&MmapOptions::new().len(1).map_anon().unwrap(),
447-
4294967279
448-
)
449-
.is_err()
443+
BufferList::create(
444+
4294967295,
445+
4294967295,
446+
&MmapOptions::new().len(1).map_anon().unwrap(),
447+
4294967279,
450448
)
449+
.unwrap_err();
451450
}
452451
}

0 commit comments

Comments
 (0)