Skip to content

Commit fb7f075

Browse files
committed
estib: napify long running process
Building a POC on top of byron’s POC: - Export a function from Rust to Node that takes in a duration in ms and a callback. - The function returns immediately, doesn’t block. - The callback get’s event payloads from the Rust side. - A normal abort signal can be used to abort the process at anytime.
1 parent 15d15f9 commit fb7f075

File tree

5 files changed

+301
-19
lines changed

5 files changed

+301
-19
lines changed

crates/but-api/src/lib.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,30 @@ pub mod poc {
5151
time::{Duration, Instant},
5252
};
5353

54+
#[cfg(feature = "napi")]
55+
use std::{
56+
collections::HashMap,
57+
sync::{Mutex, OnceLock, atomic::AtomicU32},
58+
};
59+
60+
#[cfg(feature = "napi")]
61+
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
62+
63+
#[cfg(feature = "napi")]
64+
static NEXT_TASK_ID: AtomicU32 = AtomicU32::new(1);
65+
#[cfg(feature = "napi")]
66+
static TASK_INTERRUPTS: OnceLock<Mutex<HashMap<u32, Arc<AtomicBool>>>> = OnceLock::new();
67+
68+
#[cfg(feature = "napi")]
69+
fn task_interrupts() -> &'static Mutex<HashMap<u32, Arc<AtomicBool>>> {
70+
TASK_INTERRUPTS.get_or_init(|| Mutex::new(HashMap::new()))
71+
}
72+
73+
#[cfg(feature = "napi")]
74+
fn emit_event(callback: &ThreadsafeFunction<LongRunningEvent>, event: LongRunningEvent) {
75+
let _ = callback.call(Ok(event), ThreadsafeFunctionCallMode::NonBlocking);
76+
}
77+
5478
/// Either the actual data that is more and more complete, or increments that can be merged
5579
/// into the actual data by the receiver.
5680
/// Sending all data whenever it changes is probably better.
@@ -105,6 +129,143 @@ pub mod poc {
105129
rx
106130
}
107131

