Skip to content

allow invalid certificate for intra cluster calls #1309

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::{
option::Mode,
parseable::PARSEABLE,
stats::{self, Stats},
storage, HTTP_CLIENT,
storage, HTTP_CLIENT, INTRA_CLUSTER_CLIENT,
};

const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
Expand Down Expand Up @@ -280,7 +280,7 @@ async fn fetch_ingestors_metrics(
))
.expect("Should be a valid URL");

let resp = HTTP_CLIENT
let resp = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::AUTHORIZATION, im.token.clone())
.header(header::CONTENT_TYPE, "application/json")
Expand Down
12 changes: 12 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ pub struct Options {
)]
pub trusted_ca_certs_path: Option<PathBuf>,

/// Allows invalid TLS certificates for intra-cluster communication.
/// This is needed when nodes connect to each other via IP addresses
/// which don't match the domain names in their certificates.
/// SECURITY NOTE: Only enable this for trusted internal networks.
#[arg(
long,
env = "P_TLS_SKIP_VERIFY",
value_name = "bool",
default_value = "false"
)]
pub tls_skip_verify: bool,

// Storage configuration
#[arg(
long,
Expand Down
22 changes: 11 additions & 11 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::storage::{
ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY,
STREAM_ROOT_DIRECTORY,
};
use crate::HTTP_CLIENT;
use crate::INTRA_CLUSTER_CLIENT;

use super::base_path_without_preceding_slash;
use super::ingest::PostError;
Expand Down Expand Up @@ -128,7 +128,7 @@ pub async fn sync_streams_with_ingestors(
let headers = reqwest_headers_clone.clone();
let body = body_clone.clone();
async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.put(url)
.headers(headers)
.header(header::AUTHORIZATION, &ingestor.token)
Expand Down Expand Up @@ -179,7 +179,7 @@ pub async fn sync_users_with_roles_with_ingestors(
let role_data = role_data.clone();

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -221,7 +221,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &str) -> Result<(), RBA
);

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.delete(url)
.header(header::AUTHORIZATION, &ingestor.token)
.send()
Expand Down Expand Up @@ -278,7 +278,7 @@ pub async fn sync_user_creation_with_ingestors(
let user_data = user_data.clone();

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -320,7 +320,7 @@ pub async fn sync_password_reset_with_ingestors(username: &str) -> Result<(), RB
);

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -364,7 +364,7 @@ pub async fn sync_role_update_with_ingestors(
let privileges = privileges.clone();

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -491,7 +491,7 @@ pub async fn send_stream_delete_request(
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}
let resp = HTTP_CLIENT
let resp = INTRA_CLUSTER_CLIENT
.delete(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down Expand Up @@ -529,7 +529,7 @@ pub async fn send_retention_cleanup_request(
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(first_event_at);
}
let resp = HTTP_CLIENT
let resp = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down Expand Up @@ -636,7 +636,7 @@ async fn fetch_node_info<T: Metadata>(node: &T) -> Result<utils::ClusterInfo, St
))
.expect("should always be a valid url");

let resp = HTTP_CLIENT
let resp = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::AUTHORIZATION, node.token().to_owned())
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -855,7 +855,7 @@ where
}

// Fetch metrics
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::AUTHORIZATION, node.token())
.header(header::CONTENT_TYPE, "application/json")
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::{
handlers::http::{base_path_without_preceding_slash, modal::NodeType},
HTTP_CLIENT,
INTRA_CLUSTER_CLIENT,
};
use actix_web::http::header;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -188,7 +188,7 @@ pub async fn check_liveness(domain_name: &str) -> bool {
}
};

let req = HTTP_CLIENT
let req = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use itertools::Itertools;
use modal::{NodeMetadata, NodeType};
use serde_json::Value;

use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};
use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, INTRA_CLUSTER_CLIENT};

use self::query::Query;

Expand Down Expand Up @@ -119,7 +119,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec
base_path_without_preceding_slash(),
"query"
);
let reqw = HTTP_CLIENT
let reqw = INTRA_CLUSTER_CLIENT
.post(uri)
.json(query)
.header(http::header::AUTHORIZATION, im.token.clone())
Expand Down
20 changes: 20 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub use handlers::http::modal::{
ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer,
};
use once_cell::sync::Lazy;
use parseable::PARSEABLE;
use reqwest::{Client, ClientBuilder};

// It is very unlikely that panic will occur when dealing with locks.
Expand Down Expand Up @@ -85,3 +86,22 @@ pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
.build()
.expect("Construction of client shouldn't fail")
});

//separate client is created for intra cluster communication
//allow invalid certificates for connecting other nodes in the cluster
//required when querier/prism server tries to connect to other nodes via IP address directly
//but the certificate is valid for a specific domain name
pub static INTRA_CLUSTER_CLIENT: Lazy<Client> = Lazy::new(|| {
ClientBuilder::new()
.connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup
.timeout(Duration::from_secs(30)) // set a timeout of 30s for each request
.pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection
.pool_max_idle_per_host(32) // max 32 idle connections per host
.gzip(true) // gzip compress for all requests
.brotli(true) // brotli compress for all requests
.use_rustls_tls() // use only the rustls backend
.http1_only() // use only http/1.1
.danger_accept_invalid_certs(PARSEABLE.options.tls_skip_verify)
.build()
.expect("Construction of client shouldn't fail")
});
4 changes: 2 additions & 2 deletions src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::handlers::http::ingest::PostError;
use crate::handlers::http::modal::Metadata;
use crate::option::Mode;
use crate::parseable::PARSEABLE;
use crate::HTTP_CLIENT;
use crate::INTRA_CLUSTER_CLIENT;
use actix_web::http::header;
use chrono::NaiveDateTime;
use chrono::Utc;
Expand Down Expand Up @@ -237,7 +237,7 @@ impl Metrics {
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, metadata.token())
Expand Down
Loading