5 changes: 5 additions & 0 deletions application_sdk/constants.py
@@ -122,6 +122,11 @@
#: Directory for storing observability data
OBSERVABILITY_DIR = os.environ.get("ATLAN_OBSERVABILITY_DIR", "/tmp/observability")

#: Whether the Streamlit-based observability UI is enabled
ENABLE_STREAMLIT_BASED_OBSERVABILITY = (
    os.environ.get("ATLAN_ENABLE_STREAMLIT_BASED_OBSERVABILITY", "false").lower() == "true"
)

# Log batching configuration
LOG_BATCH_SIZE = int(os.environ.get("ATLAN_LOG_BATCH_SIZE", 100))
LOG_FLUSH_INTERVAL_SECONDS = int(os.environ.get("ATLAN_LOG_FLUSH_INTERVAL_SECONDS", 10))
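
Since the comparison is string-based, only the literal `true` (any casing) turns the flag on — a quick sanity check, assuming the variable is exported before `constants` is imported:

```python
import os

os.environ["ATLAN_ENABLE_STREAMLIT_BASED_OBSERVABILITY"] = "TRUE"
enabled = (
    os.environ.get("ATLAN_ENABLE_STREAMLIT_BASED_OBSERVABILITY", "false").lower() == "true"
)
assert enabled  # values like "1" or "yes" would leave the UI disabled
```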
38 changes: 38 additions & 0 deletions application_sdk/observability/observability.py
@@ -667,3 +667,41 @@ def process_partitioned_files(directory, prefix=""):
# Start DuckDB UI
con.execute("CALL start_ui();")
self._duckdb_ui_con = con


class StreamlitUI:
"""Class to handle Streamlit-based observability UI functionality."""

def __init__(self, db_path="/tmp/observability/observability.db"):
"""Initialize the Streamlit UI handler."""
self.db_path = db_path
self._streamlit_process = None

    def _is_streamlit_running(self, host="127.0.0.1", port=8501):
        """Check whether a Streamlit server is already listening on the default port."""
        import socket

        # Probe the loopback address; 0.0.0.0 is a bind address, not a connect target
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(0.5)
            # connect_ex returns 0 when a server is accepting connections
            return sock.connect_ex((host, port)) == 0

    def start_ui(self):
        """Start the Streamlit UI subprocess if one is not already running."""
if not self._is_streamlit_running():
import os
import shutil
import subprocess
import sys

# Copy the streamlit UI file to the observability directory
streamlit_app_path = os.path.join(OBSERVABILITY_DIR, "streamlit_app.py")
current_dir = os.path.dirname(os.path.abspath(__file__))
source_file = os.path.join(current_dir, "streamlit_ui.py")
shutil.copy2(source_file, streamlit_app_path)

# Start Streamlit in a subprocess
self._streamlit_process = subprocess.Popen(
[sys.executable, "-m", "streamlit", "run", streamlit_app_path],
cwd=OBSERVABILITY_DIR,
)
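
For context when reviewing, a minimal sketch of how this class is driven (it mirrors the FastAPI wiring later in this diff):

```python
from application_sdk.observability.observability import StreamlitUI

ui = StreamlitUI()
# No-op if port 8501 already answers; otherwise copies streamlit_ui.py into the
# observability directory as streamlit_app.py and spawns `streamlit run` on it.
ui.start_ui()
```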
179 changes: 179 additions & 0 deletions application_sdk/observability/streamlit_ui.py
@@ -0,0 +1,179 @@
import logging
import os

import duckdb
import pandas as pd
import plotly.express as px
import streamlit as st

from application_sdk.constants import ENABLE_STREAMLIT_BASED_OBSERVABILITY

# This script is executed via `streamlit run` by StreamlitUI.start_ui(); stop
# rendering early if the feature flag is off. plotly and streamlit are imported
# unconditionally because the script only runs when the optional extras are installed.
if not ENABLE_STREAMLIT_BASED_OBSERVABILITY:
    st.stop()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

st.set_page_config(page_title="Atlan Observability", layout="wide")

# Initialize DuckDB connection. The relative path resolves inside OBSERVABILITY_DIR
# because StreamlitUI.start_ui() launches this script with that directory as cwd.
try:
    con = duckdb.connect("observability.db")
logger.info("Successfully connected to DuckDB")
except Exception as e:
logger.error(f"Failed to connect to DuckDB: {e}")
st.error(f"Failed to connect to database: {e}")


# Function to load data from parquet files
def load_data(data_type):
try:
# Get the absolute path to the observability directory
current_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = os.path.join(current_dir, data_type)
logger.info(f"Looking for data in directory: {data_dir}")

if not os.path.exists(data_dir):
logger.warning(f"Directory does not exist: {data_dir}")
st.warning(f"No data directory found for {data_type}")
return pd.DataFrame()

# List all parquet files
parquet_files = []
for root, _, files in os.walk(data_dir):
for file in files:
if file.endswith(".parquet"):
parquet_files.append(os.path.join(root, file))

if not parquet_files:
logger.warning(f"No parquet files found in {data_dir}")
st.warning(f"No data files found for {data_type}")
return pd.DataFrame()

logger.info(f"Found {len(parquet_files)} parquet files")

        # Read all parquet files in one query; interpolating the Python list is safe
        # here because its repr doubles as valid DuckDB list syntax for read_parquet
        query = f"""
SELECT *
FROM read_parquet(
{parquet_files},
hive_partitioning=true,
hive_types={{'year': INTEGER, 'month': INTEGER, 'day': INTEGER}}
)
"""
df = con.execute(query).df()
logger.info(f"Successfully loaded {len(df)} rows of {data_type} data")
return df

except Exception as e:
logger.error(f"Error loading {data_type} data: {e}")
st.error(f"Error loading {data_type} data: {e}")
return pd.DataFrame()


# Sidebar for navigation
st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Logs", "Metrics", "Traces"])

# Main content
st.title(f"Atlan Observability - {page}")

# Load and display data based on selection
if page == "Logs":
df = load_data("logs")
if not df.empty:
st.write(f"Total records: {len(df)}")
st.dataframe(df)

# Create a stacked bar chart of log levels over time
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
df["minute"] = df["timestamp"].dt.floor("min")

# Create pivot table for stacked bar chart
log_levels = df.pivot_table(
index="minute",
columns="level",
values="timestamp",
aggfunc="count",
fill_value=0,
)

# Plot stacked bar chart
st.bar_chart(log_levels)
else:
st.info("No log data available")
elif page == "Metrics":
df = load_data("metrics")
if not df.empty:
st.write(f"Total records: {len(df)}")

# Convert timestamp to datetime
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")

# Separate counter and histogram metrics
counter_metrics = df[df["type"] == "counter"]
histogram_metrics = df[df["type"] == "histogram"]

# Display counter metrics
if not counter_metrics.empty:
st.subheader("Counter Metrics")
# Get unique counter metric names
counter_names = counter_metrics["name"].unique()

        for metric_name in counter_names:
            # Work on a copy to avoid mutating a slice of the filtered frame
            metric_data = counter_metrics[counter_metrics["name"] == metric_name].copy()
            # Timestamps were already converted to datetime above; just sort by time
            metric_data = metric_data.sort_values("timestamp")
            # Running total of the counter
            metric_data["cumulative_value"] = metric_data["value"].cumsum()

st.write(f"### {metric_name}")
st.line_chart(metric_data.set_index("timestamp")["cumulative_value"])

# Display histogram metrics
if not histogram_metrics.empty:
st.subheader("Histogram Metrics")
# Get unique histogram metric names
histogram_names = histogram_metrics["name"].unique()

for metric_name in histogram_names:
metric_data = histogram_metrics[
histogram_metrics["name"] == metric_name
]

st.write(f"### {metric_name}")
# Create histogram
fig = px.histogram(
metric_data,
x="value",
nbins=50,
title=f"Distribution of {metric_name}",
)
st.plotly_chart(fig)

# Show time series of mean values
st.write(f"Mean {metric_name} over time")
time_series = metric_data.groupby("timestamp")["value"].mean()
st.line_chart(time_series)
else:
st.info("No metrics data available")
elif page == "Traces":
df = load_data("traces")
if not df.empty:
st.write(f"Total records: {len(df)}")
st.dataframe(df)
# Group by timestamp and count records
time_series = df.groupby(
pd.to_datetime(df["timestamp"], unit="s").dt.floor("min")
).size()
st.line_chart(time_series)
else:
st.info("No traces data available")
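
A hedged sketch for trying the dashboard locally: the logs page only needs a parquet file with an epoch-seconds `timestamp` column and a `level` column, laid out under Hive-style `year=/month=/day=` partitions (the partition values below are hypothetical; `to_parquet` needs `pyarrow`):

```python
import os
import time

import pandas as pd

# Hypothetical partition path under the default observability directory
part_dir = "/tmp/observability/logs/year=2024/month=1/day=1"
os.makedirs(part_dir, exist_ok=True)

pd.DataFrame(
    {
        "timestamp": [time.time()] * 3,          # epoch seconds, as the app expects
        "level": ["INFO", "ERROR", "INFO"],
        "message": ["started", "boom", "done"],  # extra columns pass through SELECT *
    }
).to_parquet(os.path.join(part_dir, "part-0.parquet"))
```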
19 changes: 14 additions & 5 deletions application_sdk/server/fastapi/__init__.py
@@ -21,14 +21,15 @@
APP_PORT,
APP_TENANT_ID,
APPLICATION_NAME,
ENABLE_STREAMLIT_BASED_OBSERVABILITY,
WORKFLOW_UI_HOST,
WORKFLOW_UI_PORT,
)
from application_sdk.docgen import AtlanDocsGenerator
from application_sdk.handlers import HandlerInterface
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import MetricType, get_metrics
from application_sdk.observability.observability import DuckDBUI
from application_sdk.observability.observability import DuckDBUI, StreamlitUI
from application_sdk.outputs.eventstore import AtlanEvent, EventStore
from application_sdk.server import ServerInterface
from application_sdk.server.fastapi.middleware.logmiddleware import LogMiddleware
@@ -84,6 +85,7 @@ class APIServer(ServerInterface):
workflows (List[WorkflowInterface]): List of registered workflows.
event_triggers (List[EventWorkflowTrigger]): List of event-based workflow triggers.
duckdb_ui (DuckDBUI): Instance of DuckDBUI for handling DuckDB UI functionality.
streamlit_ui (StreamlitUI): Instance of StreamlitUI for handling Streamlit UI functionality.

Args:
lifespan: Optional lifespan manager for the FastAPI application.
@@ -100,6 +102,7 @@ class APIServer(ServerInterface):
handler: Optional[HandlerInterface]
templates: Jinja2Templates
duckdb_ui: DuckDBUI
streamlit_ui: StreamlitUI

docs_directory_path: str = "docs"
docs_export_path: str = "dist"
@@ -126,6 +129,7 @@ def __init__(
self.workflow_client = workflow_client
self.templates = Jinja2Templates(directory=frontend_templates_path)
self.duckdb_ui = DuckDBUI()
self.streamlit_ui = StreamlitUI()

# Create the FastAPI app using the renamed import
if isinstance(lifespan, Callable):
@@ -155,10 +159,15 @@ def __init__(
super().__init__(handler)

def observability(self, request: Request) -> RedirectResponse:
"""Endpoint to launch DuckDB UI for log self-serve exploration."""
self.duckdb_ui.start_ui()
# Redirect to the local DuckDB UI
return RedirectResponse(url="http://0.0.0.0:4213")
"""Endpoint to launch observability UI for log self-serve exploration."""
if ENABLE_STREAMLIT_BASED_OBSERVABILITY:
self.streamlit_ui.start_ui()
# Redirect to the local Streamlit UI
return RedirectResponse(url="http://0.0.0.0:8501")
else:
self.duckdb_ui.start_ui()
# Redirect to the local DuckDB UI
return RedirectResponse(url="http://0.0.0.0:4213")
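
A quick way to exercise the new branch (the route path and app port here are assumptions; they depend on how `APIServer` registers routes and on `APP_PORT`):

```python
import requests

# With ATLAN_ENABLE_STREAMLIT_BASED_OBSERVABILITY=true, expect a redirect to :8501;
# otherwise to the DuckDB UI on :4213.
resp = requests.get("http://localhost:8000/observability", allow_redirects=False)
print(resp.status_code, resp.headers.get("location"))
```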

def setup_atlan_docs(self):
"""Set up and serve Atlan documentation.
47 changes: 47 additions & 0 deletions docs/docs/concepts/observability.md
@@ -0,0 +1,47 @@
## Streamlit-based Observability UI

The Application SDK includes a Streamlit-based observability UI that provides interactive visualizations for logs, metrics, and traces. This UI is optional and can be enabled by setting the `ATLAN_ENABLE_STREAMLIT_BASED_OBSERVABILITY` environment variable to `true`.

### Features

1. **Logs Visualization**
- Stacked bar chart showing log levels over time
- Interactive filtering and exploration of log data
- Real-time updates as new logs are collected

2. **Metrics Visualization**
- Counter metrics: Line charts showing cumulative counts over time
- Histogram metrics: Distribution plots and time series of mean values
- Interactive exploration of metric data

3. **Traces Visualization**
- Time series visualization of trace data
- Interactive exploration of trace patterns

### Setup

1. Install the required dependencies:
```bash
uv sync --extra observability
```

2. Enable the Streamlit UI in your environment:
```bash
export ATLAN_ENABLE_STREAMLIT_BASED_OBSERVABILITY=true
```

3. The UI will be available at `http://localhost:8501` by default.
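
To confirm the flag was picked up by the SDK, a quick check (module path as introduced in this PR):

```python
from application_sdk.constants import ENABLE_STREAMLIT_BASED_OBSERVABILITY

print(ENABLE_STREAMLIT_BASED_OBSERVABILITY)  # True once the variable above is exported
```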

### Dependencies

The Streamlit UI requires the following optional dependencies:
- `streamlit>=1.32.0`: For the web UI framework
- `plotly>=5.19.0`: For interactive visualizations

These dependencies are included in the `observability` optional dependency group.

### Configuration

| Environment Variable | Description | Default Value |
|---------------------|-------------|---------------|
| `ATLAN_ENABLE_STREAMLIT_BASED_OBSERVABILITY` | Whether to enable the Streamlit-based observability UI | `false` |
1 change: 1 addition & 0 deletions docs/docs/configuration.md
@@ -50,6 +50,7 @@ The Application SDK uses environment variables for configuration. These can be s
| `ATLAN_OBSERVABILITY_DIR` | Base directory for storing observability data | `/tmp/observability` |
| `ATLAN_ENABLE_HIVE_PARTITIONING` | Whether to enable Hive partitioning for observability data | `true` |
| `ATLAN_ENABLE_OBSERVABILITY_DAPR_SINK` | Whether to enable Dapr sink for observability data | `true` |
| `ATLAN_ENABLE_STREAMLIT_BASED_OBSERVABILITY` | Whether to enable the Streamlit-based observability UI | `false` |

## Logging Configuration

4 changes: 4 additions & 0 deletions pyproject.toml
@@ -71,6 +71,10 @@ scale_data_generator = [
"faker>=37.1.0",
"numpy>=1.23.5,<3.0.0",
]
observability = [
"streamlit>=1.32.0",
"plotly>=5.19.0",
]

# Development dependencies - Install using: uv sync --group dev
[dependency-groups]