132+
/// Kinds of events emitted to JavaScript while a long-running task executes.
133+
#[cfg(feature = "napi")]
134+
#[napi_derive::napi(string_enum)]
135+
pub enum LongRunningEventKind {
136+
/// The task produced an intermediate progress update.
137+
Progress,
138+
/// The task finished successfully.
139+
Done,
140+
/// The task stopped because interruption was requested.
141+
Cancelled,
142+
/// The task failed with an error.
143+
Error,
144+
}
145+
146+
/// Event payload for Node callbacks.
147+
#[cfg(feature = "napi")]
148+
#[napi_derive::napi(object)]
149+
pub struct LongRunningEvent {
150+
/// The id of the task that emitted this event.
151+
pub task_id: u32,
152+
/// The event category.
153+
pub kind: LongRunningEventKind,
154+
/// The latest completed step if available.
155+
pub step: Option<u32>,
156+
/// An optional error message for failed tasks.
157+
pub message: Option<String>,
158+
}
159+
160+
/// Start a long-running task and stream progress to `callback` via a ThreadsafeFunction.
161+
///
162+
/// Returns the task id, which can be used to interrupt processing with
163+
/// [`long_running_cancel_tsfn()`].
164+
#[cfg(feature = "napi")]
165+
#[napi_derive::napi(js_name = "longRunningStartTsfn")]
166+
pub fn long_running_start_tsfn(
167+
duration_ms: u32,
168+
callback: ThreadsafeFunction<LongRunningEvent>,
169+
) -> napi::Result<u32> {
170+
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
171+
let should_interrupt = Arc::new(AtomicBool::new(false));
172+
173+
let mut tasks = task_interrupts().lock().map_err(|_| {
174+
napi::Error::new(
175+
napi::Status::GenericFailure,
176+
"task registry is poisoned".to_string(),
177+
)
178+
})?;
179+
tasks.insert(task_id, should_interrupt.clone());
180+
drop(tasks);
181+
182+
let rx = long_running_non_blocking_thread(
183+
Duration::from_millis(u64::from(duration_ms)),
184+
gix::progress::Discard,
185+
should_interrupt.clone(),
186+
);
187+
188+
thread::spawn(move || {
189+
let mut last_step = None;
190+
let mut failed = false;
191+
192+
for result in rx {
193+
match result {
194+
Ok(data) => {
195+
let step = u32::try_from(data.0).unwrap_or(u32::MAX);
196+
last_step = Some(step);
197+
emit_event(
198+
&callback,
199+
LongRunningEvent {
200+
task_id,
201+
kind: LongRunningEventKind::Progress,
202+
step: Some(step),
203+
message: None,
204+
},
205+
);
206+
}
207+
Err(err) => {
208+
failed = true;
209+
emit_event(
210+
&callback,
211+
LongRunningEvent {
212+
task_id,
213+
kind: LongRunningEventKind::Error,
214+
step: last_step,
215+
message: Some(format!("{err:#}")),
216+
},
217+
);
218+
break;
219+
}
220+
}
221+
}
222+
223+
if !failed {
224+
let kind = if should_interrupt.load(Ordering::Relaxed) {
225+
LongRunningEventKind::Cancelled
226+
} else {
227+
LongRunningEventKind::Done
228+
};
229+
emit_event(
230+
&callback,
231+
LongRunningEvent {
232+
task_id,
233+
kind,
234+
step: last_step,
235+
message: None,
236+
},
237+
);
238+
}
239+
240+
if let Ok(mut tasks) = task_interrupts().lock() {
241+
tasks.remove(&task_id);
242+
}
243+
});
244+
245+
Ok(task_id)
246+
}
247+
248+
/// Interrupt a task started with [`long_running_start_tsfn()`].
249+
///
250+
/// Returns `true` if interruption was requested successfully.
251+
#[cfg(feature = "napi")]
252+
#[napi_derive::napi(js_name = "longRunningCancelTsfn")]
253+
pub fn long_running_cancel_tsfn(task_id: u32) -> napi::Result<bool> {
254+
let tasks = task_interrupts().lock().map_err(|_| {
255+
napi::Error::new(
256+
napi::Status::GenericFailure,
257+
"task registry is poisoned".to_string(),
258+
)
259+
})?;
260+
261+
if let Some(flag) = tasks.get(&task_id) {
262+
flag.store(true, Ordering::Relaxed);
263+
return Ok(true);
264+
}
265+
266+
Ok(false)
267+
}
268+
108269
/// Like [`long_running_non_blocking_scoped_thread()`], but uses a regular thread and an owned
109270
/// cancellation flag so the task can outlive the current stack frame.
110271
pub fn long_running_non_blocking_thread(

packages/but-sdk/src/generated/index.d.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,45 @@ export declare function listBranchesNapi(projectId: string, filter: BranchListin
8181

8282
export declare function listProjectsNapi(openedProjects: Array<ProjectHandleOrLegacyProjectId>): Promise<Array<ProjectForFrontend>>
8383

84+
/**
85+
* Interrupt a task started with [`long_running_start_tsfn()`].
86+
*
87+
* Returns `true` if interruption was requested successfully.
88+
*/
89+
export declare function longRunningCancelTsfn(taskId: number): boolean
90+
91+
/** Event payload for Node callbacks. */
92+
export interface LongRunningEvent {
93+
/** The id of the task that emitted this event. */
94+
taskId: number
95+
/** The event category. */
96+
kind: LongRunningEventKind
97+
/** The latest completed step if available. */
98+
step?: number
99+
/** An optional error message for failed tasks. */
100+
message?: string
101+
}
102+
103+
/** Kinds of events emitted to JavaScript while a long-running task executes. */
104+
export declare const enum LongRunningEventKind {
105+
/** The task produced an intermediate progress update. */
106+
Progress = 'Progress',
107+
/** The task finished successfully. */
108+
Done = 'Done',
109+
/** The task stopped because interruption was requested. */
110+
Cancelled = 'Cancelled',
111+
/** The task failed with an error. */
112+
Error = 'Error'
113+
}
114+
115+
/**
116+
* Start a long-running task and stream progress to `callback` via a ThreadsafeFunction.
117+
*
118+
* Returns the task id, which can be used to interrupt processing with
119+
* [`long_running_cancel_tsfn()`].
120+
*/
121+
export declare function longRunningStartTsfn(durationMs: number, callback: ((err: Error | null, arg: LongRunningEvent) => any)): number
122+
84123
/**
85124
* Provide a unified diff for `change`, but fail if `change` is a [type-change](but_core::ModeFlags::TypeChange)
86125
* or if it involves a change to a [submodule](gix::object::Kind::Commit).

packages/but-sdk/src/generated/index.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ if (!nativeBinding) {
579579
throw new Error(`Failed to load native binding`)
580580
}
581581

582-
const { applyNapi, assignHunkNapi, branchDetailsNapi, branchDiffNapi, changesInWorktreeNapi, commitAmendNapi, commitCreateNapi, commitDetailsWithLineStatsNapi, commitInsertBlankNapi, commitMoveChangesBetweenNapi, commitMoveNapi, commitMoveToBranchNapi, commitRewordNapi, commitUncommitChangesNapi, headInfoNapi, listBranchesNapi, listProjectsNapi, treeChangeDiffsNapi, unapplyStackNapi } = nativeBinding
582+
const { applyNapi, assignHunkNapi, branchDetailsNapi, branchDiffNapi, changesInWorktreeNapi, commitAmendNapi, commitCreateNapi, commitDetailsWithLineStatsNapi, commitInsertBlankNapi, commitMoveChangesBetweenNapi, commitMoveNapi, commitMoveToBranchNapi, commitRewordNapi, commitUncommitChangesNapi, headInfoNapi, listBranchesNapi, listProjectsNapi, longRunningCancelTsfn, LongRunningEventKind, longRunningStartTsfn, treeChangeDiffsNapi, unapplyStackNapi } = nativeBinding
583583
export { applyNapi }
584584
export { assignHunkNapi }
585585
export { branchDetailsNapi }
@@ -597,5 +597,8 @@ export { commitUncommitChangesNapi }
597597
export { headInfoNapi }
598598
export { listBranchesNapi }
599599
export { listProjectsNapi }
600+
export { longRunningCancelTsfn }
601+
export { LongRunningEventKind }
602+
export { longRunningStartTsfn }
600603
export { treeChangeDiffsNapi }
601604
export { unapplyStackNapi }

packages/but-sdk/src/test.ts

Lines changed: 96 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,102 @@
11
/* eslint-disable no-console */
2-
import { listProjectsNapi, stackDetailsNapi, stacksNapi } from "./generated/index.js";
2+
import {
3+
longRunningCancelTsfn,
4+
LongRunningEventKind,
5+
longRunningStartTsfn,
6+
} from "./generated/index.js";
37

8+
function isNumber(something: unknown): something is number {
9+
return typeof something === "number";
10+
}
11+
12+
/**
13+
* So what's the deal here?
14+
*
15+
* We're testing running a long-running process being triggered by JS but not blocking it.
16+
*
17+
* The process emits back events that we can read and react to.
18+
*
19+
* It can be stopped and handled gracefully.
20+
*/
21+
async function runLongRunning({
22+
durationMs,
23+
signal,
24+
onProgress,
25+
}: {
26+
durationMs: number;
27+
signal?: AbortSignal;
28+
onProgress?: (step: number) => void;
29+
}) {
30+
return await new Promise<{ lastStep: number }>((resolve, reject) => {
31+
let taskId = 0;
32+
let lastStep = 0;
33+
34+
taskId = longRunningStartTsfn(durationMs, (err, event) => {
35+
if (err) {
36+
reject(err);
37+
return;
38+
}
39+
40+
if (isNumber(event.step)) {
41+
lastStep = event.step;
42+
}
43+
44+
if (event.kind === LongRunningEventKind.Progress && isNumber(event.step)) {
45+
onProgress?.(event.step);
46+
return;
47+
}
48+
49+
if (event.kind === LongRunningEventKind.Done) {
50+
resolve({ lastStep });
51+
return;
52+
}
53+
54+
if (event.kind === LongRunningEventKind.Cancelled) {
55+
reject(new Error("yep. interrupted"));
56+
return;
57+
}
58+
59+
if (event.kind === LongRunningEventKind.Error) {
60+
reject(new Error(event.message ?? "unknown error"));
61+
}
62+
});
63+
64+
console.log("start long running process, but don't block");
65+
66+
if (signal) {
67+
console.log("there is a signal");
68+
if (signal.aborted) {
69+
console.log("signal is aborted");
70+
longRunningCancelTsfn(taskId);
71+
} else {
72+
signal.addEventListener(
73+
"abort",
74+
() => {
75+
console.log("signal abort event triggered");
76+
longRunningCancelTsfn(taskId);
77+
},
78+
{ once: true },
79+
);
80+
}
81+
}
82+
});
83+
}
484
async function main() {
5-
const projects = await listProjectsNapi([]);
6-
console.log(projects);
7-
8-
if (projects.length === 0) {
9-
console.log("No projects found");
10-
}
11-
12-
const project = projects.at(0);
13-
if (!project) throw new Error("The world is wrong");
14-
15-
const stacks = await stacksNapi(project.id, null);
16-
for (const stack of stacks) {
17-
const details = await stackDetailsNapi(project.id, stack.id);
18-
console.log("This are the details for stack with id: " + stack.id);
19-
console.log(details);
20-
}
85+
const abortController = new AbortController();
86+
setTimeout(() => {
87+
// Abort after a second
88+
console.log("after waiting a second, interrupt.");
89+
abortController.abort();
90+
}, 1000);
91+
92+
const result = await runLongRunning({
93+
durationMs: 5000,
94+
signal: abortController.signal,
95+
onProgress: (step: number) => console.log(`step ${step}`),
96+
}).catch((e) => `probably interrupt error: ${e}`);
97+
98+
console.log("\nresult");
99+
console.log(result);
21100
}
22101

23102
main();

packages/but-sdk/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"declaration": true,
1616
"declarationMap": true,
1717
"verbatimModuleSyntax": false,
18-
"isolatedModules": true,
18+
"isolatedModules": false,
1919
"outDir": "dist",
2020
"rootDir": "src"
2121
},

0 commit comments

Comments
 (0)