Skip to content

Commit b82b891

Browse files
Convert the endpoints to a smart router like litellm does in python arakoodev#286
1 parent 4369eb7 commit b82b891

File tree

7 files changed

+379
-1
lines changed

7 files changed

+379
-1
lines changed

JS/edgechains/lib/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@
3232
"@babel/preset-env": "^7.24.4",
3333
"@hono/node-server": "^1.2.0",
3434
"@types/dotenv": "^8.2.0",
35+
"@sentry/node": "^7.107.0",
3536
"axios": "^1.6.2",
3637
"axios-retry": "^4.1.0",
3738
"dotenv": "^16.4.5",
39+
"posthog-js": "^2.22.0",
40+
"@types/axios": "^0.14.0",
3841
"esbuild": "^0.20.2",
3942
"eventsource-parser": "^1.1.2",
4043
"inquirer": "^9.2.12",
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import axios, { AxiosInstance, AxiosRequestConfig } from 'axios';
2+
import { Jsonnet } from '@arakoodev/jsonnet';
3+
import { Logger } from '../utils/Logger';
4+
5+
export abstract class BaseEndpoint {
6+
protected client: AxiosInstance;
7+
protected jsonnet: Jsonnet;
8+
protected logger: Logger;
9+
protected config: Record<string, any>;
10+
11+
constructor(config: Record<string, any>) {
12+
this.config = config;
13+
this.client = axios.create({
14+
timeout: config.timeout || 30000,
15+
headers: {
16+
'Content-Type': 'application/json',
17+
},
18+
});
19+
this.jsonnet = new Jsonnet();
20+
this.logger = Logger.getInstance();
21+
22+
// Setup interceptors
23+
this.setupInterceptors();
24+
}
25+
26+
protected abstract setupInterceptors(): void;
27+
28+
protected async executeRequest(config: AxiosRequestConfig) {
29+
try {
30+
const response = await this.client(config);
31+
return response.data;
32+
} catch (error) {
33+
this.logger.error('Request failed:', error);
34+
throw error;
35+
}
36+
}
37+
38+
protected loadJsonnetConfig(configPath: string): Record<string, any> {
39+
return this.jsonnet.evaluateFile(configPath);
40+
}
41+
}
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse, InternalAxiosRequestConfig } from 'axios';
2+
3+
declare module 'axios' {
4+
interface AxiosRequestConfig {
5+
metadata?: {
6+
provider?: Provider;
7+
apiKey?: string;
8+
};
9+
}
10+
}
11+
import { Jsonnet } from '@arakoodev/jsonnet';
12+
import { Logger } from '../utils/Logger.js';
13+
import * as Sentry from '@sentry/node';
14+
import posthog from 'posthog-node';
15+
16+
type EndpointConfig = {
17+
apiKey: string;
18+
baseUrl: string;
19+
rateLimit?: number;
20+
timeout?: number;
21+
currentTokens?: number;
22+
lastUsed?: number;
23+
};
24+
25+
type Provider = 'openai' | 'palm' | 'cohere';
26+
27+
class Router {
28+
private endpoints: Map<Provider, EndpointConfig[]>;
29+
private client: AxiosInstance;
30+
private jsonnet: Jsonnet;
31+
private logger: Logger;
32+
private tokenUsage: Map<Provider, number>;
33+
private streamingClients: Map<string, AxiosInstance>;
34+
private activeRequests: Map<string, number>;
35+
constructor() {
36+
this.endpoints = new Map();
37+
this.client = axios.create();
38+
this.jsonnet = new Jsonnet();
39+
this.logger = Logger.getInstance();
40+
this.tokenUsage = new Map();
41+
this.streamingClients = new Map();
42+
this.activeRequests = new Map();
43+
44+
// Initialize observability
45+
Sentry.init({ dsn: process.env.SENTRY_DSN });
46+
posthog.setup(process.env.POSTHOG_KEY, {
47+
api_host: process.env.POSTHOG_HOST
48+
});
49+
50+
this.setupInterceptors();
51+
}
52+
53+
private setupInterceptors() {
54+
// Response interceptor for error handling and retries
55+
this.client.interceptors.response.use(
56+
(response) => {
57+
// Track token usage
58+
const provider = response.config.metadata?.provider;
59+
const apiKey = response.config.metadata?.apiKey;
60+
const tokens = response.data?.usage?.total_tokens;
61+
if (provider && this.isProvider(provider) && tokens && apiKey) {
62+
this.updateTokenUsage(provider as Provider, tokens, apiKey);
63+
}
64+
return response;
65+
},
66+
async (error) => {
67+
const config = error.config;
68+
if (!config || !config.retryCount) {
69+
config.retryCount = 0;
70+
}
71+
72+
// Log error to Sentry
73+
Sentry.captureException(error);
74+
posthog.captureException('api_error', {
75+
provider: config.metadata?.provider,
76+
status: error.response?.status,
77+
url: config.url
78+
});
79+
80+
if (config.retryCount < 3) {
81+
config.retryCount++;
82+
await new Promise((resolve) =>
83+
setTimeout(resolve, 1000 * config.retryCount)
84+
);
85+
return this.client(config);
86+
}
87+
88+
this.logger.error('Request failed after retries:', error);
89+
return Promise.reject(error);
90+
}
91+
);
92+
}
93+
94+
private updateTokenUsage(provider: Provider, tokens: number, apiKey: string) {
95+
const current = this.tokenUsage.get(provider) || 0;
96+
this.tokenUsage.set(provider, current + tokens);
97+
98+
// Update endpoint usage
99+
const endpoints = this.endpoints.get(provider);
100+
if (endpoints) {
101+
const endpoint = endpoints.find(e => e.apiKey === apiKey);
102+
if (endpoint) {
103+
endpoint.currentTokens = (endpoint.currentTokens || 0) + tokens;
104+
endpoint.lastUsed = Date.now();
105+
}
106+
}
107+
}
108+
109+
public addEndpoint(provider: Provider, config: EndpointConfig) {
110+
if (!this.endpoints.has(provider)) {
111+
this.endpoints.set(provider, []);
112+
}
113+
this.endpoints.get(provider)?.push({
114+
...config,
115+
currentTokens: 0,
116+
lastUsed: Date.now()
117+
});
118+
}
119+
120+
public addEndpointsFromJsonnet(configPath: string) {
121+
const config = this.jsonnet.evaluateFile(configPath);
122+
Object.entries(config).forEach(([provider, endpoints]) => {
123+
if (this.isProvider(provider)) {
124+
const typedEndpoints = endpoints as EndpointConfig[];
125+
const typedProvider = provider as Provider;
126+
typedEndpoints.forEach((endpoint) => {
127+
this.addEndpoint(typedProvider, endpoint);
128+
});
129+
}
130+
});
131+
}
132+
133+
private isProvider(value: string): value is Provider {
134+
return ['openai', 'palm', 'cohere'].includes(value);
135+
}
136+
137+
private selectBestEndpoint(provider: Provider): EndpointConfig | null {
138+
const endpoints = this.endpoints.get(provider);
139+
if (!endpoints || endpoints.length === 0) return null;
140+
141+
// Filter endpoints below rate limit and with least active requests
142+
const available = endpoints
143+
.filter(e => {
144+
const rateLimit = e.rateLimit || Infinity;
145+
return (e.currentTokens || 0) < rateLimit;
146+
})
147+
.sort((a, b) => {
148+
// Prefer endpoints with least tokens used
149+
const tokenDiff = (a.currentTokens || 0) - (b.currentTokens || 0);
150+
if (tokenDiff !== 0) return tokenDiff;
151+
152+
// If tokens are equal, prefer endpoint with least active requests
153+
const aRequests = this.activeRequests.get(a.apiKey) || 0;
154+
const bRequests = this.activeRequests.get(b.apiKey) || 0;
155+
return aRequests - bRequests;
156+
});
157+
158+
return available[0] || null;
159+
}
160+
161+
public async executeRequest(
162+
provider: Provider,
163+
config: AxiosRequestConfig
164+
): Promise<AxiosResponse> {
165+
const endpoint = this.selectBestEndpoint(provider);
166+
if (!endpoint) {
167+
throw new Error(`No available endpoints for ${provider}`);
168+
}
169+
170+
// Track active requests
171+
const activeRequests = this.activeRequests.get(endpoint.apiKey) || 0;
172+
this.activeRequests.set(endpoint.apiKey, activeRequests + 1);
173+
174+
const fullConfig: AxiosRequestConfig = {
175+
...config,
176+
metadata: {
177+
provider,
178+
apiKey: endpoint.apiKey
179+
},
180+
onDownloadProgress: config.onDownloadProgress,
181+
responseType: config.responseType || 'json',
182+
};
183+
184+
try {
185+
const response = await this.client(fullConfig);
186+
return response;
187+
} finally {
188+
// Decrement active requests count
189+
const currentRequests = this.activeRequests.get(endpoint.apiKey) || 0;
190+
this.activeRequests.set(endpoint.apiKey, Math.max(0, currentRequests - 1));
191+
}
192+
}
193+
194+
public async executeStreamingRequest(
195+
provider: Provider,
196+
config: AxiosRequestConfig,
197+
onData: (data: string) => void,
198+
onError?: (error: Error) => void,
199+
onComplete?: () => void
200+
): Promise<void> {
201+
const endpoint = this.selectBestEndpoint(provider);
202+
if (!endpoint) {
203+
throw new Error(`No available endpoints for ${provider}`);
204+
}
205+
206+
// Track active requests
207+
const activeRequests = this.activeRequests.get(endpoint.apiKey) || 0;
208+
this.activeRequests.set(endpoint.apiKey, activeRequests + 1);
209+
210+
const fullConfig: AxiosRequestConfig = {
211+
...config,
212+
metadata: {
213+
provider,
214+
apiKey: endpoint.apiKey
215+
},
216+
responseType: 'stream',
217+
onDownloadProgress: (progressEvent) => {
218+
if (progressEvent.event.target.responseText) {
219+
onData(progressEvent.event.target.responseText);
220+
}
221+
}
222+
};
223+
224+
try {
225+
const response = await this.client(fullConfig);
226+
response.data.on('end', () => {
227+
if (onComplete) onComplete();
228+
// Decrement active requests count
229+
const currentRequests = this.activeRequests.get(endpoint.apiKey) || 0;
230+
this.activeRequests.set(endpoint.apiKey, Math.max(0, currentRequests - 1));
231+
});
232+
response.data.on('error', (error: Error) => {
233+
if (onError) onError(error);
234+
// Decrement active requests count
235+
const currentRequests = this.activeRequests.get(endpoint.apiKey) || 0;
236+
this.activeRequests.set(endpoint.apiKey, Math.max(0, currentRequests - 1));
237+
});
238+
} catch (error) {
239+
if (onError) onError(error as Error);
240+
// Decrement active requests count
241+
const currentRequests = this.activeRequests.get(endpoint.apiKey) || 0;
242+
this.activeRequests.set(endpoint.apiKey, Math.max(0, currentRequests - 1));
243+
throw error;
244+
}
245+
}
246+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export * from './BaseEndpoint.ts';
2+
export * from './OpenAiEndpoint.ts';
3+
export * from './PalmEndpoint.ts';
4+
export * from './CohereEndpoint.ts';
5+
export * from './Router.ts';
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import * as Sentry from '@sentry/node';
2+
import posthog from 'posthog-js';
3+
4+
export class Logger {
5+
private static instance: Logger;
6+
private sentryInitialized = false;
7+
private posthogInitialized = false;
8+
9+
private constructor() {
10+
this.initializeSentry();
11+
this.initializePosthog();
12+
}
13+
14+
public static getInstance(): Logger {
15+
if (!Logger.instance) {
16+
Logger.instance = new Logger();
17+
}
18+
return Logger.instance;
19+
}
20+
21+
private initializeSentry() {
22+
if (process.env.SENTRY_DSN) {
23+
Sentry.init({
24+
dsn: process.env.SENTRY_DSN,
25+
tracesSampleRate: 1.0,
26+
});
27+
this.sentryInitialized = true;
28+
}
29+
}
30+
31+
private initializePosthog() {
32+
if (process.env.POSTHOG_KEY) {
33+
posthog.init(process.env.POSTHOG_KEY, {
34+
api_host: process.env.POSTHOG_HOST || 'https://app.posthog.com'
35+
});
36+
this.posthogInitialized = true;
37+
}
38+
}
39+
40+
public log(message: string, context?: Record<string, any>) {
41+
console.log(message, context);
42+
if (this.sentryInitialized) {
43+
Sentry.captureMessage(message, {
44+
level: 'info',
45+
extra: context
46+
});
47+
}
48+
if (this.posthogInitialized) {
49+
posthog.capture('log', {
50+
message,
51+
...context
52+
});
53+
}
54+
}
55+
56+
public error(message: string, error?: Error, context?: Record<string, any>) {
57+
console.error(message, error, context);
58+
if (this.sentryInitialized) {
59+
Sentry.captureException(error || new Error(message), {
60+
extra: context
61+
});
62+
}
63+
if (this.posthogInitialized) {
64+
posthog.capture('error', {
65+
message,
66+
error: error?.message,
67+
...context
68+
});
69+
}
70+
}
71+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
declare module '@arakoodev/jsonnet' {
2+
export class Jsonnet {
3+
constructor();
4+
evaluateFile(path: string): Record<string, any>;
5+
evaluateSnippet(code: string): Record<string, any>;
6+
}
7+
}

0 commit comments

Comments
 (0)