Skip to content

Commit b617229

Browse files
committed
Replace String with Ulids to store ids
1 parent 970a5a5 commit b617229

File tree

14 files changed

+42
-34
lines changed

14 files changed

+42
-34
lines changed

src/cli.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use clap::Parser;
2020
use std::{env, fs, path::PathBuf};
21-
21+
use ulid::Ulid;
2222
use url::Url;
2323

2424
#[cfg(feature = "kafka")]
@@ -399,7 +399,7 @@ pub struct OidcConfig {
399399
required = false,
400400
help = "Client id for OIDC provider"
401401
)]
402-
pub client_id: String,
402+
pub client_id: Ulid,
403403

404404
#[arg(
405405
long = "oidc-client-secret",

src/connectors/kafka/config.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use rdkafka::{ClientConfig, Offset};
44
use serde::{Deserialize, Serialize};
55
use std::path::PathBuf;
66
use std::time::Duration;
7+
use ulid::Ulid;
78

89
#[derive(Debug, Clone, Parser)]
910
pub struct KafkaConfig {
@@ -24,7 +25,7 @@ pub struct KafkaConfig {
2425
value_name = "client_id",
2526
help = "Client ID for Kafka connection"
2627
)]
27-
pub client_id: String,
28+
pub client_id: UsernameValidationError,
2829

2930
#[arg(
3031
long = "partition-listener-concurrency",
@@ -76,7 +77,7 @@ pub struct ConsumerConfig {
7677
default_value_t = String::from("parseable-connect-cg"),
7778
help = "Consumer group ID"
7879
)]
79-
pub group_id: String,
80+
pub group_id: Ulid,
8081

8182
// uses per partition stream micro-batch buffer size
8283
#[arg(
@@ -108,7 +109,7 @@ pub struct ConsumerConfig {
108109
default_value_t = format!("parseable-connect-cg-ii-{}", rand::random::<u8>()).to_string(),
109110
help = "Group instance ID for static membership"
110111
)]
111-
pub group_instance_id: String,
112+
pub group_instance_id: Ulid,
112113

113114
#[arg(
114115
long = "consumer-partition-strategy",

src/correlation.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818

1919
use std::collections::{HashMap, HashSet};
20+
use ulid::Ulid;
2021

2122
use actix_web::{http::header::ContentType, Error};
2223
use chrono::Utc;
@@ -207,8 +208,8 @@ pub enum CorrelationVersion {
207208
V1,
208209
}
209210

210-
type CorrelationId = String;
211-
type UserId = String;
211+
type CorrelationId = Ulid;
212+
type UserId = Ulid;
212213

213214
#[derive(Debug, Clone, Serialize, Deserialize)]
214215
#[serde(rename_all = "camelCase")]

src/handlers/http/kinesis.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ use base64::{engine::general_purpose::STANDARD, Engine as _};
2020
use serde::{Deserialize, Serialize};
2121
use serde_json::{Map, Value};
2222
use std::str;
23+
use ulid::Ulid;
2324

2425
use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels};
2526

2627
#[derive(Serialize, Deserialize, Debug)]
2728
#[serde(rename_all = "camelCase")]
2829
pub struct Message {
2930
records: Vec<Data>,
30-
request_id: String,
31+
request_id: Ulid,
3132
timestamp: u64,
3233
}
3334
#[derive(Serialize, Deserialize, Debug)]

src/handlers/http/modal/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use serde_json::{Map, Value};
3232
use ssl_acceptor::get_ssl_acceptor;
3333
use tokio::sync::oneshot;
3434
use tracing::{error, info, warn};
35+
use ulid::Ulid;
3536

