Skip to content

Commit db91e90

Browse files
committed
Introduce basic service streaming
1 parent 3d91e99 commit db91e90

File tree

3 files changed

+133
-13
lines changed

3 files changed

+133
-13
lines changed

engine/src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::env;
22
use std::sync::Arc;
33

44
use async_std::prelude::FutureExt;
5-
use async_std::task::JoinHandle;
65
use opentelemetry::KeyValue;
76
use opentelemetry::{global, trace::TracerProvider};
87
use opentelemetry_otlp::WithExportConfig;

engine/src/models/deployment/mod.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,20 @@ impl Deployment {
260260

261261
Ok(())
262262
}
263+
264+
pub async fn get_last_by_site_id(db: &Database, site_id: &str) -> Result<Self, sqlx::Error> {
265+
let span = info_span!("Deployment::get_last_by_site_id");
266+
span.set_parent(Context::current());
267+
let _guard = span.enter();
268+
269+
query_as!(
270+
Deployment,
271+
"SELECT * FROM deployments WHERE site_id = $1 ORDER BY created_at DESC LIMIT 1",
272+
site_id
273+
)
274+
.fetch_one(&db.pool)
275+
.await
276+
}
263277
}
264278

265279
#[derive(Debug, Serialize, Deserialize, Object)]
@@ -297,6 +311,33 @@ impl DeploymentFile {
297311
.fetch_all(&db.pool)
298312
.await
299313
}
314+
315+
pub async fn get_file_by_path(db: &Database, deployment_id: &str, path: &str) -> Result<DeploymentFileEntry, sqlx::Error> {
316+
let span = info_span!("DeploymentFile::get_file_by_path");
317+
span.set_parent(Context::current());
318+
let _guard = span.enter();
319+
320+
query_as!(
321+
DeploymentFileEntry,
322+
r#"
323+
SELECT
324+
df.deployment_id as "deployment_file_deployment_id!",
325+
df.file_id as "deployment_file_file_id!",
326+
df.file_path as "deployment_file_file_path!",
327+
df.mime_type as "deployment_file_mime_type!",
328+
f.file_hash as "file_hash!",
329+
f.file_size,
330+
f.file_deleted
331+
FROM deployment_files df
332+
JOIN files f ON df.file_id = f.file_id
333+
WHERE df.deployment_id = $1 AND df.file_path = $2
334+
"#,
335+
deployment_id,
336+
path
337+
)
338+
.fetch_one(&db.pool)
339+
.await
340+
}
300341
}
301342

302343
// Add this new struct to represent the joined result

engine/src/server/mod.rs

Lines changed: 92 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,24 @@ use std::sync::Arc;
33
use opentelemetry::global;
44
use poem::middleware::OpenTelemetryMetrics;
55
use poem::web::Data;
6-
use poem::Response;
7-
use poem::{
8-
endpoint::StaticFilesEndpoint, get, handler, listener::TcpListener, middleware::Cors,
9-
web::Html, EndpointExt, Route, Server,
10-
};
11-
use poem_openapi::{OpenApi, OpenApiService, Tags};
12-
use serde_json::{self, Value};
6+
use poem::{Body, EndpointExt, IntoResponse, Request, Response};
7+
use poem::{get, handler, listener::TcpListener, middleware::Cors, Route, Server};
8+
use reqwest::StatusCode;
139
use tracing::info;
10+
use futures::StreamExt;
1411

1512
use crate::middlewares::tracing::TraceId;
13+
use crate::models::deployment::{Deployment, DeploymentFile};
14+
use crate::models::domain::Domain;
15+
use crate::models::site::Site;
16+
use crate::routes::error::HttpError;
1617
use crate::state::State;
1718

1819
pub async fn serve(state: State) {
1920
info!("Serving Router");
2021

2122
let app = Route::new()
22-
// .nest("/api", api_service)
23-
// .nest("/", file_endpoint)
24-
.at("/", get(not_found))
23+
.at("*", get(resolve_http))
2524
.with(Cors::new())
2625
.with(TraceId::new(Arc::new(global::tracer("edgeserver"))))
2726
.with(OpenTelemetryMetrics::new())
@@ -33,7 +32,88 @@ pub async fn serve(state: State) {
3332
}
3433

3534
#[handler]
36-
async fn not_found() -> Html<&'static str> {
35+
async fn resolve_http(request: &Request, state: Data<&State>) -> impl IntoResponse {
36+
let headers = request.headers();
37+
let host = match headers.get("host") {
38+
Some(host) => host.to_str().unwrap_or("localhost"),
39+
None => "localhost",
40+
};
41+
let path = request.uri().path();
42+
let path = path.trim_start_matches('/');
43+
44+
info!("Router request at: {} {}", host, path);
45+
46+
let deployment = get_last_deployment(host, &state.clone()).await;
47+
48+
if let Ok(deployment) = deployment {
49+
info!("Deployment found: {}", deployment.deployment_id);
50+
51+
let path = if path.is_empty() {
52+
"index.html"
53+
} else {
54+
path
55+
};
56+
57+
let file = DeploymentFile::get_file_by_path(&state.clone().database, &deployment.deployment_id, path)
58+
.await.ok();
59+
60+
if let Some(deployment_file) = file {
61+
info!("File found: {}", deployment_file.file_hash);
62+
63+
// stream file from s3 storage and return it
64+
let s3_path = deployment_file.file_hash.clone();
65+
if let Ok(s3_data) = state.storage.bucket.get_object_stream(s3_path).await {
66+
// Stream the S3 response bytes directly to the client
67+
let stream = s3_data.bytes.map(|chunk| {
68+
chunk.map_err(|e| {
69+
info!("Error streaming file: {}", e);
70+
std::io::Error::new(std::io::ErrorKind::Other, e)
71+
})
72+
});
73+
74+
let body = Body::from_bytes_stream(stream);
75+
return Response::builder()
76+
.status(StatusCode::OK)
77+
.header("content-type", deployment_file.deployment_file_mime_type.clone())
78+
.body(body);
79+
} else {
80+
info!("File not found in s3");
81+
}
82+
} else {
83+
info!("No file found");
84+
}
85+
} else {
86+
info!("No deployment found");
87+
}
88+
3789
// inline 404 template
38-
Html(include_str!("./404.html"))
90+
Response::builder()
91+
.status(StatusCode::NOT_FOUND)
92+
.body(Body::from_string(include_str!("./404.html").to_string()))
93+
}
94+
95+
async fn get_last_deployment(host: &str, state: &State) -> Result<Deployment, HttpError> {
96+
// get domain
97+
// get site
98+
// get last deployment
99+
// get file at path in deployment
100+
// otherwise return default /index.html if exists
101+
let domain = Domain::existing_domain_by_name(host, &state.clone())
102+
.await.ok().flatten();
103+
104+
if let Some(domain) = domain {
105+
let site = Site::get_by_id(&state.clone().database, &domain.site_id)
106+
.await.ok();
107+
108+
if let Some(site) = site {
109+
let deployment = Deployment::get_last_by_site_id(&state.clone().database, &site.site_id)
110+
.await.ok();
111+
112+
if let Some(deployment) = deployment {
113+
return Ok(deployment);
114+
}
115+
}
116+
}
117+
118+
Err(HttpError::NotFound)
39119
}

0 commit comments

Comments
 (0)