
Commit 8165fa2

Fix Warnings
1 parent 574a77b commit 8165fa2

5 files changed: +31 additions, -72 deletions

tokio/src/runtime/scheduler/multi_thread/queue.rs

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ pub(crate) trait Owner<T: 'static>: Send + Sync {
 
     /// Returns a tuple with the lower bound and an Option for the upper bound of remaining
     /// slots for enqueuing in the queue.
-    fn remaining_slots_hint(&self) -> (u32, Option<u32>);
+    fn remaining_slots_hint(&self) -> (u16, Option<u16>);
 
     /// Returns true if there are entries in the queue.
     fn has_tasks(&self) -> bool;
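
A hedged sketch of how a caller might consume the narrowed (u16, Option<u16>) hint: only the lower bound is a guarantee, while the upper bound may be unknown. `conservative_batch` is a hypothetical helper for illustration, not part of this commit.

// Hypothetical caller of `remaining_slots_hint()`; not part of the Tokio sources.
fn conservative_batch(hint: (u16, Option<u16>), wanted: u16) -> u16 {
    // Only the lower bound is guaranteed free space; the upper bound may be None.
    wanted.min(hint.0)
}

fn main() {
    // With 3 guaranteed free slots and an unknown upper bound, a request to
    // move 8 tasks is trimmed down to a batch of 3.
    assert_eq!(conservative_batch((3, None), 8), 3);
}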

tokio/src/runtime/scheduler/multi_thread/queue/bwosq.rs

Lines changed: 14 additions & 19 deletions
@@ -57,22 +57,16 @@ impl<T> super::Owner<T> for Local<T> {
         metrics: &mut MetricsBatch,
     ) {
         if let Err(t) = self.inner.enqueue(task) {
-            inject.push(t);
-            // note: the current implementation is slow
-            // if self.inner.has_stealers() {
-            //     inject.push(t);
-            // } else {
-            //     // push overflow of old queue
-            //     if let Some(block_iter) = self.inner.dequeue_block() {
-            //         // could use `and_then` to chain block dequeues a couple of times if
-            //         // successfull, if we want to steal more than one block
-            //         inject.push_batch(block_iter.chain(std::iter::once(t)))
-            //     } else {
-            //         // Give up and use inject queue.
-            //         inject.push(t)
-            //     }
-            // }
-            // Add 1 to factor in the task currently being scheduled.
+            if self.inner.next_block_has_stealers() {
+                inject.push(t);
+            } else {
+                // push overflow of old queue
+                if let Some(block_iter) = self.inner.dequeue_block() {
+                    inject.push_batch(block_iter.chain(std::iter::once(t)))
+                } else {
+                    inject.push(t)
+                }
+            }
             metrics.incr_overflow_count();
         };
     }
@@ -94,12 +88,13 @@ impl<T> super::Owner<T> for Local<T> {
         let _num_enqueued = self.inner.enqueue_batch_unchecked(tasks);
     }
 
-    fn remaining_slots_hint(&self) -> (u32, Option<u32>) {
+    fn remaining_slots_hint(&self) -> (u16, Option<u16>) {
         let min_slots = self.inner.min_remaining_slots();
+        debug_assert!(min_slots <= u16::MAX.into());
         // Note: If we do change from a linked list of blocks to an array of blocks,
         // we may be able to quickly calculate an approximate upper bound based
         // on the consumer cache _index_.
-        (min_slots as u32, None)
+        (min_slots as u16, None)
     }
 
     fn pop(&mut self) -> Option<task::Notified<T>> {
@@ -128,7 +123,7 @@ impl<T> super::Stealer<T> for Steal<T> {
     ) -> Option<task::Notified<T>> {
         // In the rare case that the `dst` queue is at the same time also full, because the
         // producer is blocked waiting on a stealer we only attempt to steal a single task
-        if dst.remaining_slots_hint().0 < ELEMENTS_PER_BLOCK as u32 {
+        if dst.remaining_slots_hint().0 < ELEMENTS_PER_BLOCK as u16 {
             dst_metrics.incr_steal_count(1);
             dst_metrics.incr_steal_operations();
             // We could evaluate stealing exactly the amount of remaining slots + 1.
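
The overflow branch above now spills a whole block to the inject queue only when no stealer is active on the producer's current block. A minimal sketch of that decision, using stand-in types; `LocalModel`, `InjectModel`, and `Task` are illustrative, not the Tokio types.

#[derive(Debug)]
struct Task(u32);

struct InjectModel(Vec<Task>);
impl InjectModel {
    fn push(&mut self, t: Task) { self.0.push(t); }
    fn push_batch(&mut self, it: impl Iterator<Item = Task>) { self.0.extend(it); }
}

struct LocalModel {
    next_block_has_stealers: bool,
    block: Vec<Task>,
}

impl LocalModel {
    // Mirrors the overflow decision above: only spill a whole block when no
    // stealer is active on it, otherwise hand off just the overflowing task.
    fn overflow(&mut self, t: Task, inject: &mut InjectModel) {
        if self.next_block_has_stealers {
            inject.push(t);
        } else if !self.block.is_empty() {
            let batch = std::mem::take(&mut self.block);
            inject.push_batch(batch.into_iter().chain(std::iter::once(t)));
        } else {
            // No full block to move; fall back to the inject queue.
            inject.push(t);
        }
    }
}

fn main() {
    let mut inject = InjectModel(Vec::new());
    let mut local = LocalModel { next_block_has_stealers: false, block: vec![Task(1), Task(2)] };
    local.overflow(Task(3), &mut inject);
    assert_eq!(inject.0.len(), 3); // the block plus the overflowing task moved over
    println!("{:?}", inject.0);
}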

tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/metadata.rs

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ impl<const NUM_ELEMENTS_PER_BLOCK: usize> Index<NUM_ELEMENTS_PER_BLOCK> {
 
     /// True if the block is empty
     #[inline(always)]
+    #[allow(dead_code)]
     pub(crate) fn is_empty(&self) -> bool {
         self.0 == 0
     }
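
The attribute is presumably needed because `can_enqueue_block`, which called `committed.index().is_empty()`, is removed in mod.rs below, leaving the helper without a caller. A minimal sketch of the pattern, assuming a simplified stand-in `Index` rather than the real queue metadata type:

// Illustrative only: keeping a currently-unused helper without a dead_code warning.
struct Index(usize);

impl Index {
    #[inline(always)]
    #[allow(dead_code)] // no remaining caller, but the helper is worth keeping
    fn is_empty(&self) -> bool {
        self.0 == 0
    }
}

fn main() {
    let idx = Index(42);
    println!("raw index value: {}", idx.0);
    // Without the attribute, the build would warn that `is_empty` is never used.
}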

tokio/src/runtime/scheduler/multi_thread/queue/bwosq/bwosqueue/mod.rs

Lines changed: 10 additions & 47 deletions
@@ -62,8 +62,6 @@ pub(crate) struct Owner<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usi
     pcache: CachePadded<*const Block<E, { ENTRIES_PER_BLOCK }>>,
     /// Consumer cache (single consumer) - points to block in self.queue.
     ccache: CachePadded<*const Block<E, { ENTRIES_PER_BLOCK }>>,
-    /// Stealer position cache - Allows the owner to quickly check if there are any stealers
-    spos: CachePadded<Arc<AtomicUsize>>,
     /// `Arc` to the actual queue to ensure the queue lives at least as long as the Owner.
     #[allow(dead_code)]
     queue: Pin<Arc<BwsQueue<E, NUM_BLOCKS, ENTRIES_PER_BLOCK>>>,
@@ -373,37 +371,15 @@ impl<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>
         true
     }
 
-    // /// Advance consumer to the next block, unless the producer has not reached the block yet.
-    // fn try_advance_consumer_block(
-    //     &mut self,
-    //     next_block: &Block<E, ENTRIES_PER_BLOCK>,
-    //     curr_consumed: IndexAndVersion<ENTRIES_PER_BLOCK>,
-    // ) -> Result<(), ()> {
-    //     if self.can_advance_consumer_block(next_block, curr_consumed) {
-    //         *self.ccache = next_block;
-    //         Ok(())
-    //     } else {
-    //         Err(())
-    //     }
-    // }
-
-    /// Todo: Ideally we would not have this function.
-    pub(crate) fn has_stealers(&self) -> bool {
-        let curr_spos = self.spos.load(Relaxed);
-        // spos increments beyond NUM_BLOCKS to prevent ABA problems.
-        let start_block_idx = curr_spos % NUM_BLOCKS;
-        for i in 0..NUM_BLOCKS {
-            let block_idx = (start_block_idx + i) % NUM_BLOCKS;
-            let blk: &Block<E, ENTRIES_PER_BLOCK> = &self.queue.blocks[block_idx];
-            let stolen = blk.stolen.load(Relaxed);
-            let reserved = blk.reserved.load(Relaxed);
-            if reserved != stolen {
-                return true;
-            } else if !reserved.index().is_full() {
-                return false;
-            }
-        }
-        false
+    /// Check if there are any entries in the next block that are currently being stolen.
+    pub(crate) fn next_block_has_stealers(&self) -> bool {
+        // SAFETY: `pcache` always points to a valid `Block` in the queue. We never create a mutable reference
+        // to a Block, so it is safe to construct a shared reference here.
+        let blk = unsafe { &**self.pcache };
+        let reserved = blk.reserved.load(Relaxed);
+        let stolen = blk.stolen.load(Relaxed);
+        // If reserved and stolen don't match then there is still an active stealer in the block.
+        stolen != reserved
     }
 
     /// Check if there are entries that can be stolen from the queue.
@@ -450,18 +426,6 @@ impl<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>
         free_slots
     }
 
-    /// Returns `true` if enqueuing one block of entries would succeed.
-    pub(crate) fn can_enqueue_block(&self) -> bool {
-        // Note: the current implementation of this function is overly conservative but fast.
-        let current_block = unsafe { &*(**self.pcache).next() };
-        let committed = current_block.committed.load(Relaxed);
-        if committed.index().is_empty() {
-            true
-        } else {
-            self.is_next_block_writable(current_block, committed.version())
-        }
-    }
-
     /// `true` if there is at least one entry that can be dequeued.
     ///
     /// It is possible that a dequeue can still fail, since the item was stolen after we checked
@@ -740,7 +704,7 @@ impl<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>
 
     /// The estimated number of entries currently enqueued.
     #[cfg(feature = "stats")]
-    pub fn estimated_queue_entries(&self) -> usize {
+    pub(crate) fn estimated_queue_entries(&self) -> usize {
         self.queue.estimated_len()
     }
 }
@@ -851,7 +815,6 @@ pub(crate) fn new<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>()
         Owner {
            pcache: CachePadded::new(first_block),
            ccache: CachePadded::new(first_block),
-           spos: CachePadded::new(stealer_position.clone()),
            queue: q.clone(),
        },
        Stealer {
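
The replacement check above relies on each block's `reserved` and `stolen` counters: a stealer first reserves entries and only advances `stolen` once the transfer completes, so the two values differ exactly while a steal is in flight. A minimal sketch of that invariant, assuming plain counters; the real `Block` stores packed index/version values.

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Simplified stand-in for the per-block metadata behind `next_block_has_stealers()`.
struct BlockModel {
    reserved: AtomicUsize, // entries a stealer has claimed
    stolen: AtomicUsize,   // entries whose transfer has completed
}

impl BlockModel {
    fn has_active_stealer(&self) -> bool {
        // A steal is in flight exactly while the two counters disagree.
        self.reserved.load(Relaxed) != self.stolen.load(Relaxed)
    }
}

fn main() {
    let blk = BlockModel { reserved: AtomicUsize::new(0), stolen: AtomicUsize::new(0) };
    assert!(!blk.has_active_stealer());

    blk.reserved.fetch_add(4, Relaxed); // a stealer reserves four entries
    assert!(blk.has_active_stealer());

    blk.stolen.fetch_add(4, Relaxed); // the transfer completes
    assert!(!blk.has_active_stealer());
}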

tokio/src/runtime/scheduler/multi_thread/queue/tokioq.rs

Lines changed: 5 additions & 5 deletions
@@ -395,7 +395,7 @@ impl<T: 'static> super::Owner<T> for Local<T> {
 
     /// Returns a tuple with the lower bound and an Option for the upper bound of remaining
     /// slots for enqueuing in the queue.
-    fn remaining_slots_hint(&self) -> (u32, Option<u32>) {
+    fn remaining_slots_hint(&self) -> (u16, Option<u16>) {
         // Safety: We own the queue and thus are the only ones that could potentially mutate
         // `inner.tail`.
         let tail = unsafe { self.inner.tail.unsync_load() };
@@ -410,8 +410,8 @@ impl<T: 'static> super::Owner<T> for Local<T> {
         // `tail` is always larger then `steal`, since the counter is monotonically increasing,
         // at least until it wraps around at `UnsignedShort::MAX`. wrapping_sub always gives the
         // correct difference.
-        let capacity = LOCAL_QUEUE_CAPACITY as UnsignedShort - (tail.wrapping_sub(steal)) as u32;
-        (capacity, Some(capacity))
+        let capacity = LOCAL_QUEUE_CAPACITY as UnsignedShort - (tail.wrapping_sub(steal));
+        (capacity as u16, Some(capacity as u16))
     }
 
     /// Pops a task from the local queue.
@@ -546,7 +546,7 @@ impl<T: 'static> super::Stealer<T> for Steal<T> {
 
 impl<T> Steal<T> {
     /// Steal half of the queues item, but not more than `max`.
-    fn steal_half_max(&self, max: u32) -> Option<StealerIterator<'_, T>> {
+    fn steal_half_max(&self, max: u16) -> Option<StealerIterator<'_, T>> {
         let mut prev_packed = self.0.head.load(Acquire);
         let mut next_packed;
 
@@ -561,7 +561,7 @@ impl<T> Steal<T> {
         // Number of available tasks to steal
         let n = src_tail.wrapping_sub(src_head_real);
         let n = n - n / 2;
-        let n = cmp::min(n, max);
+        let n = cmp::min(n, max as UnsignedShort);
 
         if n == 0 {
             // No tasks available to steal
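
Two of the computations touched above are worth spelling out: the free-slot calculation relies on `wrapping_sub` over monotonically increasing `u16` counters, and `n - n / 2` rounds the stolen half up before clamping to `max`. A standalone sketch with example values; the constants and helper names here are illustrative, only the arithmetic mirrors the diff.

type UnsignedShort = u16;
const LOCAL_QUEUE_CAPACITY: usize = 256;

// Free slots in a fixed-capacity ring: `steal` and `tail` are monotonically
// increasing u16 counters, so `wrapping_sub` yields the in-flight length even
// after the counters wrap around `u16::MAX`.
fn remaining(steal: UnsignedShort, tail: UnsignedShort) -> UnsignedShort {
    LOCAL_QUEUE_CAPACITY as UnsignedShort - tail.wrapping_sub(steal)
}

// "Steal half, but not more than `max`": `n - n / 2` rounds the half up.
fn steal_count(available: UnsignedShort, max: u16) -> UnsignedShort {
    let n = available - available / 2;
    n.min(max as UnsignedShort)
}

fn main() {
    // 200 tasks in flight after the u16 counters have wrapped around:
    // wrapping_sub still reports 200, leaving 56 free slots out of 256.
    assert_eq!(remaining(65_435, 99), 56);

    // 7 available tasks: half rounded up is 4, clamped to a max of 3.
    assert_eq!(steal_count(7, 3), 3);
}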