3637
use crate::{
3738
alerts::ALERTS,
@@ -246,7 +247,7 @@ pub struct NodeMetadata {
246247
pub domain_name: String,
247248
pub bucket_name: String,
248249
pub token: String,
249-
pub node_id: String,
250+
pub node_id: Ulid,
250251
pub flight_port: String,
251252
pub node_type: NodeType,
252253
}
@@ -259,7 +260,7 @@ impl NodeMetadata {
259260
bucket_name: String,
260261
username: &str,
261262
password: &str,
262-
node_id: String,
263+
node_id: lid,
263264
flight_port: String,
264265
node_type: NodeType,
265266
) -> Self {

src/handlers/http/rbac.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use actix_web::{
3131
use http::StatusCode;
3232
use itertools::Itertools;
3333
use tokio::sync::Mutex;
34+
use ulid::Ulid;
3435

3536
use super::modal::utils::rbac_utils::{get_metadata, put_metadata};
3637

@@ -39,7 +40,7 @@ static UPDATE_LOCK: Mutex<()> = Mutex::const_new(());
3940

4041
#[derive(serde::Serialize)]
4142
struct User {
42-
id: String,
43+
id: Ulid,
4344
method: String,
4445
}
4546

src/livetail.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use futures_util::Stream;
2626
use tokio::sync::mpsc::{
2727
self, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
2828
};
29-
29+
use ulid::Ulid;
3030
use arrow_array::RecordBatch;
3131
use once_cell::sync::Lazy;
3232

@@ -39,7 +39,7 @@ pub struct LiveTail {
3939
}
4040

4141
impl LiveTail {
42-
pub fn new_pipe(&self, id: String, stream: String) -> ReceiverPipe {
42+
pub fn new_pipe(&self, id: ulid, stream: String) -> ReceiverPipe {
4343
let (sender, revc) = channel(id, stream.clone(), Arc::downgrade(&self.pipes));
4444
self.pipes
4545
.write()
@@ -106,7 +106,7 @@ pub struct ReceiverPipe {
106106
}
107107

108108
fn channel(
109-
id: String,
109+
id: ulid,
110110
stream: String,
111111
weak_ptr: Weak<LiveTailRegistry>,
112112
) -> (SenderPipe, ReceiverPipe) {

src/oidc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use std::collections::HashMap;
20-
20+
use ulid::Ulid;
2121
use openid::{Client, CompactJson, CustomClaims, Discovered, StandardClaims};
2222
use url::Url;
2323

@@ -37,7 +37,7 @@ pub enum Origin {
3737
#[derive(Debug, Clone)]
3838
pub struct OpenidConfig {
3939
/// Client id
40-
pub id: String,
40+
pub id: Ulid,
4141
/// Client Secret
4242
pub secret: String,
4343
/// OP host address over which discovery can be done

src/parseable/streams.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::{
2626
sync::{Arc, Mutex, RwLock},
2727
time::{Instant, SystemTime, UNIX_EPOCH},
2828
};
29+
use ulid::Ulid;
2930

3031
use arrow_array::RecordBatch;
3132
use arrow_schema::{Field, Fields, Schema};
@@ -100,15 +101,15 @@ pub struct Stream {
100101
pub data_path: PathBuf,
101102
pub options: Arc<Options>,
102103
pub writer: Mutex<Writer>,
103-
pub ingestor_id: Option<String>,
104+
pub ingestor_id: Option<Ulid>,
104105
}
105106

106107
impl Stream {
107108
pub fn new(
108109
options: Arc<Options>,
109110
stream_name: impl Into<String>,
110111
metadata: LogStreamMetadata,
111-
ingestor_id: Option<String>,
112+
ingestor_id: Option<Ulid>,
112113
) -> StreamRef {
113114
let stream_name = stream_name.into();
114115
let data_path = options.local_stream_data_path(&stream_name);
@@ -763,7 +764,7 @@ impl Streams {
763764
options: Arc<Options>,
764765
stream_name: String,
765766
metadata: LogStreamMetadata,
766-
ingestor_id: Option<String>,
767+
ingestor_id: Option<Ulid>,
767768
) -> StreamRef {
768769
let mut guard = self.write().expect(LOCK_EXPECT);
769770
if let Some(stream) = guard.get(&stream_name) {
@@ -1332,7 +1333,7 @@ mod tests {
13321333
let options = Arc::new(Options::default());
13331334
let stream_name = String::from("concurrent_stream");
13341335
let metadata = LogStreamMetadata::default();
1335-
let ingestor_id = Some(String::from("concurrent_ingestor"));
1336+
let ingestor_id = Some(Ulid::from_string("concurrent_ingestor")unwrap()));
13361337

13371338
// Barrier to synchronize threads
13381339
let barrier = Arc::new(Barrier::new(2));

src/prism/home/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use itertools::Itertools;
2525
use relative_path::RelativePathBuf;
2626
use serde::Serialize;
2727
use tracing::error;
28+
use ulid::Ulid;
2829

2930
use crate::{
3031
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
@@ -78,7 +79,7 @@ pub enum ResourceType {
7879

7980
#[derive(Debug, Serialize)]
8081
pub struct Resource {
81-
id: String,
82+
id: Ulid,
8283
name: String,
8384
resource_type: ResourceType,
8485
}
@@ -277,7 +278,7 @@ pub async fn generate_home_search_response(
277278
for title in stream_titles {
278279
if title.to_lowercase().contains(query_value) {
279280
resources.push(Resource {
280-
id: title.clone(),
281+
id: Ulid::from_string(&title).unwrap_or_else(|_| Ulid::new()),
281282
name: title,
282283
resource_type: ResourceType::DataSet,
283284
});

src/rbac/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use url::Url;
3232
use crate::rbac::map::{mut_sessions, mut_users, sessions, users};
3333
use crate::rbac::role::Action;
3434
use crate::rbac::user::User;
35-
35+
use ulid::Ulid;
3636
use self::map::SessionKey;
3737
use self::role::{Permission, RoleBuilder};
3838
use self::user::UserType;
@@ -176,7 +176,7 @@ impl Users {
176176
#[derive(Debug, Serialize, Clone)]
177177
pub struct UsersPrism {
178178
// username
179-
pub id: String,
179+
pub id: Ulid,
180180
// oaith or native
181181
pub method: String,
182182
// email only if method is oauth

src/rbac/user.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use std::collections::HashSet;
20-
20+
use ulid::Ulid;
2121
use argon2::{
2222
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
2323
Argon2, PasswordHash, PasswordVerifier,
@@ -152,7 +152,7 @@ pub fn get_admin_user() -> User {
152152

153153
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
154154
pub struct OAuth {
155-
pub userid: String,
155+
pub userid: Ulid,
156156
pub user_info: UserInfo,
157157
}
158158

src/storage/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
};
3333

3434
use chrono::Utc;
35-
35+
use ulid::Ulid;
3636
use std::fmt::Debug;
3737

3838
mod azure_blob;
@@ -174,25 +174,25 @@ impl std::fmt::Display for StreamType {
174174

175175
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
176176
pub struct Owner {
177-
pub id: String,
177+
pub id: Ulid,
178178
pub group: String,
179179
}
180180

181181
impl Owner {
182-
pub fn new(id: String, group: String) -> Self {
182+
pub fn new(id: Ulid, group: String) -> Self {
183183
Self { id, group }
184184
}
185185
}
186186

187187
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
188188
pub struct Permisssion {
189-
pub id: String,
189+
pub id: Ulid,
190190
pub group: String,
191191
pub access: Vec<String>,
192192
}
193193

194194
impl Permisssion {
195-
pub fn new(id: String) -> Self {
195+
pub fn new(id: Ulid) -> Self {
196196
Self {
197197
id: id.clone(),
198198
group: id,

src/users/filters.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use once_cell::sync::Lazy;
2020
use serde::{Deserialize, Serialize};
2121
use serde_json::Value;
2222
use tokio::sync::RwLock;
23+
use ulid::Ulid;
2324

2425
use super::TimeFilter;
2526
use crate::{
@@ -73,21 +74,21 @@ impl FilterType {
7374

7475
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
7576
pub struct FilterBuilder {
76-
pub id: String,
77+
pub id: Ulid,
7778
pub combinator: String,
7879
pub rules: Vec<FilterRules>,
7980
}
8081

8182
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
8283
pub struct FilterRules {
83-
pub id: String,
84+
pub id: Ulid,
8485
pub combinator: String,
8586
pub rules: Vec<Rules>,
8687
}
8788

8889
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
8990
pub struct Rules {
90-
pub id: String,
91+
pub id: Ulid,
9192
pub field: String,
9293
pub value: String,
9394
pub operator: String,

0 commit comments

Comments
 (0)