diff --git a/src/analytics.rs b/src/analytics.rs index 329a20632..8bafe8a95 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -43,7 +43,7 @@ use crate::{ option::Mode, parseable::PARSEABLE, stats::{self, Stats}, - storage, HTTP_CLIENT, + storage, HTTP_CLIENT, INTRA_CLUSTER_CLIENT, }; const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80"; @@ -280,7 +280,7 @@ async fn fetch_ingestors_metrics( )) .expect("Should be a valid URL"); - let resp = HTTP_CLIENT + let resp = INTRA_CLUSTER_CLIENT .get(uri) .header(header::AUTHORIZATION, im.token.clone()) .header(header::CONTENT_TYPE, "application/json") diff --git a/src/cli.rs b/src/cli.rs index 976e96969..d98499783 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -207,6 +207,18 @@ pub struct Options { )] pub trusted_ca_certs_path: Option, + /// Allows invalid TLS certificates for intra-cluster communication. + /// This is needed when nodes connect to each other via IP addresses + /// which don't match the domain names in their certificates. + /// SECURITY NOTE: Only enable this for trusted internal networks. + #[arg( + long, + env = "P_TLS_SKIP_VERIFY", + value_name = "bool", + default_value = "false" + )] + pub tls_skip_verify: bool, + // Storage configuration #[arg( long, diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index c1cd46b6c..4b5456806 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -50,7 +50,7 @@ use crate::storage::{ ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY, }; -use crate::HTTP_CLIENT; +use crate::INTRA_CLUSTER_CLIENT; use super::base_path_without_preceding_slash; use super::ingest::PostError; @@ -128,7 +128,7 @@ pub async fn sync_streams_with_ingestors( let headers = reqwest_headers_clone.clone(); let body = body_clone.clone(); async move { - let res = HTTP_CLIENT + let res = INTRA_CLUSTER_CLIENT .put(url) .headers(headers) .header(header::AUTHORIZATION, &ingestor.token) @@ -179,7 +179,7 @@ pub async fn sync_users_with_roles_with_ingestors( let role_data = role_data.clone(); async move { - let res = HTTP_CLIENT + let res = INTRA_CLUSTER_CLIENT .put(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -221,7 +221,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &str) -> Result<(), RBA ); async move { - let res = HTTP_CLIENT + let res = INTRA_CLUSTER_CLIENT .delete(url) .header(header::AUTHORIZATION, &ingestor.token) .send() @@ -278,7 +278,7 @@ pub async fn sync_user_creation_with_ingestors( let user_data = user_data.clone(); async move { - let res = HTTP_CLIENT + let res = INTRA_CLUSTER_CLIENT .post(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -320,7 +320,7 @@ pub async fn sync_password_reset_with_ingestors(username: &str) -> Result<(), RB ); async move { - let res = HTTP_CLIENT + let res = INTRA_CLUSTER_CLIENT .post(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -364,7 +364,7 @@ pub async fn sync_role_update_with_ingestors( let privileges = privileges.clone(); async move { - let res = HTTP_CLIENT + let res = INTRA_CLUSTER_CLIENT .put(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -491,7 +491,7 @@ pub async fn send_stream_delete_request( if !utils::check_liveness(&ingestor.domain_name).await { return Ok(()); } - let resp = HTTP_CLIENT + let resp = INTRA_CLUSTER_CLIENT .delete(url) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor.token) @@ -529,7 +529,7 @@ pub async fn send_retention_cleanup_request( if !utils::check_liveness(&ingestor.domain_name).await { return Ok(first_event_at); } - let resp = HTTP_CLIENT + let resp = INTRA_CLUSTER_CLIENT .post(url) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor.token) @@ -636,7 +636,7 @@ async fn fetch_node_info(node: &T) -> Result bool { } }; - let req = HTTP_CLIENT + let req = INTRA_CLUSTER_CLIENT .get(uri) .header(header::CONTENT_TYPE, "application/json") .send() diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 3d674d20b..874a2aed5 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -25,7 +25,7 @@ use itertools::Itertools; use modal::{NodeMetadata, NodeType}; use serde_json::Value; -use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT}; +use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, INTRA_CLUSTER_CLIENT}; use self::query::Query; @@ -119,7 +119,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result = Lazy::new(|| { .build() .expect("Construction of client shouldn't fail") }); + +//separate client is created for intra cluster communication +//allow invalid certificates for connecting other nodes in the cluster +//required when querier/prism server tries to connect to other nodes via IP address directly +//but the certificate is valid for a specific domain name +pub static INTRA_CLUSTER_CLIENT: Lazy = Lazy::new(|| { + ClientBuilder::new() + .connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup + .timeout(Duration::from_secs(30)) // set a timeout of 30s for each request + .pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection + .pool_max_idle_per_host(32) // max 32 idle connections per host + .gzip(true) // gzip compress for all requests + .brotli(true) // brotli compress for all requests + .use_rustls_tls() // use only the rustls backend + .http1_only() // use only http/1.1 + .danger_accept_invalid_certs(PARSEABLE.options.tls_skip_verify) + .build() + .expect("Construction of client shouldn't fail") +}); diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index f9f559f5e..d76a0ac5a 100644 --- a/src/metrics/prom_utils.rs +++ b/src/metrics/prom_utils.rs @@ -21,7 +21,7 @@ use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::Metadata; use crate::option::Mode; use crate::parseable::PARSEABLE; -use crate::HTTP_CLIENT; +use crate::INTRA_CLUSTER_CLIENT; use actix_web::http::header; use chrono::NaiveDateTime; use chrono::Utc; @@ -237,7 +237,7 @@ impl Metrics { .map_err(|err| { PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) })?; - let res = HTTP_CLIENT + let res = INTRA_CLUSTER_CLIENT .get(uri) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, metadata.token())