|
11 | 11 | //! src/backend/storage/file/fd.c
|
12 | 12 | //!
|
13 | 13 | use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
14 |
| -use crate::page_cache::PageWriteGuard; |
15 | 14 | use crate::tenant::TENANTS_SEGMENT_NAME;
|
16 | 15 | use camino::{Utf8Path, Utf8PathBuf};
|
17 | 16 | use once_cell::sync::OnceCell;
|
18 | 17 | use std::fs::{self, File, OpenOptions};
|
19 | 18 | use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
20 |
| -use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; |
21 | 19 | use std::os::unix::fs::FileExt;
|
22 | 20 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
23 | 21 | use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
@@ -372,7 +370,7 @@ impl VirtualFile {
|
372 | 370 | ///
|
373 | 371 | /// We are doing it via a macro as Rust doesn't support async closures that
|
374 | 372 | /// take on parameters with lifetimes.
|
375 |
| - async fn lock_file(&self) -> Result<FileGuard<'static>, Error> { |
| 373 | + async fn lock_file(&self) -> Result<FileGuard<'_>, Error> { |
376 | 374 | let open_files = get_open_files();
|
377 | 375 |
|
378 | 376 | let mut handle_guard = {
|
@@ -462,59 +460,24 @@ impl VirtualFile {
|
462 | 460 | }
|
463 | 461 |
|
464 | 462 | // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
465 |
| - pub async fn read_exact_at( |
466 |
| - &self, |
467 |
| - mut write_guard: PageWriteGuard<'static>, |
468 |
| - mut offset: u64, |
469 |
| - ) -> Result<PageWriteGuard<'static>, Error> { |
470 |
| - let file_guard: FileGuard<'static> = self.lock_file().await?; |
471 |
| - |
472 |
| - let system = tokio_epoll_uring::thread_local_system().await; |
473 |
| - struct PageWriteGuardBuf { |
474 |
| - buf: PageWriteGuard<'static>, |
475 |
| - init_up_to: usize, |
476 |
| - } |
477 |
| - unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { |
478 |
| - fn stable_ptr(&self) -> *const u8 { |
479 |
| - self.buf.as_ptr() |
480 |
| - } |
481 |
| - fn bytes_init(&self) -> usize { |
482 |
| - self.init_up_to |
483 |
| - } |
484 |
| - fn bytes_total(&self) -> usize { |
485 |
| - self.buf.len() |
486 |
| - } |
487 |
| - } |
488 |
| - unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { |
489 |
| - fn stable_mut_ptr(&mut self) -> *mut u8 { |
490 |
| - self.buf.as_mut_ptr() |
491 |
| - } |
492 |
| - |
493 |
| - unsafe fn set_init(&mut self, pos: usize) { |
494 |
| - assert!(pos <= self.buf.len()); |
495 |
| - self.init_up_to = pos; |
| 463 | + pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { |
| 464 | + while !buf.is_empty() { |
| 465 | + match self.read_at(buf, offset).await { |
| 466 | + Ok(0) => { |
| 467 | + return Err(Error::new( |
| 468 | + std::io::ErrorKind::UnexpectedEof, |
| 469 | + "failed to fill whole buffer", |
| 470 | + )) |
| 471 | + } |
| 472 | + Ok(n) => { |
| 473 | + buf = &mut buf[n..]; |
| 474 | + offset += n as u64; |
| 475 | + } |
| 476 | + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} |
| 477 | + Err(e) => return Err(e), |
496 | 478 | }
|
497 | 479 | }
|
498 |
| - let buf = PageWriteGuardBuf { |
499 |
| - buf: write_guard, |
500 |
| - init_up_to: 0, |
501 |
| - }; |
502 |
| - let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; |
503 |
| - let guard = scopeguard::guard(file_guard, |_| { |
504 |
| - panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)") |
505 |
| - }); |
506 |
| - let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; |
507 |
| - let _ = OwnedFd::into_raw_fd(owned_fd); |
508 |
| - let _ = scopeguard::ScopeGuard::into_inner(guard); |
509 |
| - let PageWriteGuardBuf { |
510 |
| - buf: write_guard, |
511 |
| - init_up_to, |
512 |
| - } = buf; |
513 |
| - if let Ok(num_read) = res { |
514 |
| - assert!(init_up_to == num_read); // TODO need to deal with short reads here |
515 |
| - } |
516 |
| - res.map(|_| write_guard) |
517 |
| - .map_err(|e| Error::new(ErrorKind::Other, e)) |
| 480 | + Ok(()) |
518 | 481 | }
|
519 | 482 |
|
520 | 483 | // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
@@ -564,6 +527,18 @@ impl VirtualFile {
|
564 | 527 | Ok(n)
|
565 | 528 | }
|
566 | 529 |
|
| 530 | + pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> { |
| 531 | + let result = with_file!(self, StorageIoOperation::Read, |file| file |
| 532 | + .as_ref() |
| 533 | + .read_at(buf, offset)); |
| 534 | + if let Ok(size) = result { |
| 535 | + STORAGE_IO_SIZE |
| 536 | + .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) |
| 537 | + .add(size as i64); |
| 538 | + } |
| 539 | + result |
| 540 | + } |
| 541 | + |
567 | 542 | async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
568 | 543 | let result = with_file!(self, StorageIoOperation::Write, |file| file
|
569 | 544 | .as_ref()
|
|
0 commit comments