forked from modelcontextprotocol/swift-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathHTTPClientTransport.swift
More file actions
312 lines (312 loc) · 11.9 KB
/
HTTPClientTransport.swift
File metadata and controls
312 lines (312 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
//import Foundation
//import Logging
//
//#if canImport(FoundationNetworking)
// import FoundationNetworking
//#endif
//
//public actor HTTPClientTransport: Actor, Transport {
// public let endpoint: URL
// private let session: URLSession
// public private(set) var sessionID: String?
// private let streaming: Bool
// private var streamingTask: Task<Void, Never>?
// private var lastEventID: String?
// public nonisolated let logger: Logger
//
// private var isConnected = false
// private let messageStream: AsyncThrowingStream<Data, Swift.Error>
// private let messageContinuation: AsyncThrowingStream<Data, Swift.Error>.Continuation
//
// public init(
// endpoint: URL,
// configuration: URLSessionConfiguration = .default,
// streaming: Bool = true,
// logger: Logger? = nil
// ) {
// self.init(
// endpoint: endpoint,
// session: URLSession(configuration: configuration),
// streaming: streaming,
// logger: logger
// )
// }
//
// internal init(
// endpoint: URL,
// session: URLSession,
// streaming: Bool = false,
// logger: Logger? = nil
// ) {
// self.endpoint = endpoint
// self.session = session
// self.streaming = streaming
//
// // Create message stream
// var continuation: AsyncThrowingStream<Data, Swift.Error>.Continuation!
// self.messageStream = AsyncThrowingStream { continuation = $0 }
// self.messageContinuation = continuation
//
// self.logger =
// logger
// ?? Logger(
// label: "mcp.transport.http.client",
// factory: { _ in SwiftLogNoOpLogHandler() }
// )
// }
//
// /// Establishes connection with the transport
// public func connect() async throws {
// guard !isConnected else { return }
// isConnected = true
//
// if streaming {
// // Start listening to server events
// streamingTask = Task { await startListeningForServerEvents() }
// }
//
// logger.info("HTTP transport connected")
// }
//
// /// Disconnects from the transport
// public func disconnect() async {
// guard isConnected else { return }
// isConnected = false
//
// // Cancel streaming task if active
// streamingTask?.cancel()
// streamingTask = nil
//
// // Cancel any in-progress requests
// session.invalidateAndCancel()
//
// // Clean up message stream
// messageContinuation.finish()
//
// logger.info("HTTP client transport disconnected")
// }
//
// /// Sends data through an HTTP POST request
// public func send(_ data: Data) async throws {
// guard isConnected else {
// throw MCPError.internalError("Transport not connected")
// }
//
// var request = URLRequest(url: endpoint)
// request.httpMethod = "POST"
// request.addValue("application/json, text/event-stream", forHTTPHeaderField: "Accept")
// request.addValue("application/json", forHTTPHeaderField: "Content-Type")
// request.httpBody = data
//
// // Add session ID if available
// if let sessionID = sessionID {
// request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
// }
//
// let (responseData, response) = try await session.data(for: request)
//
// guard let httpResponse = response as? HTTPURLResponse else {
// throw MCPError.internalError("Invalid HTTP response")
// }
//
// // Process the response based on content type and status code
// let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? ""
//
// // Extract session ID if present
// if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
// self.sessionID = newSessionID
// logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"])
// }
//
// // Handle different response types
// switch httpResponse.statusCode {
// case 200, 201, 202:
// // For SSE, this POST response body is not parsed here; events are delivered by the separate streaming task
// if contentType.contains("text/event-stream") {
// logger.debug("Received SSE response, processing in streaming task")
// // The streaming is handled by the SSE task if active
// return
// }
//
// // For JSON responses, deliver the data directly
// if contentType.contains("application/json") && !responseData.isEmpty {
// logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"])
// messageContinuation.yield(responseData)
// }
// case 404:
// // If we get a 404 with a session ID, it means our session is invalid
// if sessionID != nil {
// logger.warning("Session has expired")
// sessionID = nil
// throw MCPError.internalError("Session expired")
// }
// throw MCPError.internalError("Endpoint not found")
// default:
// throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)")
// }
// }
//
// /// Receives data in an async sequence
// public func receive() -> AsyncThrowingStream<Data, Swift.Error> {
// return messageStream
// }
//
// // MARK: - SSE
//
// /// Starts listening for server events using SSE
// private func startListeningForServerEvents() async {
// guard isConnected else { return }
//
// // Retry loop for connection drops
// while isConnected && !Task.isCancelled {
// do {
// try await connectToEventStream()
// } catch {
// if !Task.isCancelled {
// logger.error("SSE connection error: \(error)")
// // Wait before retrying
// try? await Task.sleep(nanoseconds: 1_000_000_000) // 1 second
// }
// }
// }
// }
//
// #if canImport(FoundationNetworking)
// private func connectToEventStream() async throws {
// logger.warning("SSE is not supported on this platform")
// }
// #else
// /// Establishes an SSE connection to the server
// private func connectToEventStream() async throws {
// guard isConnected else { return }
//
// var request = URLRequest(url: endpoint)
// request.httpMethod = "GET"
// request.addValue("text/event-stream", forHTTPHeaderField: "Accept")
//
// // Add session ID if available
// if let sessionID = sessionID {
// request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
// }
//
// // Add Last-Event-ID header for resumability if available
// if let lastEventID = lastEventID {
// request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID")
// }
//
// logger.debug("Starting SSE connection")
//
// // Open the SSE connection as an asynchronous byte stream
// let (stream, response) = try await session.bytes(for: request)
//
// guard let httpResponse = response as? HTTPURLResponse else {
// throw MCPError.internalError("Invalid HTTP response")
// }
//
// // Check response status
// guard httpResponse.statusCode == 200 else {
// // If the server returns 405 Method Not Allowed,
// // it indicates that the server doesn't support SSE streaming.
// // We should cancel the task instead of retrying the connection.
// if httpResponse.statusCode == 405 {
// self.streamingTask?.cancel()
// }
// throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)")
// }
//
// // Extract session ID if present
// if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
// self.sessionID = newSessionID
// }
//
// // Process the SSE stream
// var buffer = ""
// var eventType = ""
// var eventID: String?
// var eventData = ""
//
// for try await byte in stream {
// if Task.isCancelled { break }
//
// guard let char = String(bytes: [byte], encoding: .utf8) else { continue }
// buffer.append(char)
//
// // Process complete lines
// while let newlineIndex = buffer.utf8.firstIndex(where: { $0 == 10 }) {
// var line = buffer[..<newlineIndex]
// if line.hasSuffix("\r") {
// line = line.dropLast()
// }
//
// buffer = String(buffer[buffer.index(after: newlineIndex)...])
//
// // Empty line marks the end of an event
// if line.isEmpty {
// if !eventData.isEmpty {
// // Process the event
// if eventType == "id" {
// lastEventID = eventID
// } else {
// // Default event type is "message" if not specified
// if let data = eventData.data(using: .utf8) {
// logger.debug(
// "SSE event received",
// metadata: [
// "type": "\(eventType.isEmpty ? "message" : eventType)",
// "id": "\(eventID ?? "none")",
// ])
// messageContinuation.yield(data)
// }
// }
//
// // Reset for next event
// eventType = ""
// eventData = ""
// }
// continue
// }
//
// if line.hasSuffix("\r") {
// line = line.dropLast()
// }
//
// // Lines starting with ":" are comments
// if line.hasPrefix(":") { continue }
//
// // Parse field: value format
// if let colonIndex = line.firstIndex(of: ":") {
// let field = String(line[..<colonIndex])
// var value = String(line[line.index(after: colonIndex)...])
//
// // Trim leading space
// if value.hasPrefix(" ") {
// value = String(value.dropFirst())
// }
//
// // Process based on field
// switch field {
// case "event":
// eventType = value
// case "data":
// if !eventData.isEmpty {
// eventData.append("\n")
// }
// eventData.append(value)
// case "id":
// if !value.contains("\0") { // ID must not contain NULL
// eventID = value
// lastEventID = value
// }
// case "retry":
// // Retry timing not implemented
// break
// default:
// // Unknown fields are ignored per SSE spec
// break
// }
// }
// }
// }
// }
// #endif
//}