Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions apps/desktop/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,12 @@ async fn load_model(app: tauri::AppHandle, state: tauri::State<'_, SynapseState>

#[cfg(feature = "cognition")]
#[tauri::command]
async fn infer(state: tauri::State<'_, SynapseState>, input: String) -> Result<String, String> {
// 1. Think (Cognitive Layer) - Note: We store user input AFTER thinking if using Interaction entity,
// or we could store partial. But Interaction requires both.
// For this architecture, we process then store the pair.

async fn infer(
state: tauri::State<'_, SynapseState>,
input: String,
on_token: tauri::ipc::Channel<String>,
) -> Result<String, String> {
// 1. Enrich input with HiRAG
let enriched_input = {
let hirag_guard = state.hirag.lock().await;
if let Some(hirag) = &*hirag_guard {
Expand All @@ -574,55 +575,60 @@ async fn infer(state: tauri::State<'_, SynapseState>, input: String) -> Result<S
}
};

let thought = {
// 2. Perform streaming inference
let mut full_response = String::new();

let llm_adapter_opt = {
let cog_guard = state.cognition.lock().await;
if let Some(adapter) = &*cog_guard {
adapter.think(&enriched_input, "system").await.map_err(|e| e.to_string())?
if let Some(cog) = &*cog_guard {
// We need access to the underlying LLM adapter for streaming
// CandleCognitiveAdapter currently only exposes 'think'
// For streaming, we'll try to get it directly from metabolism or just use the trait.
// For now, let's assume we can get it from the state.
None // Placeholder: we'll use metabolism's LLM
} else {
// Fallback to mock/logic engine if no model is loaded
let adapter = CandleCognitiveAdapter::new(None, None).map_err(|e| e.to_string())?;
adapter.think(&enriched_input, "system").await.map_err(|e| e.to_string())?
None
}
};
Comment on lines +581 to 592
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

infer currently bypasses the runtime-loaded cognition model

Line 588 hardcodes None, and Line 596 then always sources the LLM from metabolism. That makes the model loaded by load_model effectively unused for inference streaming.

Also applies to: 594-597

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/desktop/src-tauri/src/lib.rs` around lines 581 - 592, The code currently
hardcodes llm_adapter_opt to None, causing infer to always use metabolism and
ignore any runtime-loaded model from load_model; change the block that reads
state.cognition to return the actual underlying LLM adapter when a cognition is
loaded (e.g., access the CandleCognitiveAdapter or its inner field/method on the
cognition object, such as cog.get_adapter() or cog.adapter.clone(), and set
llm_adapter_opt = Some(adapter)) and only fall back to None when no cognition is
present so infer will prefer the runtime-loaded adapter over metabolism for
streaming.


// 2. Award Karma (Proof of Sentience: Chat Interaction)
let meta_guard = state.metabolism.lock().await;
if let Some(meta) = &*meta_guard {
let llm = meta.llm_port();
let mut stream = llm.generate_stream(&enriched_input, 512).await.map_err(|e| e.to_string())?;

while let Some(token_res) = stream.next().await {
let token = token_res.map_err(|e| e.to_string())?;
full_response.push_str(&token);
let _ = on_token.send(token);
}
Comment on lines +599 to +603
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Stop streaming when the UI channel is closed

Line 602 discards channel send failures. If the frontend disconnects, generation keeps running pointlessly. Break the loop on send error to save compute and battery.

Suggested fix
-            let _ = on_token.send(token);
+            if on_token.send(token).is_err() {
+                break;
+            }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/desktop/src-tauri/src/lib.rs` around lines 599 - 603, The loop currently
ignores the result of on_token.send(token) so streaming continues even if the UI
channel is closed; replace the discard with checking the send result and break
the while loop on Err to stop generation when the frontend disconnects (locate
the stream.next() loop and the on_token.send(token) call and change the let _ =
on_token.send(token) into a match/if let that breaks the loop on send error,
ensuring you still push to full_response only when appropriate).

} else {
return Err("Metabolism not initialized".to_string());
}

// 3. Award Karma (Proof of Sentience: Chat Interaction)
{
let tok_guard = state.tokenomics.lock().await;
if let Some(service) = &*tok_guard {
// Async reward, don't block thought return significantly
let _ = service.award_karma("0xSynapseUserPrototype", synapse_core::tokenomics::structs::ActionType::ChatInteraction).await;
}
}

// 3. Store in Metabolism (Short Term Buffer)
let meta_guard = state.metabolism.lock().await;
// 4. Store in Metabolism (Short Term Buffer)
if let Some(meta) = &*meta_guard {
let interaction = Interaction {
id: Uuid::new_v4().to_string(),
user_input: input.clone(),
ai_response: thought.content.clone(),
ai_response: full_response.clone(),
timestamp: Utc::now().timestamp(),
session_id: "default".to_string(),
processed: false,
};
if let Err(e) = meta.push_interaction(interaction).await {
println!("Failed to push to metabolism: {}", e);
}
} else {
// Fallback: Legacy Direct Storage
let memory_guard = state.memory.lock().await;
if let Some(mem) = &*memory_guard {
let mut node = MemoryNode::new(input.clone());
node.source = "user".to_string();
mem.store(node).await.ok();

let mut node_ai = MemoryNode::new(thought.content.clone());
node_ai.source = "ai".to_string();
mem.store(node_ai).await.ok();
}
}
Comment on lines +578 to 629

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This block has a couple of issues:

  1. The llm_adapter_opt variable is initialized but never used. This dead code should be removed to improve clarity.
  2. More importantly, the meta_guard lock is held across .await calls inside the while loop for token streaming. This is a potential deadlock hazard and should be avoided. The lock should be released as soon as the required data (llm_port) is retrieved, and re-acquired later if needed.

Here's a suggested refactoring that addresses both points by removing the unused code and scoping the locks correctly:

    // 2. Perform streaming inference
    let mut full_response = String::new();

    let llm = {
        let meta_guard = state.metabolism.lock().await;
        if let Some(meta) = &*meta_guard {
            meta.llm_port()
        } else {
            return Err("Metabolism not initialized".to_string());
        }
    };

    let mut stream = llm.generate_stream(&enriched_input, 512).await.map_err(|e| e.to_string())?;

    while let Some(token_res) = stream.next().await {
        let token = token_res.map_err(|e| e.to_string())?;
        full_response.push_str(&token);
        let _ = on_token.send(token);
    }

    // 3. Award Karma (Proof of Sentience: Chat Interaction)
    {
        let tok_guard = state.tokenomics.lock().await;
        if let Some(service) = &*tok_guard {
            let _ = service.award_karma("0xSynapseUserPrototype", synapse_core::tokenomics::structs::ActionType::ChatInteraction).await;
        }
    }

    // 4. Store in Metabolism (Short Term Buffer)
    {
        let meta_guard = state.metabolism.lock().await;
        if let Some(meta) = &*meta_guard {
            let interaction = Interaction {
                id: Uuid::new_v4().to_string(),
                user_input: input.clone(),
                ai_response: full_response.clone(),
                timestamp: Utc::now().timestamp(),
                session_id: "default".to_string(),
                processed: false,
            };
            if let Err(e) = meta.push_interaction(interaction).await {
                println!("Failed to push to metabolism: {}", e);
            }
        }
    }


Ok(thought.content)
Ok(full_response)
}

#[tauri::command]
Expand Down
2 changes: 1 addition & 1 deletion crates/synapse-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ lazy_static = "1.5.0"

[dev-dependencies]
tokio = { workspace = true }
synapse-infra = { path = "../synapse-infra" }
synapse-infra = { path = "../synapse-infra", default-features = false }
9 changes: 9 additions & 0 deletions crates/synapse-core/src/logic/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ mod tests {
async fn generate_with_params(&self, _prompt: &str, _max_tokens: usize, _temp: f32, _top_p: f32) -> Result<String> {
Ok("Mock summary".to_string())
}

/// Mock streaming generation: emits the canned summary as a one-item stream.
async fn generate_stream(
    &self,
    _prompt: &str,
    _max_tokens: usize,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
    // Same canned text as the non-streaming mock, delivered as a single chunk.
    Ok(Box::pin(futures::stream::iter(vec![Ok("Mock summary".to_string())])))
}
}

// === Mock Embedder ===
Expand Down
19 changes: 19 additions & 0 deletions crates/synapse-core/src/logic/hirag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ mod tests {
ports::{SearchResult, LlmPort}, MemoryNode,
};
use async_trait::async_trait;
use futures::Stream;
use tokio::sync::Mutex;

// === Mock Memory ===
Expand Down Expand Up @@ -311,6 +312,15 @@ mod tests {
async fn generate_with_params(&self, _prompt: &str, _max_tokens: usize, _temp: f32, _top_p: f32) -> Result<String> {
unimplemented!()
}

/// Mock streaming generation: yields one canned chunk, then the stream ends.
async fn generate_stream(
    &self,
    _prompt: &str,
    _max_tokens: usize,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
    let single_chunk = futures::stream::iter(vec![Ok("Mock summary".to_string())]);
    Ok(Box::pin(single_chunk))
}
}

// === Mock Embedder ===
Expand Down Expand Up @@ -381,6 +391,15 @@ mod tests {
async fn generate_with_params(&self, _prompt: &str, _max_tokens: usize, _temp: f32, _top_p: f32) -> Result<String> {
unimplemented!()
}

/// Mock streaming generation: a single canned token and end-of-stream.
async fn generate_stream(
    &self,
    _prompt: &str,
    _max_tokens: usize,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
    let one_shot = futures::stream::iter(vec![Ok("Mock summary".to_string())]);
    Ok(Box::pin(one_shot))
}
}

#[tokio::test]
Expand Down
15 changes: 15 additions & 0 deletions crates/synapse-core/src/logic/metabolism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ impl Metabolism {
self
}

/// Returns a shared handle to the LLM port backing this metabolism.
pub fn llm_port(&self) -> Arc<dyn LlmPort> {
    // Explicit Arc::clone signals this is a cheap refcount bump, not a deep copy.
    Arc::clone(&self.llm)
}

fn extract_ngrams(text: &str, min_n: usize, max_n: usize) -> Vec<String> {
let tokens: Vec<String> = text
.split_whitespace()
Expand Down Expand Up @@ -180,6 +185,7 @@ mod tests {
use crate::Interaction;
use async_trait::async_trait;
use crate::ports::{SearchResult, SanitizerPort};
use futures::Stream;

// === Mock Sanitizer ===
struct MockSanitizer;
Expand Down Expand Up @@ -322,6 +328,15 @@ mod tests {
Ok("Mock summary".to_string())
}

/// Mock streaming generation: wraps the canned summary in a one-element stream.
async fn generate_stream(
    &self,
    _prompt: &str,
    _max_tokens: usize,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
    Ok(Box::pin(futures::stream::iter(vec![Ok(
        "Mock summary".to_string(),
    )])))
}

async fn summarize(&self, text: &str) -> Result<String> {
Ok(text.to_string())
}
Expand Down
9 changes: 9 additions & 0 deletions crates/synapse-core/src/ports/llm_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use async_trait::async_trait;
use crate::error::Result;
use futures::Stream;
use std::pin::Pin;


/// Port for LLM text generation.
Expand All @@ -23,6 +25,13 @@ pub trait LlmPort: Send + Sync {
top_p: f32,
) -> Result<String>;

/// Generate text completion as a stream of tokens.
///
/// Each stream item is a `Result<String>` so adapters can surface
/// mid-stream failures without aborting the whole call; the outer
/// `Result` covers errors that prevent generation from starting.
/// `max_tokens` caps the length of the generated completion.
async fn generate_stream(
    &self,
    prompt: &str,
    max_tokens: usize,
) -> Result<Pin<Box<dyn Stream<Item = Result<String>> + Send>>>;

/// Summarize text (for HiRAG layer creation).
async fn summarize(&self, text: &str) -> Result<String> {

Expand Down
9 changes: 9 additions & 0 deletions crates/synapse-core/tests/hirag_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ impl LlmPort for ConfigurableMockLlm {
async fn generate_with_params(&self, _prompt: &str, _max_tokens: usize, _temp: f32, _top_p: f32) -> Result<String> {
unimplemented!()
}

/// Mock streaming generation: produces the fixed summary as one stream item.
async fn generate_stream(
    &self,
    _prompt: &str,
    _max_tokens: usize,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
    let canned = futures::stream::iter(vec![Ok("Mock summary".to_string())]);
    Ok(Box::pin(canned))
}
}

#[tokio::test]
Expand Down
9 changes: 9 additions & 0 deletions crates/synapse-core/tests/translator_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ impl LlmPort for MockLlm {
) -> Result<String> {
Ok("empathetic message".to_string())
}

/// Mock streaming generation: the canned reply is emitted as a single chunk.
async fn generate_stream(
    &self,
    _prompt: &str,
    _max_tokens: usize,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
    Ok(Box::pin(futures::stream::iter(vec![Ok(
        "empathetic message".to_string(),
    )])))
}
}

struct MockEmbedding {
Expand Down
5 changes: 3 additions & 2 deletions crates/synapse-infra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ arrow-schema = "57"
futures = "0.3"

# AI Inference
ort = { workspace = true }
ort = { workspace = true, optional = true }
tokenizers = { version = "0.19", default-features = false, features = ["onig"] }
ndarray = "0.17"
ndarray = { version = "0.17", optional = true }
candle-core = { workspace = true }
candle-nn = { workspace = true }
candle-transformers = { workspace = true }
Expand Down Expand Up @@ -104,3 +104,4 @@ tempfile = "3.10"
default = ["vision"]
vision = ["nokhwa"]
audio = ["cpal", "rodio"]
ort-inference = ["ort", "ndarray"]
12 changes: 12 additions & 0 deletions crates/synapse-infra/src/adapters/bedrock_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use synapse_core::ports::LlmPort;
use synapse_core::error::{Result, Error as CoreError};
use std::pin::Pin;
use aws_sdk_bedrockruntime::Client;
use aws_config::BehaviorVersion;
use serde_json::json;
Expand Down Expand Up @@ -84,4 +85,15 @@ impl LlmPort for BedrockAdapter {
) -> Result<String> {
self.invoke(prompt, max_tokens, temperature, top_p).await
}

/// Streaming shim over the non-streaming Bedrock path: the full completion
/// is generated first, then delivered to the caller as a one-element stream.
async fn generate_stream(
    &self,
    prompt: &str,
    max_tokens: usize,
) -> Result<Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
    let full_text = self.generate(prompt, max_tokens).await?;
    Ok(Box::pin(futures::stream::iter(vec![Ok(full_text)])))
}
}
Loading