This commit is contained in:
Simon Gruber
2025-12-14 20:26:57 +01:00
parent 187960edfd
commit 49d3cc4772
9 changed files with 410 additions and 480 deletions
+34 -84
View File
@@ -18,57 +18,25 @@ class MetricsAggregator:
Returns: Returns:
Enhanced metrics list with aggregations Enhanced metrics list with aggregations
""" """
if not all_metrics:
return all_metrics
aggregated = list(all_metrics) # Start with original metrics aggregated = list(all_metrics) # Start with original metrics
timestamp = all_metrics[0].get('timestamp', 0)
# Add container-level aggregations # Add volume-level aggregations (most useful)
container_aggs = self._aggregate_by_container(all_metrics) volume_aggs = self._aggregate_volumes(all_metrics, timestamp)
aggregated.extend(container_aggs)
# Add volume-level aggregations
volume_aggs = self._aggregate_by_volume(all_metrics)
aggregated.extend(volume_aggs) aggregated.extend(volume_aggs)
# Add system-level aggregations # Add system-level aggregations
system_aggs = self._aggregate_system_metrics(all_metrics) system_aggs = self._aggregate_system(all_metrics, timestamp)
aggregated.extend(system_aggs) aggregated.extend(system_aggs)
return aggregated return aggregated
def _aggregate_by_container(self, metrics: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _aggregate_volumes(self, metrics: List[Dict[str, Any]], timestamp: int) -> List[Dict[str, Any]]:
"""Create container-level aggregations."""
aggregations = []
# Group metrics by container
container_metrics = defaultdict(list)
for metric in metrics:
name = metric.get('name', '')
if name.startswith('containers.'):
parts = name.split('.')
if len(parts) >= 2:
container_name = parts[1]
container_metrics[container_name].append(metric)
# For each container, create aggregations
for container_name, cmets in container_metrics.items():
timestamp = cmets[0].get('timestamp') if cmets else 0
# Count different metric types for this container
metric_count = len(cmets)
aggregations.append({
'name': f'aggregated.containers.{container_name}.metric_count',
'value': metric_count,
'timestamp': timestamp
})
return aggregations
def _aggregate_by_volume(self, metrics: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Create volume-level aggregations.""" """Create volume-level aggregations."""
aggregations = [] volume_containers = {}
# Find all volume metrics
volume_containers = defaultdict(int)
for metric in metrics: for metric in metrics:
name = metric.get('name', '') name = metric.get('name', '')
@@ -76,67 +44,49 @@ class MetricsAggregator:
parts = name.split('.') parts = name.split('.')
if len(parts) >= 2: if len(parts) >= 2:
volume_name = parts[1] volume_name = parts[1]
count = metric.get('value', 0) volume_containers[volume_name] = metric.get('value', 0)
volume_containers[volume_name] = count
# Create summary metrics if not volume_containers:
if volume_containers: return []
timestamp = metrics[0].get('timestamp', 0) if metrics else 0
volumes_in_use = sum(1 for count in volume_containers.values() if count > 0)
# Total volumes
aggregations.append({ return [
{
'name': 'aggregated.volumes.total_count', 'name': 'aggregated.volumes.total_count',
'value': len(volume_containers), 'value': len(volume_containers),
'timestamp': timestamp 'timestamp': timestamp
}) },
{
# Volumes in use (with at least one container)
volumes_in_use = sum(1 for count in volume_containers.values() if count > 0)
aggregations.append({
'name': 'aggregated.volumes.in_use_count', 'name': 'aggregated.volumes.in_use_count',
'value': volumes_in_use, 'value': volumes_in_use,
'timestamp': timestamp 'timestamp': timestamp
}) },
{
# Unused volumes
aggregations.append({
'name': 'aggregated.volumes.unused_count', 'name': 'aggregated.volumes.unused_count',
'value': len(volume_containers) - volumes_in_use, 'value': len(volume_containers) - volumes_in_use,
'timestamp': timestamp 'timestamp': timestamp
}) }
]
return aggregations
def _aggregate_system_metrics(self, metrics: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _aggregate_system(self, metrics: List[Dict[str, Any]], timestamp: int) -> List[Dict[str, Any]]:
"""Create system-level aggregations.""" """Create system-level aggregations."""
aggregations = []
# Find system metrics
total_containers = 0 total_containers = 0
running_containers = 0 running_containers = 0
total_images = 0
for metric in metrics: for metric in metrics:
name = metric.get('name', '') name = metric.get('name', '')
value = metric.get('value', 0)
if name == 'system.containers.total': if name == 'system.containers.total':
total_containers = value total_containers = metric.get('value', 0)
elif name == 'system.containers.running': elif name == 'system.containers.running':
running_containers = value running_containers = metric.get('value', 0)
elif name == 'system.images.total':
total_images = value
if total_containers > 0 or total_images > 0: if total_containers > 0:
timestamp = metrics[0].get('timestamp', 0) if metrics else 0 utilization = (running_containers / total_containers) * 100
return [{
# Container utilization percentage 'name': 'aggregated.system.container_utilization_percent',
if total_containers > 0: 'value': round(utilization, 2),
utilization = (running_containers / total_containers) * 100 'timestamp': timestamp
aggregations.append({ }]
'name': 'aggregated.system.container_utilization_percent',
'value': round(utilization, 2),
'timestamp': timestamp
})
return aggregations return []
+12 -1
View File
@@ -2,12 +2,23 @@
Base collector interface for Docker metrics collection. Base collector interface for Docker metrics collection.
""" """
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Dict, List, Any from typing import Dict, List, Any, Optional
import docker
class BaseCollector(ABC): class BaseCollector(ABC):
"""Abstract base class for all metric collectors.""" """Abstract base class for all metric collectors."""
# Shared Docker client instance across all collectors
_shared_client: Optional[docker.DockerClient] = None
@classmethod
def get_docker_client(cls) -> docker.DockerClient:
"""Get or create shared Docker client instance."""
if cls._shared_client is None:
cls._shared_client = docker.from_env()
return cls._shared_client
@abstractmethod @abstractmethod
def collect(self) -> List[Dict[str, Any]]: def collect(self) -> List[Dict[str, Any]]:
""" """
+126 -171
View File
@@ -1,7 +1,6 @@
""" """
Container metrics collector - gathers CPU, memory, state, and health metrics. Container metrics collector - gathers CPU, memory, state, and health metrics.
""" """
import docker
import time import time
from typing import Dict, List, Any from typing import Dict, List, Any
from .base import BaseCollector from .base import BaseCollector
@@ -11,7 +10,7 @@ class ContainerCollector(BaseCollector):
"""Collects metrics from Docker containers.""" """Collects metrics from Docker containers."""
def __init__(self): def __init__(self):
self.client = docker.from_env() self.client = self.get_docker_client()
def get_name(self) -> str: def get_name(self) -> str:
return "container" return "container"
@@ -26,10 +25,10 @@ class ContainerCollector(BaseCollector):
metrics = [] metrics = []
timestamp = int(time.time()) timestamp = int(time.time())
try: containers = self.client.containers.list(all=True)
containers = self.client.containers.list(all=True)
for container in containers:
for container in containers: try:
container_name = container.name container_name = container.name
# Basic state metrics # Basic state metrics
@@ -60,71 +59,61 @@ class ContainerCollector(BaseCollector):
}) })
# Disk usage metrics (available for all containers) # Disk usage metrics (available for all containers)
try: disk_usage = self._get_container_disk_usage(container)
disk_usage = self._get_container_disk_usage(container) if disk_usage is not None:
if disk_usage is not None: metrics.append({
metrics.append({ 'name': f'containers.{container_name}.disk_usage_bytes',
'name': f'containers.{container_name}.disk_usage_bytes', 'value': disk_usage,
'value': disk_usage, 'timestamp': timestamp
'timestamp': timestamp })
})
except Exception as e:
print(f"Warning: Could not collect disk usage for {container_name}: {e}")
# Only collect resource metrics for running containers # Only collect resource metrics for running containers
if state == 'running': if state == 'running':
try: stats = container.stats(stream=False)
stats = container.stats(stream=False)
# CPU metrics
# CPU metrics cpu_percent = self._calculate_cpu_percent(stats)
cpu_percent = self._calculate_cpu_percent(stats) metrics.append({
'name': f'containers.{container_name}.cpu_percent',
'value': cpu_percent,
'timestamp': timestamp
})
# Memory metrics
memory_stats = stats.get('memory_stats', {})
memory_usage = memory_stats.get('usage', 0)
memory_limit = memory_stats.get('limit', 1)
memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0
metrics.append({
'name': f'containers.{container_name}.memory_bytes',
'value': memory_usage,
'timestamp': timestamp
})
metrics.append({
'name': f'containers.{container_name}.memory_percent',
'value': round(memory_percent, 2),
'timestamp': timestamp
})
# Network metrics
for net_metric in self._get_network_metrics(stats):
metrics.append({ metrics.append({
'name': f'containers.{container_name}.cpu_percent', 'name': f'containers.{container_name}.{net_metric["name"]}',
'value': cpu_percent, 'value': net_metric['value'],
'timestamp': timestamp 'timestamp': timestamp
}) })
# Memory metrics # Block I/O metrics
memory_stats = stats.get('memory_stats', {}) for io_metric in self._get_io_metrics(stats):
memory_usage = memory_stats.get('usage', 0)
memory_limit = memory_stats.get('limit', 1)
memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0
metrics.append({ metrics.append({
'name': f'containers.{container_name}.memory_bytes', 'name': f'containers.{container_name}.{io_metric["name"]}',
'value': memory_usage, 'value': io_metric['value'],
'timestamp': timestamp 'timestamp': timestamp
}) })
except Exception as e:
metrics.append({ print(f" Warning: Error collecting metrics for {container.name}: {e}")
'name': f'containers.{container_name}.memory_percent',
'value': memory_percent,
'timestamp': timestamp
})
# Network metrics
network_metrics = self._get_network_metrics(stats)
for net_metric in network_metrics:
metrics.append({
'name': f'containers.{container_name}.{net_metric["name"]}',
'value': net_metric['value'],
'timestamp': timestamp
})
# Block I/O metrics
io_metrics = self._get_io_metrics(stats)
for io_metric in io_metrics:
metrics.append({
'name': f'containers.{container_name}.{io_metric["name"]}',
'value': io_metric['value'],
'timestamp': timestamp
})
except Exception as e:
print(f"Warning: Could not collect stats for {container_name}: {e}")
except Exception as e:
print(f"Error collecting container metrics: {e}")
return metrics return metrics
@@ -142,128 +131,94 @@ class ContainerCollector(BaseCollector):
def _get_health_status(self, container) -> int: def _get_health_status(self, container) -> int:
"""Get container health status as numeric value.""" """Get container health status as numeric value."""
try: health = container.attrs.get('State', {}).get('Health', {}).get('Status', None)
health = container.attrs.get('State', {}).get('Health', {}).get('Status', None) if health is None:
if health is None:
return None
health_map = {
'healthy': 2,
'unhealthy': 0,
'starting': 1,
'none': -1
}
return health_map.get(health.lower(), -1)
except:
return None return None
health_map = {
'healthy': 2,
'unhealthy': 0,
'starting': 1,
'none': -1
}
return health_map.get(health.lower(), -1)
def _calculate_cpu_percent(self, stats: dict) -> float: def _calculate_cpu_percent(self, stats: dict) -> float:
"""Calculate CPU usage percentage from stats.""" """Calculate CPU usage percentage from stats."""
try: cpu_stats = stats.get('cpu_stats', {})
cpu_stats = stats.get('cpu_stats', {}) precpu_stats = stats.get('precpu_stats', {})
precpu_stats = stats.get('precpu_stats', {})
cpu_delta = cpu_stats.get('cpu_usage', {}).get('total_usage', 0) - \
cpu_delta = cpu_stats.get('cpu_usage', {}).get('total_usage', 0) - \ precpu_stats.get('cpu_usage', {}).get('total_usage', 0)
precpu_stats.get('cpu_usage', {}).get('total_usage', 0)
system_delta = cpu_stats.get('system_cpu_usage', 0) - \
system_delta = cpu_stats.get('system_cpu_usage', 0) - \ precpu_stats.get('system_cpu_usage', 0)
precpu_stats.get('system_cpu_usage', 0)
online_cpus = cpu_stats.get('online_cpus', 0)
online_cpus = cpu_stats.get('online_cpus', 0) if online_cpus == 0:
if online_cpus == 0: online_cpus = len(cpu_stats.get('cpu_usage', {}).get('percpu_usage', [0]))
online_cpus = len(cpu_stats.get('cpu_usage', {}).get('percpu_usage', [0]))
if system_delta > 0 and cpu_delta > 0:
if system_delta > 0 and cpu_delta > 0: cpu_percent = (cpu_delta / system_delta) * online_cpus * 100.0
cpu_percent = (cpu_delta / system_delta) * online_cpus * 100.0 return round(cpu_percent, 2)
return round(cpu_percent, 2)
except Exception as e:
print(f"Warning: Error calculating CPU percent: {e}")
return 0.0 return 0.0
def _get_network_metrics(self, stats: dict) -> List[Dict[str, Any]]: def _get_network_metrics(self, stats: dict) -> List[Dict[str, Any]]:
"""Extract network metrics from stats.""" """Extract network metrics from stats."""
metrics = [] networks = stats.get('networks', {})
try:
networks = stats.get('networks', {})
total_rx_bytes = 0
total_tx_bytes = 0
total_rx_packets = 0
total_tx_packets = 0
for interface, net_stats in networks.items():
total_rx_bytes += net_stats.get('rx_bytes', 0)
total_tx_bytes += net_stats.get('tx_bytes', 0)
total_rx_packets += net_stats.get('rx_packets', 0)
total_tx_packets += net_stats.get('tx_packets', 0)
metrics.append({'name': 'network.rx_bytes', 'value': total_rx_bytes})
metrics.append({'name': 'network.tx_bytes', 'value': total_tx_bytes})
metrics.append({'name': 'network.rx_packets', 'value': total_rx_packets})
metrics.append({'name': 'network.tx_packets', 'value': total_tx_packets})
except Exception as e:
print(f"Warning: Error getting network metrics: {e}")
return metrics total_rx_bytes = 0
total_tx_bytes = 0
total_rx_packets = 0
total_tx_packets = 0
for net_stats in networks.values():
total_rx_bytes += net_stats.get('rx_bytes', 0)
total_tx_bytes += net_stats.get('tx_bytes', 0)
total_rx_packets += net_stats.get('rx_packets', 0)
total_tx_packets += net_stats.get('tx_packets', 0)
return [
{'name': 'network.rx_bytes', 'value': total_rx_bytes},
{'name': 'network.tx_bytes', 'value': total_tx_bytes},
{'name': 'network.rx_packets', 'value': total_rx_packets},
{'name': 'network.tx_packets', 'value': total_tx_packets}
]
def _get_io_metrics(self, stats: dict) -> List[Dict[str, Any]]: def _get_io_metrics(self, stats: dict) -> List[Dict[str, Any]]:
"""Extract block I/O metrics from stats.""" """Extract block I/O metrics from stats."""
metrics = [] blkio_stats = stats.get('blkio_stats', {})
try: io_service_bytes = blkio_stats.get('io_service_bytes_recursive', []) or []
blkio_stats = stats.get('blkio_stats', {})
io_service_bytes = blkio_stats.get('io_service_bytes_recursive', [])
# Handle case where io_service_bytes might be None
if io_service_bytes is None:
io_service_bytes = []
read_bytes = 0
write_bytes = 0
for entry in io_service_bytes:
op = entry.get('op', '')
value = entry.get('value', 0)
if op == 'Read':
read_bytes += value
elif op == 'Write':
write_bytes += value
metrics.append({'name': 'blkio.read_bytes', 'value': read_bytes})
metrics.append({'name': 'blkio.write_bytes', 'value': write_bytes})
except Exception as e:
print(f"Warning: Error getting I/O metrics: {e}")
return metrics read_bytes = 0
write_bytes = 0
for entry in io_service_bytes:
op = entry.get('op', '')
value = entry.get('value', 0)
if op == 'Read':
read_bytes += value
elif op == 'Write':
write_bytes += value
return [
{'name': 'blkio.read_bytes', 'value': read_bytes},
{'name': 'blkio.write_bytes', 'value': write_bytes}
]
def _get_container_disk_usage(self, container) -> int: def _get_container_disk_usage(self, container) -> int:
""" """Get the disk usage for a container in bytes using the Docker system df API."""
Get the disk usage for a container in bytes using the Docker system df API. df_data = self.client.df()
This provides accurate size information including writable layer and virtual size. containers_info = df_data.get('Containers', [])
"""
try: # Find the matching container by ID
# Use the system df API to get accurate container size information for container_info in containers_info:
# This is more reliable than container.attrs which often doesn't include size data if container_info.get('Id', '').startswith(container.id):
df_data = self.client.df() # Return SizeRootFs (total size) if available, otherwise SizeRw
containers_info = df_data.get('Containers', []) size_rootfs = container_info.get('SizeRootFs', 0)
size_rw = container_info.get('SizeRw', 0)
# Find the matching container by ID return size_rootfs if size_rootfs > 0 else size_rw
for container_info in containers_info:
if container_info.get('Id', '').startswith(container.id): return 0
# SizeRw: Size of files created or changed in the writable layer
size_rw = container_info.get('SizeRw', 0)
# SizeRootFs: Total size including all layers (image + writable)
size_rootfs = container_info.get('SizeRootFs', 0)
# Return SizeRootFs (total size) if available, otherwise SizeRw
return size_rootfs if size_rootfs > 0 else size_rw
# Container not found in df data
return 0
except Exception as e:
print(f"Warning: Error getting disk usage: {e}")
return None
+31 -40
View File
@@ -110,50 +110,41 @@ class SelfMetricsCollector(BaseCollector):
}) })
# Memory usage # Memory usage
try: mem_info = self.process.memory_info()
mem_info = self.process.memory_info() metrics.append({
metrics.append({ 'name': 'service.memory_rss_bytes',
'name': 'service.memory_rss_bytes', 'value': mem_info.rss,
'value': mem_info.rss, 'timestamp': timestamp
'timestamp': timestamp })
})
metrics.append({
metrics.append({ 'name': 'service.memory_vms_bytes',
'name': 'service.memory_vms_bytes', 'value': mem_info.vms,
'value': mem_info.vms, 'timestamp': timestamp
'timestamp': timestamp })
})
# Memory usage in MB for easier reading
# Memory usage in MB for easier reading metrics.append({
metrics.append({ 'name': 'service.memory_rss_mb',
'name': 'service.memory_rss_mb', 'value': round(mem_info.rss / (1024 * 1024), 2),
'value': round(mem_info.rss / (1024 * 1024), 2), 'timestamp': timestamp
'timestamp': timestamp })
})
except Exception as e:
pass # Skip memory metrics if we can't get them
# CPU usage # CPU usage
try: cpu_percent = self.process.cpu_percent()
cpu_percent = self.process.cpu_percent() metrics.append({
metrics.append({ 'name': 'service.cpu_percent',
'name': 'service.cpu_percent', 'value': round(cpu_percent, 2),
'value': round(cpu_percent, 2), 'timestamp': timestamp
'timestamp': timestamp })
})
except Exception as e:
pass # Skip CPU metrics if we can't get them
# Thread count # Thread count
try: num_threads = self.process.num_threads()
num_threads = self.process.num_threads() metrics.append({
metrics.append({ 'name': 'service.threads_count',
'name': 'service.threads_count', 'value': num_threads,
'value': num_threads, 'timestamp': timestamp
'timestamp': timestamp })
})
except Exception as e:
pass # Skip thread metrics if we can't get them
return metrics return metrics
+50 -108
View File
@@ -1,19 +1,18 @@
""" """
System-level Docker metrics collector - gathers disk usage and system-wide statistics. System-level Docker metrics collector - gathers disk usage and system-wide statistics.
""" """
import docker
import subprocess import subprocess
import re
import time import time
from typing import Dict, List, Any from typing import Dict, List, Any
from .base import BaseCollector from .base import BaseCollector
from utils import parse_size_from_line, safe_int
class SystemCollector(BaseCollector): class SystemCollector(BaseCollector):
"""Collects system-level Docker metrics using 'docker system df'.""" """Collects system-level Docker metrics using 'docker system df'."""
def __init__(self): def __init__(self):
self.client = docker.from_env() self.client = self.get_docker_client()
def get_name(self) -> str: def get_name(self) -> str:
return "system" return "system"
@@ -28,49 +27,45 @@ class SystemCollector(BaseCollector):
metrics = [] metrics = []
timestamp = int(time.time()) timestamp = int(time.time())
try: # Get Docker info
# Get Docker info info = self.client.info()
info = self.client.info()
# System-wide container counts
# System-wide container counts metrics.append({
metrics.append({ 'name': 'system.containers.total',
'name': 'system.containers.total', 'value': info.get('Containers', 0),
'value': info.get('Containers', 0), 'timestamp': timestamp
'timestamp': timestamp })
})
metrics.append({
metrics.append({ 'name': 'system.containers.running',
'name': 'system.containers.running', 'value': info.get('ContainersRunning', 0),
'value': info.get('ContainersRunning', 0), 'timestamp': timestamp
'timestamp': timestamp })
})
metrics.append({
metrics.append({ 'name': 'system.containers.paused',
'name': 'system.containers.paused', 'value': info.get('ContainersPaused', 0),
'value': info.get('ContainersPaused', 0), 'timestamp': timestamp
'timestamp': timestamp })
})
metrics.append({
metrics.append({ 'name': 'system.containers.stopped',
'name': 'system.containers.stopped', 'value': info.get('ContainersStopped', 0),
'value': info.get('ContainersStopped', 0), 'timestamp': timestamp
'timestamp': timestamp })
})
# Image count
# Image count metrics.append({
metrics.append({ 'name': 'system.images.total',
'name': 'system.images.total', 'value': info.get('Images', 0),
'value': info.get('Images', 0), 'timestamp': timestamp
'timestamp': timestamp })
})
# Parse docker system df -v output for detailed metrics
# Parse docker system df -v output for detailed metrics df_metrics = self._parse_docker_df()
df_metrics = self._parse_docker_df() metrics.extend(df_metrics)
metrics.extend(df_metrics)
except Exception as e:
print(f"Error collecting system metrics: {e}")
return metrics return metrics
def _parse_docker_df(self) -> List[Dict[str, Any]]: def _parse_docker_df(self) -> List[Dict[str, Any]]:
@@ -142,14 +137,14 @@ class SystemCollector(BaseCollector):
continue continue
# Parse image line to get size # Parse image line to get size
size_bytes = self._parse_size_from_line(line) size_bytes = parse_size_from_line(line)
if size_bytes > 0: if size_bytes > 0:
total_size += size_bytes total_size += size_bytes
# Check if image has containers # Check if image has containers
parts = line.split() parts = line.split()
if len(parts) >= 8: if len(parts) >= 8:
containers_count = self._parse_number(parts[-1]) containers_count = safe_int(parts[-1])
if containers_count > 0: if containers_count > 0:
active_images += 1 active_images += 1
@@ -194,18 +189,11 @@ class SystemCollector(BaseCollector):
# Format: CONTAINER ID IMAGE COMMAND LOCAL VOLUMES SIZE CREATED STATUS NAMES # Format: CONTAINER ID IMAGE COMMAND LOCAL VOLUMES SIZE CREATED STATUS NAMES
parts = line.split() parts = line.split()
if len(parts) >= 6: if len(parts) >= 6:
# SIZE is typically at index 4 # Parse container size
size_str = None size_bytes = parse_size_from_line(line)
for i, part in enumerate(parts): if size_bytes > 0:
if 'B' in part and i > 3: total_size += size_bytes
size_str = part container_count += 1
break
if size_str:
size_bytes = self._parse_size_string(size_str)
if size_bytes > 0:
total_size += size_bytes
container_count += 1
metrics.append({ metrics.append({
'name': 'system.containers.total_size_bytes', 'name': 'system.containers.total_size_bytes',
@@ -240,7 +228,7 @@ class SystemCollector(BaseCollector):
continue continue
# Parse volume line # Parse volume line
size_bytes = self._parse_size_from_line(line) size_bytes = parse_size_from_line(line)
if size_bytes > 0: if size_bytes > 0:
total_size += size_bytes total_size += size_bytes
volume_count += 1 volume_count += 1
@@ -260,50 +248,4 @@ class SystemCollector(BaseCollector):
return metrics return metrics
def _parse_size_from_line(self, line: str) -> int:
"""Extract size in bytes from a line containing size information."""
# Look for patterns like "1.33GB", "443MB", "23.98kB"
size_pattern = r'(\d+(?:\.\d+)?)\s*(GB|MB|KB|kB|B)'
matches = re.findall(size_pattern, line, re.IGNORECASE)
if matches:
# Return the largest size found (typically the first SIZE column)
sizes = [self._parse_size_string(f"{num}{unit}") for num, unit in matches]
return max(sizes) if sizes else 0
return 0
def _parse_size_string(self, size_str: str) -> int:
"""Convert size string (e.g., '1.33GB') to bytes."""
if not size_str or size_str == '0B':
return 0
try:
size_str = size_str.strip().upper()
units = {
'B': 1,
'KB': 1024,
'MB': 1024 ** 2,
'GB': 1024 ** 3,
'TB': 1024 ** 4
}
# Extract number and unit
match = re.match(r'(\d+(?:\.\d+)?)\s*([KMGT]?B)', size_str)
if match:
number = float(match.group(1))
unit = match.group(2)
return int(number * units.get(unit, 1))
except Exception as e:
print(f"Warning: Could not parse size string '{size_str}': {e}")
return 0
def _parse_number(self, s: str) -> int:
"""Parse a number from a string, return 0 if not a number."""
try:
return int(s)
except:
return 0
+17 -30
View File
@@ -1,7 +1,6 @@
""" """
Volume metrics collector - gathers storage usage and volume metadata. Volume metrics collector - gathers storage usage and volume metadata.
""" """
import docker
import time import time
from typing import Dict, List, Any from typing import Dict, List, Any
from .base import BaseCollector from .base import BaseCollector
@@ -11,7 +10,7 @@ class VolumeCollector(BaseCollector):
"""Collects metrics from Docker volumes.""" """Collects metrics from Docker volumes."""
def __init__(self): def __init__(self):
self.client = docker.from_env() self.client = self.get_docker_client()
def get_name(self) -> str: def get_name(self) -> str:
return "volume" return "volume"
@@ -26,18 +25,12 @@ class VolumeCollector(BaseCollector):
metrics = [] metrics = []
timestamp = int(time.time()) timestamp = int(time.time())
try: volumes = self.client.volumes.list()
volumes = self.client.volumes.list()
for volume in volumes:
for volume in volumes: try:
volume_name = volume.name volume_name = volume.name
# Get volume details
volume_attrs = volume.attrs
# Driver info
driver = volume_attrs.get('Driver', 'unknown')
# Count containers using this volume # Count containers using this volume
containers_using = self._get_containers_using_volume(volume_name) containers_using = self._get_containers_using_volume(volume_name)
@@ -47,34 +40,28 @@ class VolumeCollector(BaseCollector):
'timestamp': timestamp 'timestamp': timestamp
}) })
# Volume labels count (useful for tracking metadata complexity) # Volume labels count
labels = volume_attrs.get('Labels', {}) or {} labels = volume.attrs.get('Labels', {}) or {}
metrics.append({ metrics.append({
'name': f'volumes.{volume_name}.labels_count', 'name': f'volumes.{volume_name}.labels_count',
'value': len(labels), 'value': len(labels),
'timestamp': timestamp 'timestamp': timestamp
}) })
except Exception as e:
except Exception as e: print(f" Warning: Error collecting metrics for volume {volume.name}: {e}")
print(f"Error collecting volume metrics: {e}")
return metrics return metrics
def _get_containers_using_volume(self, volume_name: str) -> List[str]: def _get_containers_using_volume(self, volume_name: str) -> List[str]:
"""Find all containers using a specific volume.""" """Find all containers using a specific volume."""
containers_using = [] containers_using = []
all_containers = self.client.containers.list(all=True)
try: for container in all_containers:
all_containers = self.client.containers.list(all=True) mounts = container.attrs.get('Mounts', [])
for mount in mounts:
for container in all_containers: if mount.get('Type') == 'volume' and mount.get('Name') == volume_name:
mounts = container.attrs.get('Mounts', []) containers_using.append(container.name)
for mount in mounts: break
if mount.get('Type') == 'volume' and mount.get('Name') == volume_name:
containers_using.append(container.name)
break
except Exception as e:
print(f"Warning: Error finding containers for volume {volume_name}: {e}")
return containers_using return containers_using
+4 -30
View File
@@ -4,6 +4,7 @@ Graphite exporter - sends metrics to Graphite in plaintext protocol format.
import socket import socket
import time import time
from typing import List, Dict, Any from typing import List, Dict, Any
from utils import sanitize_metric_name
class GraphiteExporter: class GraphiteExporter:
@@ -54,22 +55,15 @@ class GraphiteExporter:
try: try:
# Format metrics in Graphite plaintext protocol # Format metrics in Graphite plaintext protocol
# Format: metric_path value timestamp\n
lines = [] lines = []
for metric in metrics: for metric in metrics:
name = metric.get('name', '') name = sanitize_metric_name(metric.get('name', ''))
value = metric.get('value', 0) value = metric.get('value', 0)
timestamp = metric.get('timestamp', int(time.time())) timestamp = metric.get('timestamp', int(time.time()))
# Sanitize metric name # Build full metric path and format: metric_path value timestamp
name = self._sanitize_metric_name(name)
# Build full metric path
full_name = f"{self.prefix}.{name}" full_name = f"{self.prefix}.{name}"
lines.append(f"{full_name} {value} {timestamp}\n")
# Format: metric_path value timestamp
line = f"{full_name} {value} {timestamp}\n"
lines.append(line)
message = ''.join(lines) message = ''.join(lines)
@@ -80,26 +74,6 @@ class GraphiteExporter:
print(f"Error exporting metrics to Graphite: {e}") print(f"Error exporting metrics to Graphite: {e}")
return False return False
def _sanitize_metric_name(self, name: str) -> str:
"""
Sanitize metric name for Graphite.
Replace invalid characters with underscores.
"""
# Replace spaces and special characters
sanitized = name.replace(' ', '_')
sanitized = ''.join(c if c.isalnum() or c in '.-_' else '_' for c in sanitized)
# Remove consecutive dots or underscores
while '..' in sanitized:
sanitized = sanitized.replace('..', '.')
while '__' in sanitized:
sanitized = sanitized.replace('__', '_')
# Remove leading/trailing dots or underscores
sanitized = sanitized.strip('._')
return sanitized
def _send_to_graphite(self, message: str) -> bool: def _send_to_graphite(self, message: str) -> bool:
"""Send message to Graphite via TCP socket.""" """Send message to Graphite via TCP socket."""
sock = None sock = None
+10 -16
View File
@@ -95,30 +95,24 @@ class DockerMetricsCollector:
# Collect from all collectors # Collect from all collectors
all_metrics = [] all_metrics = []
for collector in self.collectors: for collector in self.collectors:
collector_name = collector.get_name()
try: try:
collector_name = collector.get_name()
metrics = collector.collect() metrics = collector.collect()
all_metrics.extend(metrics) all_metrics.extend(metrics)
print(f" - {collector_name}: {len(metrics)} metrics") print(f" - {collector_name}: {len(metrics)} metrics")
# Track collector success
self.self_metrics.record_collector_success(collector_name) self.self_metrics.record_collector_success(collector_name)
except Exception as e: except Exception as e:
print(f" - Error in {collector.get_name()} collector: {e}") print(f" - {collector_name} error: {e}")
# Track collector error self.self_metrics.record_collector_error(collector_name)
self.self_metrics.record_collector_error(collector.get_name())
# Aggregate metrics # Aggregate metrics
try: aggregated_metrics = self.aggregator.aggregate(all_metrics)
aggregated_metrics = self.aggregator.aggregate(all_metrics) added_count = len(aggregated_metrics) - len(all_metrics)
added_count = len(aggregated_metrics) - len(all_metrics) if added_count > 0:
if added_count > 0: print(f" - aggregator: {added_count} metrics")
print(f" - aggregator: {added_count} additional metrics") all_metrics = aggregated_metrics
all_metrics = aggregated_metrics
except Exception as e:
print(f" - Error in aggregator: {e}")
print(f" Total: {len(all_metrics)} metrics collected") print(f" Total: {len(all_metrics)} metrics")
# Export to all exporters # Export to all exporters
for exporter in self.exporters: for exporter in self.exporters:
@@ -126,7 +120,7 @@ class DockerMetricsCollector:
exporter.export(all_metrics) exporter.export(all_metrics)
self.self_metrics.record_export_success() self.self_metrics.record_export_success()
except Exception as e: except Exception as e:
print(f" - Error exporting: {e}") print(f" - Export error: {e}")
self.self_metrics.record_export_error() self.self_metrics.record_export_error()
elapsed = time.time() - start_time elapsed = time.time() - start_time
+126
View File
@@ -0,0 +1,126 @@
"""
Common utilities for Docker metrics collection.
"""
import re
from typing import Any, Callable
from functools import wraps
def sanitize_metric_name(name: str) -> str:
"""
Sanitize metric name for Graphite/monitoring systems.
Replace invalid characters with underscores.
Args:
name: Raw metric name
Returns:
Sanitized metric name
"""
# Replace spaces and special characters
sanitized = name.replace(' ', '_')
sanitized = ''.join(c if c.isalnum() or c in '.-_' else '_' for c in sanitized)
# Remove consecutive dots or underscores
while '..' in sanitized:
sanitized = sanitized.replace('..', '.')
while '__' in sanitized:
sanitized = sanitized.replace('__', '_')
# Remove leading/trailing dots or underscores
return sanitized.strip('._')
def parse_size_string(size_str: str) -> int:
"""
Convert size string (e.g., '1.33GB', '443MB') to bytes.
Args:
size_str: Size string with unit
Returns:
Size in bytes, or 0 if parsing fails
"""
if not size_str or size_str == '0B':
return 0
try:
size_str = size_str.strip().upper()
units = {
'B': 1,
'KB': 1024,
'MB': 1024 ** 2,
'GB': 1024 ** 3,
'TB': 1024 ** 4
}
# Extract number and unit
match = re.match(r'(\d+(?:\.\d+)?)\s*([KMGT]?B)', size_str)
if match:
number = float(match.group(1))
unit = match.group(2)
return int(number * units.get(unit, 1))
except Exception:
pass
return 0
def parse_size_from_line(line: str) -> int:
"""
Extract the largest size in bytes from a line containing size information.
Looks for patterns like "1.33GB", "443MB", "23.98kB".
Args:
line: Text line containing size information
Returns:
Largest size found in bytes, or 0 if none found
"""
size_pattern = r'(\d+(?:\.\d+)?)\s*(GB|MB|KB|kB|B)'
matches = re.findall(size_pattern, line, re.IGNORECASE)
if matches:
sizes = [parse_size_string(f"{num}{unit}") for num, unit in matches]
return max(sizes) if sizes else 0
return 0
def safe_int(value: Any, default: int = 0) -> int:
"""
Safely convert value to int, returning default on failure.
Args:
value: Value to convert
default: Default value if conversion fails
Returns:
Integer value or default
"""
try:
return int(value)
except (ValueError, TypeError):
return default
def handle_collector_errors(collector_name: str = None):
"""
Decorator to handle errors in collector methods gracefully.
Args:
collector_name: Name of the collector for error messages
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
name = collector_name or func.__name__
print(f"Warning: Error in {name}: {e}")
return [] if func.__name__ == 'collect' else None
return wrapper
return decorator