Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 86 additions & 39 deletions src/pipelines/exec/navi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use {
pub(crate) struct StepPath(SmallVec<[usize; 8]>);

const PROLOGUE_INDEX: usize = usize::MIN;
const EPILOGUE_INDEX: usize = usize::MAX;
const EPILOGUE_START_INDEX: usize = usize::MAX - 1024; // Reserve space for epilogue steps
const STEP0_INDEX: usize = PROLOGUE_INDEX + 1;

/// Public API
Expand Down Expand Up @@ -86,7 +86,7 @@ impl StepPath {

/// Returns `true` if the path is pointing to an epilogue of a pipeline.
pub(crate) fn is_epilogue(&self) -> bool {
self.leaf() == EPILOGUE_INDEX
self.leaf() >= EPILOGUE_START_INDEX
}
}

Expand Down Expand Up @@ -231,9 +231,15 @@ impl StepPath {
Self(smallvec![PROLOGUE_INDEX])
}

/// Returns a leaf step path pointing at the epilogue step.
/// Returns a leaf step path pointing at the first epilogue step.
pub(in crate::pipelines) fn epilogue() -> Self {
Self(smallvec![EPILOGUE_INDEX])
Self::epilogue_step(0)
}

/// Returns a leaf step path pointing at a specific epilogue step with the
/// given index.
pub(in crate::pipelines) fn epilogue_step(epilogue_index: usize) -> Self {
Self(smallvec![epilogue_index + EPILOGUE_START_INDEX])
}

/// Returns a new step path that points to the first non-prologue and
Expand Down Expand Up @@ -275,14 +281,18 @@ impl core::fmt::Display for StepPath {
if let Some(&first) = iter.next() {
match first {
PROLOGUE_INDEX => write!(f, "p"),
EPILOGUE_INDEX => write!(f, "e"),
idx if idx >= EPILOGUE_START_INDEX => {
write!(f, "e{}", idx - EPILOGUE_START_INDEX)
}
index => write!(f, "{index}"),
}?;

for &index in iter {
match index {
PROLOGUE_INDEX => write!(f, "_p"),
EPILOGUE_INDEX => write!(f, "_e"),
idx if idx >= EPILOGUE_START_INDEX => {
write!(f, "_e{}", idx - EPILOGUE_START_INDEX)
}
index => write!(f, "_{index}"),
}?;
}
Expand Down Expand Up @@ -318,7 +328,7 @@ impl<'a, P: Platform> StepNavigator<'a, P> {
/// In pipelines with a prologue, this will point to the prologue step.
/// In pipelines without a prologue, this will point to the first step.
/// In pipelines with no steps, but with an epilogue, this will point to the
/// epilogue step.
/// first epilogue step.
///
/// If the first item in the pipeline is a nested pipeline, this will dig
/// deeper into the nested pipeline to find the first executable item.
Expand All @@ -336,9 +346,10 @@ impl<'a, P: Platform> StepNavigator<'a, P> {

// pipeline has no prologue
if pipeline.steps().is_empty() {
// If there are no steps, but there is an epilogue, return it.
if pipeline.epilogue().is_some() {
return Some(Self(StepPath::epilogue(), vec![pipeline]));
// If there are no steps, but there is an epilogue, return the first
// epilogue step.
if !pipeline.epilogue().is_empty() {
return Self(StepPath::epilogue(), vec![pipeline]).enter();
}

// this is an empty pipeline, there is nothing executable.
Expand All @@ -360,9 +371,17 @@ impl<'a, P: Platform> StepNavigator<'a, P> {
.prologue()
.expect("Step path points to a non-existing prologue")
} else if self.is_epilogue() {
enclosing_pipeline
let epilogue_index = step_index - EPILOGUE_START_INDEX;
let StepOrPipeline::Step(step) = enclosing_pipeline
.epilogue()
.get(epilogue_index)
.expect("Step path points to a non-existing epilogue")
else {
unreachable!(
"StepNavigator should not point to a pipeline, only to steps"
)
};
step
} else {
let StepOrPipeline::Step(step) = enclosing_pipeline
.steps()
Expand Down Expand Up @@ -400,8 +419,16 @@ impl<'a, P: Platform> StepNavigator<'a, P> {
/// Returns `None` if there are no more steps to execute in the pipeline.
pub(crate) fn next_ok(self) -> Option<Self> {
if self.is_epilogue() {
// the loop is over.
return self.next_in_parent();
// we are in an epilogue step, check if there are more epilogue steps
let enclosing_pipeline = self.pipeline();
let epilogue_index = self.0.leaf() - EPILOGUE_START_INDEX;

if epilogue_index + 1 < enclosing_pipeline.epilogue().len() {
// there are more epilogue steps, go to the next one
return Self(self.0.increment_leaf(), self.1.clone()).enter();
}
// this is the last epilogue step, we are done with this pipeline
return self.next_in_parent();
}

if self.is_prologue() {
Expand Down Expand Up @@ -519,45 +546,65 @@ impl<P: Platform> StepNavigator<'_, P> {
"StepNavigator should always have at least one enclosing pipeline",
);

if path.is_prologue() || path.is_epilogue() {
if path.is_prologue() {
assert!(
enclosing_pipeline.prologue().is_some()
|| enclosing_pipeline.epilogue().is_some(),
"path is prologue or epilogue, but enclosing pipeline has none",
enclosing_pipeline.prologue().is_some(),
"path is prologue, but enclosing pipeline has none",
);
// if we are in a prologue or epilogue, we can just return ourselves.
// if we are in a prologue, we can just return ourselves.
return Some(Self(path, ancestors));
}

let step_index = path
.leaf()
.checked_sub(STEP0_INDEX)
.expect("path is not prologue");

match enclosing_pipeline.steps().get(step_index)? {
StepOrPipeline::Step(_) => {
// if we are pointing at a step, we can just return ourselves.
Some(Self(path, ancestors))
if path.is_epilogue() {
let epilogue_index = path.leaf() - EPILOGUE_START_INDEX;
match enclosing_pipeline.epilogue().get(epilogue_index)? {
StepOrPipeline::Step(_) => {
// if we are pointing at an epilogue step, we can just return
// ourselves.
Some(Self(path, ancestors))
}
StepOrPipeline::Pipeline(_, nested) => {
// if we are pointing at a pipeline, we need to dig into its
// entrypoint.
Some(StepNavigator(path, ancestors).join(Self::entrypoint(nested)?))
}
}
StepOrPipeline::Pipeline(_, nested) => {
// if we are pointing at a pipeline, we need to dig into its entrypoint.
Some(StepNavigator(path, ancestors).join(Self::entrypoint(nested)?))
} else {
let step_index = path
.leaf()
.checked_sub(STEP0_INDEX)
.expect("path is not prologue or epilogue");

match enclosing_pipeline.steps().get(step_index)? {
StepOrPipeline::Step(_) => {
// if we are pointing at a step, we can just return ourselves.
Some(Self(path, ancestors))
}
StepOrPipeline::Pipeline(_, nested) => {
// if we are pointing at a pipeline, we need to dig into its
// entrypoint.
Some(StepNavigator(path, ancestors).join(Self::entrypoint(nested)?))
}
}
}
}

/// Finds the next step to run when a loop is finished.
///
/// The next step could be either the epilogue of the current pipeline,
/// or the next step in the parent pipeline.
/// The next step could be either the first epilogue step of the current
/// pipeline, or the next step in the parent pipeline.
fn after_loop(self) -> Option<Self> {
if self.pipeline().epilogue().is_some() {
// we've reached the epilogue of this pipeline, regardless of the
// looping behavior, we should go to the next step in the parent pipeline.
Some(Self(self.0.replace_leaf(EPILOGUE_INDEX), self.1.clone()))
} else {
self.next_in_parent()
}
if self.pipeline().epilogue().is_empty() {
self.next_in_parent()
} else {
// we've reached the epilogue of this pipeline, go to the first epilogue
// step
Some(Self(
self.0.replace_leaf(EPILOGUE_START_INDEX),
self.1.clone(),
))
.and_then(|nav| nav.enter())
}
}

/// Finds the next step to run after the prologue of the current pipeline.
Expand Down
37 changes: 28 additions & 9 deletions src/pipelines/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct Frame<'a, P: Platform> {
path: StepPath,
next_ix: usize,
yielded_prologue: bool,
yielded_epilogue: bool,
epilogue_ix: usize,
}

impl<'a, P: Platform> StepPathIter<'a, P> {
Expand All @@ -22,7 +22,7 @@ impl<'a, P: Platform> StepPathIter<'a, P> {
next_ix: 0,
path: StepPath::empty(),
yielded_prologue: false,
yielded_epilogue: false,
epilogue_ix: 0,
}],
}
}
Expand Down Expand Up @@ -56,17 +56,36 @@ impl<P: Platform> Iterator for StepPathIter<'_, P> {
path: next_path,
next_ix: 0,
yielded_prologue: false,
yielded_epilogue: false,
epilogue_ix: 0,
});
continue;
}
}
}

// Yield epilogue once, if present.
if !frame.yielded_epilogue && frame.pipeline.epilogue.is_some() {
frame.yielded_epilogue = true;
return Some(frame.path.clone().concat(StepPath::epilogue()));
// Walk epilogue steps/pipelines; descend into nested pipelines.
if frame.epilogue_ix < frame.pipeline.epilogue.len() {
let ix = frame.epilogue_ix;
frame.epilogue_ix += 1;
match &frame.pipeline.epilogue[ix] {
StepOrPipeline::Step(_) => {
return Some(
frame.path.clone().concat(StepPath::epilogue_step(ix)),
);
}
StepOrPipeline::Pipeline(_, nested) => {
let next_path =
frame.path.clone().concat(StepPath::epilogue_step(ix));
self.stack.push(Frame {
pipeline: nested,
path: next_path,
next_ix: 0,
yielded_prologue: false,
epilogue_ix: 0,
});
continue;
}
}
}

// Done with this frame; pop and continue with parent.
Expand Down Expand Up @@ -102,7 +121,7 @@ mod tests {
.with_step(Step3)
.with_epilogue(EpilogueOne);

let expected = vec!["p", "1", "2", "3", "e"];
let expected = vec!["p", "1", "2", "3", "e0"];
let actual = pipeline
.iter_steps()
.map(|step| step.to_string())
Expand All @@ -125,7 +144,7 @@ mod tests {
.with_epilogue(EpilogueOne);

let expected = vec![
"p", "1_p", "1_1", "1_2_1", "1_2_2", "1_2_3", "1_2_e", "1_3", "e",
"p", "1_p", "1_1", "1_2_1", "1_2_2", "1_2_3", "1_2_e0", "1_3", "e0",
];

let actual = pipeline
Expand Down
35 changes: 27 additions & 8 deletions src/pipelines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum Behavior {
}

pub struct Pipeline<P: Platform> {
epilogue: Option<Arc<StepInstance<P>>>,
epilogue: Vec<StepOrPipeline<P>>,
prologue: Option<Arc<StepInstance<P>>>,
steps: Vec<StepOrPipeline<P>>,
limits: Option<Arc<dyn ScopedLimits<P>>>,
Expand All @@ -53,7 +53,7 @@ impl<P: Platform> Default for Pipeline<P> {
#[track_caller]
fn default() -> Self {
Self {
epilogue: None,
epilogue: Vec::new(),
prologue: None,
steps: Vec::new(),
limits: None,
Expand Down Expand Up @@ -82,11 +82,30 @@ impl<P: Platform> Pipeline<P> {
}

/// A step that happens as the last step of the block after the whole payload
/// has been built.
/// has been built. Can be called multiple times to add multiple epilogue
/// steps.
#[must_use]
pub fn with_epilogue(self, step: impl Step<P>) -> Self {
let mut this = self;
this.epilogue = Some(Arc::new(StepInstance::new(step)));
this
.epilogue
.push(StepOrPipeline::Step(Arc::new(StepInstance::new(step))));
this
}

/// Adds a nested pipeline to the epilogue.
#[must_use]
#[track_caller]
pub fn with_epilogue_pipeline<T>(
self,
behavior: Behavior,
nested: impl IntoPipeline<P, T>,
) -> Self {
let mut this = self;
let nested_pipeline = nested.into_pipeline();
this
.epilogue
.push(StepOrPipeline::Pipeline(behavior, nested_pipeline));
this
}

Expand Down Expand Up @@ -151,7 +170,7 @@ impl<P: Platform> Pipeline<P> {
impl<P: Platform> Pipeline<P> {
/// Returns true if the pipeline has no steps, prologue or epilogue.
pub fn is_empty(&self) -> bool {
self.prologue.is_none() && self.epilogue.is_none() && self.steps.is_empty()
self.prologue.is_none() && self.epilogue.is_empty() && self.steps.is_empty()
}

/// An optional name of the pipeline.
Expand All @@ -176,8 +195,8 @@ impl<P: Platform> Pipeline<P> {
self.prologue.as_ref()
}

pub(crate) fn epilogue(&self) -> Option<&Arc<StepInstance<P>>> {
self.epilogue.as_ref()
pub(crate) fn epilogue(&self) -> &[StepOrPipeline<P>] {
&self.epilogue
}

pub(crate) fn steps(&self) -> &[StepOrPipeline<P>] {
Expand Down Expand Up @@ -315,7 +334,7 @@ impl<P: Platform> core::fmt::Debug for Pipeline<P> {
f.debug_struct("Pipeline")
.field("name", &self.name())
.field("prologue", &self.prologue.as_ref().map(|p| p.name()))
.field("epilogue", &self.epilogue.as_ref().map(|e| e.name()))
.field("epilogue", &self.epilogue)
.field("steps", &self.steps)
.field("limits", &self.limits.is_some())
.finish_non_exhaustive()
Expand Down
Loading