diff --git a/src/aggregator.py b/src/aggregator.py index 8a0603f..2973c27 100644 --- a/src/aggregator.py +++ b/src/aggregator.py @@ -18,57 +18,25 @@ class MetricsAggregator: Returns: Enhanced metrics list with aggregations """ + if not all_metrics: + return all_metrics + aggregated = list(all_metrics) # Start with original metrics + timestamp = all_metrics[0].get('timestamp', 0) - # Add container-level aggregations - container_aggs = self._aggregate_by_container(all_metrics) - aggregated.extend(container_aggs) - - # Add volume-level aggregations - volume_aggs = self._aggregate_by_volume(all_metrics) + # Add volume-level aggregations (most useful) + volume_aggs = self._aggregate_volumes(all_metrics, timestamp) aggregated.extend(volume_aggs) # Add system-level aggregations - system_aggs = self._aggregate_system_metrics(all_metrics) + system_aggs = self._aggregate_system(all_metrics, timestamp) aggregated.extend(system_aggs) return aggregated - def _aggregate_by_container(self, metrics: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Create container-level aggregations.""" - aggregations = [] - - # Group metrics by container - container_metrics = defaultdict(list) - for metric in metrics: - name = metric.get('name', '') - if name.startswith('containers.'): - parts = name.split('.') - if len(parts) >= 2: - container_name = parts[1] - container_metrics[container_name].append(metric) - - # For each container, create aggregations - for container_name, cmets in container_metrics.items(): - timestamp = cmets[0].get('timestamp') if cmets else 0 - - # Count different metric types for this container - metric_count = len(cmets) - - aggregations.append({ - 'name': f'aggregated.containers.{container_name}.metric_count', - 'value': metric_count, - 'timestamp': timestamp - }) - - return aggregations - - def _aggregate_by_volume(self, metrics: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + def _aggregate_volumes(self, metrics: List[Dict[str, Any]], timestamp: int) -> List[Dict[str, Any]]: """Create volume-level aggregations.""" - aggregations = [] - - # Find all volume metrics - volume_containers = defaultdict(int) + volume_containers = {} for metric in metrics: name = metric.get('name', '') @@ -76,67 +44,49 @@ class MetricsAggregator: parts = name.split('.') if len(parts) >= 2: volume_name = parts[1] - count = metric.get('value', 0) - volume_containers[volume_name] = count + volume_containers[volume_name] = metric.get('value', 0) - # Create summary metrics - if volume_containers: - timestamp = metrics[0].get('timestamp', 0) if metrics else 0 - - # Total volumes - aggregations.append({ + if not volume_containers: + return [] + + volumes_in_use = sum(1 for count in volume_containers.values() if count > 0) + + return [ + { 'name': 'aggregated.volumes.total_count', 'value': len(volume_containers), 'timestamp': timestamp - }) - - # Volumes in use (with at least one container) - volumes_in_use = sum(1 for count in volume_containers.values() if count > 0) - aggregations.append({ + }, + { 'name': 'aggregated.volumes.in_use_count', 'value': volumes_in_use, 'timestamp': timestamp - }) - - # Unused volumes - aggregations.append({ + }, + { 'name': 'aggregated.volumes.unused_count', 'value': len(volume_containers) - volumes_in_use, 'timestamp': timestamp - }) - - return aggregations + } + ] - def _aggregate_system_metrics(self, metrics: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + def _aggregate_system(self, metrics: List[Dict[str, Any]], timestamp: int) -> List[Dict[str, Any]]: """Create system-level aggregations.""" - aggregations = [] - - # Find system metrics total_containers = 0 running_containers = 0 - total_images = 0 for metric in metrics: name = metric.get('name', '') - value = metric.get('value', 0) - if name == 'system.containers.total': - total_containers = value + total_containers = metric.get('value', 0) elif name == 'system.containers.running': - running_containers = value - elif name == 'system.images.total': - total_images = value + running_containers = metric.get('value', 0) - if total_containers > 0 or total_images > 0: - timestamp = metrics[0].get('timestamp', 0) if metrics else 0 - - # Container utilization percentage - if total_containers > 0: - utilization = (running_containers / total_containers) * 100 - aggregations.append({ - 'name': 'aggregated.system.container_utilization_percent', - 'value': round(utilization, 2), - 'timestamp': timestamp - }) + if total_containers > 0: + utilization = (running_containers / total_containers) * 100 + return [{ + 'name': 'aggregated.system.container_utilization_percent', + 'value': round(utilization, 2), + 'timestamp': timestamp + }] - return aggregations + return [] diff --git a/src/collectors/base.py b/src/collectors/base.py index 1d28649..31237b4 100644 --- a/src/collectors/base.py +++ b/src/collectors/base.py @@ -2,12 +2,23 @@ Base collector interface for Docker metrics collection. """ from abc import ABC, abstractmethod -from typing import Dict, List, Any +from typing import Dict, List, Any, Optional +import docker class BaseCollector(ABC): """Abstract base class for all metric collectors.""" + # Shared Docker client instance across all collectors + _shared_client: Optional[docker.DockerClient] = None + + @classmethod + def get_docker_client(cls) -> docker.DockerClient: + """Get or create shared Docker client instance.""" + if cls._shared_client is None: + cls._shared_client = docker.from_env() + return cls._shared_client + @abstractmethod def collect(self) -> List[Dict[str, Any]]: """ diff --git a/src/collectors/container_collector.py b/src/collectors/container_collector.py index bdf8a03..8cf7179 100644 --- a/src/collectors/container_collector.py +++ b/src/collectors/container_collector.py @@ -1,7 +1,6 @@ """ Container metrics collector - gathers CPU, memory, state, and health metrics. """ -import docker import time from typing import Dict, List, Any from .base import BaseCollector @@ -11,7 +10,7 @@ class ContainerCollector(BaseCollector): """Collects metrics from Docker containers.""" def __init__(self): - self.client = docker.from_env() + self.client = self.get_docker_client() def get_name(self) -> str: return "container" @@ -26,10 +25,10 @@ class ContainerCollector(BaseCollector): metrics = [] timestamp = int(time.time()) - try: - containers = self.client.containers.list(all=True) - - for container in containers: + containers = self.client.containers.list(all=True) + + for container in containers: + try: container_name = container.name # Basic state metrics @@ -60,71 +59,61 @@ class ContainerCollector(BaseCollector): }) # Disk usage metrics (available for all containers) - try: - disk_usage = self._get_container_disk_usage(container) - if disk_usage is not None: - metrics.append({ - 'name': f'containers.{container_name}.disk_usage_bytes', - 'value': disk_usage, - 'timestamp': timestamp - }) - except Exception as e: - print(f"Warning: Could not collect disk usage for {container_name}: {e}") + disk_usage = self._get_container_disk_usage(container) + if disk_usage is not None: + metrics.append({ + 'name': f'containers.{container_name}.disk_usage_bytes', + 'value': disk_usage, + 'timestamp': timestamp + }) # Only collect resource metrics for running containers if state == 'running': - try: - stats = container.stats(stream=False) - - # CPU metrics - cpu_percent = self._calculate_cpu_percent(stats) + stats = container.stats(stream=False) + + # CPU metrics + cpu_percent = self._calculate_cpu_percent(stats) + metrics.append({ + 'name': f'containers.{container_name}.cpu_percent', + 'value': cpu_percent, + 'timestamp': timestamp + }) + + # Memory metrics + memory_stats = stats.get('memory_stats', {}) + memory_usage = memory_stats.get('usage', 0) + memory_limit = memory_stats.get('limit', 1) + memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0 + + metrics.append({ + 'name': f'containers.{container_name}.memory_bytes', + 'value': memory_usage, + 'timestamp': timestamp + }) + + metrics.append({ + 'name': f'containers.{container_name}.memory_percent', + 'value': round(memory_percent, 2), + 'timestamp': timestamp + }) + + # Network metrics + for net_metric in self._get_network_metrics(stats): metrics.append({ - 'name': f'containers.{container_name}.cpu_percent', - 'value': cpu_percent, + 'name': f'containers.{container_name}.{net_metric["name"]}', + 'value': net_metric['value'], 'timestamp': timestamp }) - - # Memory metrics - memory_stats = stats.get('memory_stats', {}) - memory_usage = memory_stats.get('usage', 0) - memory_limit = memory_stats.get('limit', 1) - memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0 - + + # Block I/O metrics + for io_metric in self._get_io_metrics(stats): metrics.append({ - 'name': f'containers.{container_name}.memory_bytes', - 'value': memory_usage, + 'name': f'containers.{container_name}.{io_metric["name"]}', + 'value': io_metric['value'], 'timestamp': timestamp }) - - metrics.append({ - 'name': f'containers.{container_name}.memory_percent', - 'value': memory_percent, - 'timestamp': timestamp - }) - - # Network metrics - network_metrics = self._get_network_metrics(stats) - for net_metric in network_metrics: - metrics.append({ - 'name': f'containers.{container_name}.{net_metric["name"]}', - 'value': net_metric['value'], - 'timestamp': timestamp - }) - - # Block I/O metrics - io_metrics = self._get_io_metrics(stats) - for io_metric in io_metrics: - metrics.append({ - 'name': f'containers.{container_name}.{io_metric["name"]}', - 'value': io_metric['value'], - 'timestamp': timestamp - }) - - except Exception as e: - print(f"Warning: Could not collect stats for {container_name}: {e}") - - except Exception as e: - print(f"Error collecting container metrics: {e}") + except Exception as e: + print(f" Warning: Error collecting metrics for {container.name}: {e}") return metrics @@ -142,128 +131,94 @@ class ContainerCollector(BaseCollector): def _get_health_status(self, container) -> int: """Get container health status as numeric value.""" - try: - health = container.attrs.get('State', {}).get('Health', {}).get('Status', None) - if health is None: - return None - - health_map = { - 'healthy': 2, - 'unhealthy': 0, - 'starting': 1, - 'none': -1 - } - return health_map.get(health.lower(), -1) - except: + health = container.attrs.get('State', {}).get('Health', {}).get('Status', None) + if health is None: return None + + health_map = { + 'healthy': 2, + 'unhealthy': 0, + 'starting': 1, + 'none': -1 + } + return health_map.get(health.lower(), -1) def _calculate_cpu_percent(self, stats: dict) -> float: """Calculate CPU usage percentage from stats.""" - try: - cpu_stats = stats.get('cpu_stats', {}) - precpu_stats = stats.get('precpu_stats', {}) - - cpu_delta = cpu_stats.get('cpu_usage', {}).get('total_usage', 0) - \ - precpu_stats.get('cpu_usage', {}).get('total_usage', 0) - - system_delta = cpu_stats.get('system_cpu_usage', 0) - \ - precpu_stats.get('system_cpu_usage', 0) - - online_cpus = cpu_stats.get('online_cpus', 0) - if online_cpus == 0: - online_cpus = len(cpu_stats.get('cpu_usage', {}).get('percpu_usage', [0])) - - if system_delta > 0 and cpu_delta > 0: - cpu_percent = (cpu_delta / system_delta) * online_cpus * 100.0 - return round(cpu_percent, 2) - except Exception as e: - print(f"Warning: Error calculating CPU percent: {e}") + cpu_stats = stats.get('cpu_stats', {}) + precpu_stats = stats.get('precpu_stats', {}) + + cpu_delta = cpu_stats.get('cpu_usage', {}).get('total_usage', 0) - \ + precpu_stats.get('cpu_usage', {}).get('total_usage', 0) + + system_delta = cpu_stats.get('system_cpu_usage', 0) - \ + precpu_stats.get('system_cpu_usage', 0) + + online_cpus = cpu_stats.get('online_cpus', 0) + if online_cpus == 0: + online_cpus = len(cpu_stats.get('cpu_usage', {}).get('percpu_usage', [0])) + + if system_delta > 0 and cpu_delta > 0: + cpu_percent = (cpu_delta / system_delta) * online_cpus * 100.0 + return round(cpu_percent, 2) return 0.0 def _get_network_metrics(self, stats: dict) -> List[Dict[str, Any]]: """Extract network metrics from stats.""" - metrics = [] - try: - networks = stats.get('networks', {}) - - total_rx_bytes = 0 - total_tx_bytes = 0 - total_rx_packets = 0 - total_tx_packets = 0 - - for interface, net_stats in networks.items(): - total_rx_bytes += net_stats.get('rx_bytes', 0) - total_tx_bytes += net_stats.get('tx_bytes', 0) - total_rx_packets += net_stats.get('rx_packets', 0) - total_tx_packets += net_stats.get('tx_packets', 0) - - metrics.append({'name': 'network.rx_bytes', 'value': total_rx_bytes}) - metrics.append({'name': 'network.tx_bytes', 'value': total_tx_bytes}) - metrics.append({'name': 'network.rx_packets', 'value': total_rx_packets}) - metrics.append({'name': 'network.tx_packets', 'value': total_tx_packets}) - - except Exception as e: - print(f"Warning: Error getting network metrics: {e}") + networks = stats.get('networks', {}) - return metrics + total_rx_bytes = 0 + total_tx_bytes = 0 + total_rx_packets = 0 + total_tx_packets = 0 + + for net_stats in networks.values(): + total_rx_bytes += net_stats.get('rx_bytes', 0) + total_tx_bytes += net_stats.get('tx_bytes', 0) + total_rx_packets += net_stats.get('rx_packets', 0) + total_tx_packets += net_stats.get('tx_packets', 0) + + return [ + {'name': 'network.rx_bytes', 'value': total_rx_bytes}, + {'name': 'network.tx_bytes', 'value': total_tx_bytes}, + {'name': 'network.rx_packets', 'value': total_rx_packets}, + {'name': 'network.tx_packets', 'value': total_tx_packets} + ] def _get_io_metrics(self, stats: dict) -> List[Dict[str, Any]]: """Extract block I/O metrics from stats.""" - metrics = [] - try: - blkio_stats = stats.get('blkio_stats', {}) - io_service_bytes = blkio_stats.get('io_service_bytes_recursive', []) - - # Handle case where io_service_bytes might be None - if io_service_bytes is None: - io_service_bytes = [] - - read_bytes = 0 - write_bytes = 0 - - for entry in io_service_bytes: - op = entry.get('op', '') - value = entry.get('value', 0) - - if op == 'Read': - read_bytes += value - elif op == 'Write': - write_bytes += value - - metrics.append({'name': 'blkio.read_bytes', 'value': read_bytes}) - metrics.append({'name': 'blkio.write_bytes', 'value': write_bytes}) - - except Exception as e: - print(f"Warning: Error getting I/O metrics: {e}") + blkio_stats = stats.get('blkio_stats', {}) + io_service_bytes = blkio_stats.get('io_service_bytes_recursive', []) or [] - return metrics + read_bytes = 0 + write_bytes = 0 + + for entry in io_service_bytes: + op = entry.get('op', '') + value = entry.get('value', 0) + + if op == 'Read': + read_bytes += value + elif op == 'Write': + write_bytes += value + + return [ + {'name': 'blkio.read_bytes', 'value': read_bytes}, + {'name': 'blkio.write_bytes', 'value': write_bytes} + ] def _get_container_disk_usage(self, container) -> int: - """ - Get the disk usage for a container in bytes using the Docker system df API. - This provides accurate size information including writable layer and virtual size. - """ - try: - # Use the system df API to get accurate container size information - # This is more reliable than container.attrs which often doesn't include size data - df_data = self.client.df() - containers_info = df_data.get('Containers', []) - - # Find the matching container by ID - for container_info in containers_info: - if container_info.get('Id', '').startswith(container.id): - # SizeRw: Size of files created or changed in the writable layer - size_rw = container_info.get('SizeRw', 0) - # SizeRootFs: Total size including all layers (image + writable) - size_rootfs = container_info.get('SizeRootFs', 0) - - # Return SizeRootFs (total size) if available, otherwise SizeRw - return size_rootfs if size_rootfs > 0 else size_rw - - # Container not found in df data - return 0 - - except Exception as e: - print(f"Warning: Error getting disk usage: {e}") - return None + """Get the disk usage for a container in bytes using the Docker system df API.""" + df_data = self.client.df() + containers_info = df_data.get('Containers', []) + + # Find the matching container by ID + for container_info in containers_info: + if container_info.get('Id', '').startswith(container.id): + # Return SizeRootFs (total size) if available, otherwise SizeRw + size_rootfs = container_info.get('SizeRootFs', 0) + size_rw = container_info.get('SizeRw', 0) + return size_rootfs if size_rootfs > 0 else size_rw + + return 0 diff --git a/src/collectors/self_collector.py b/src/collectors/self_collector.py index d54ccb5..7b6c302 100644 --- a/src/collectors/self_collector.py +++ b/src/collectors/self_collector.py @@ -110,50 +110,41 @@ class SelfMetricsCollector(BaseCollector): }) # Memory usage - try: - mem_info = self.process.memory_info() - metrics.append({ - 'name': 'service.memory_rss_bytes', - 'value': mem_info.rss, - 'timestamp': timestamp - }) - - metrics.append({ - 'name': 'service.memory_vms_bytes', - 'value': mem_info.vms, - 'timestamp': timestamp - }) - - # Memory usage in MB for easier reading - metrics.append({ - 'name': 'service.memory_rss_mb', - 'value': round(mem_info.rss / (1024 * 1024), 2), - 'timestamp': timestamp - }) - except Exception as e: - pass # Skip memory metrics if we can't get them + mem_info = self.process.memory_info() + metrics.append({ + 'name': 'service.memory_rss_bytes', + 'value': mem_info.rss, + 'timestamp': timestamp + }) + + metrics.append({ + 'name': 'service.memory_vms_bytes', + 'value': mem_info.vms, + 'timestamp': timestamp + }) + + # Memory usage in MB for easier reading + metrics.append({ + 'name': 'service.memory_rss_mb', + 'value': round(mem_info.rss / (1024 * 1024), 2), + 'timestamp': timestamp + }) # CPU usage - try: - cpu_percent = self.process.cpu_percent() - metrics.append({ - 'name': 'service.cpu_percent', - 'value': round(cpu_percent, 2), - 'timestamp': timestamp - }) - except Exception as e: - pass # Skip CPU metrics if we can't get them + cpu_percent = self.process.cpu_percent() + metrics.append({ + 'name': 'service.cpu_percent', + 'value': round(cpu_percent, 2), + 'timestamp': timestamp + }) # Thread count - try: - num_threads = self.process.num_threads() - metrics.append({ - 'name': 'service.threads_count', - 'value': num_threads, - 'timestamp': timestamp - }) - except Exception as e: - pass # Skip thread metrics if we can't get them + num_threads = self.process.num_threads() + metrics.append({ + 'name': 'service.threads_count', + 'value': num_threads, + 'timestamp': timestamp + }) return metrics diff --git a/src/collectors/system_collector.py b/src/collectors/system_collector.py index ecc9156..4805f60 100644 --- a/src/collectors/system_collector.py +++ b/src/collectors/system_collector.py @@ -1,19 +1,18 @@ """ System-level Docker metrics collector - gathers disk usage and system-wide statistics. """ -import docker import subprocess -import re import time from typing import Dict, List, Any from .base import BaseCollector +from utils import parse_size_from_line, safe_int class SystemCollector(BaseCollector): """Collects system-level Docker metrics using 'docker system df'.""" def __init__(self): - self.client = docker.from_env() + self.client = self.get_docker_client() def get_name(self) -> str: return "system" @@ -28,49 +27,45 @@ class SystemCollector(BaseCollector): metrics = [] timestamp = int(time.time()) - try: - # Get Docker info - info = self.client.info() - - # System-wide container counts - metrics.append({ - 'name': 'system.containers.total', - 'value': info.get('Containers', 0), - 'timestamp': timestamp - }) - - metrics.append({ - 'name': 'system.containers.running', - 'value': info.get('ContainersRunning', 0), - 'timestamp': timestamp - }) - - metrics.append({ - 'name': 'system.containers.paused', - 'value': info.get('ContainersPaused', 0), - 'timestamp': timestamp - }) - - metrics.append({ - 'name': 'system.containers.stopped', - 'value': info.get('ContainersStopped', 0), - 'timestamp': timestamp - }) - - # Image count - metrics.append({ - 'name': 'system.images.total', - 'value': info.get('Images', 0), - 'timestamp': timestamp - }) - - # Parse docker system df -v output for detailed metrics - df_metrics = self._parse_docker_df() - metrics.extend(df_metrics) - - except Exception as e: - print(f"Error collecting system metrics: {e}") - + # Get Docker info + info = self.client.info() + + # System-wide container counts + metrics.append({ + 'name': 'system.containers.total', + 'value': info.get('Containers', 0), + 'timestamp': timestamp + }) + + metrics.append({ + 'name': 'system.containers.running', + 'value': info.get('ContainersRunning', 0), + 'timestamp': timestamp + }) + + metrics.append({ + 'name': 'system.containers.paused', + 'value': info.get('ContainersPaused', 0), + 'timestamp': timestamp + }) + + metrics.append({ + 'name': 'system.containers.stopped', + 'value': info.get('ContainersStopped', 0), + 'timestamp': timestamp + }) + + # Image count + metrics.append({ + 'name': 'system.images.total', + 'value': info.get('Images', 0), + 'timestamp': timestamp + }) + + # Parse docker system df -v output for detailed metrics + df_metrics = self._parse_docker_df() + metrics.extend(df_metrics) + return metrics def _parse_docker_df(self) -> List[Dict[str, Any]]: @@ -142,14 +137,14 @@ class SystemCollector(BaseCollector): continue # Parse image line to get size - size_bytes = self._parse_size_from_line(line) + size_bytes = parse_size_from_line(line) if size_bytes > 0: total_size += size_bytes # Check if image has containers parts = line.split() if len(parts) >= 8: - containers_count = self._parse_number(parts[-1]) + containers_count = safe_int(parts[-1]) if containers_count > 0: active_images += 1 @@ -194,18 +189,11 @@ class SystemCollector(BaseCollector): # Format: CONTAINER ID IMAGE COMMAND LOCAL VOLUMES SIZE CREATED STATUS NAMES parts = line.split() if len(parts) >= 6: - # SIZE is typically at index 4 - size_str = None - for i, part in enumerate(parts): - if 'B' in part and i > 3: - size_str = part - break - - if size_str: - size_bytes = self._parse_size_string(size_str) - if size_bytes > 0: - total_size += size_bytes - container_count += 1 + # Parse container size + size_bytes = parse_size_from_line(line) + if size_bytes > 0: + total_size += size_bytes + container_count += 1 metrics.append({ 'name': 'system.containers.total_size_bytes', @@ -240,7 +228,7 @@ class SystemCollector(BaseCollector): continue # Parse volume line - size_bytes = self._parse_size_from_line(line) + size_bytes = parse_size_from_line(line) if size_bytes > 0: total_size += size_bytes volume_count += 1 @@ -260,50 +248,4 @@ class SystemCollector(BaseCollector): return metrics - def _parse_size_from_line(self, line: str) -> int: - """Extract size in bytes from a line containing size information.""" - # Look for patterns like "1.33GB", "443MB", "23.98kB" - size_pattern = r'(\d+(?:\.\d+)?)\s*(GB|MB|KB|kB|B)' - matches = re.findall(size_pattern, line, re.IGNORECASE) - - if matches: - # Return the largest size found (typically the first SIZE column) - sizes = [self._parse_size_string(f"{num}{unit}") for num, unit in matches] - return max(sizes) if sizes else 0 - - return 0 - - def _parse_size_string(self, size_str: str) -> int: - """Convert size string (e.g., '1.33GB') to bytes.""" - if not size_str or size_str == '0B': - return 0 - - try: - size_str = size_str.strip().upper() - - units = { - 'B': 1, - 'KB': 1024, - 'MB': 1024 ** 2, - 'GB': 1024 ** 3, - 'TB': 1024 ** 4 - } - - # Extract number and unit - match = re.match(r'(\d+(?:\.\d+)?)\s*([KMGT]?B)', size_str) - if match: - number = float(match.group(1)) - unit = match.group(2) - return int(number * units.get(unit, 1)) - - except Exception as e: - print(f"Warning: Could not parse size string '{size_str}': {e}") - - return 0 - - def _parse_number(self, s: str) -> int: - """Parse a number from a string, return 0 if not a number.""" - try: - return int(s) - except: - return 0 + diff --git a/src/collectors/volume_collector.py b/src/collectors/volume_collector.py index 52cce78..b217f47 100644 --- a/src/collectors/volume_collector.py +++ b/src/collectors/volume_collector.py @@ -1,7 +1,6 @@ """ Volume metrics collector - gathers storage usage and volume metadata. """ -import docker import time from typing import Dict, List, Any from .base import BaseCollector @@ -11,7 +10,7 @@ class VolumeCollector(BaseCollector): """Collects metrics from Docker volumes.""" def __init__(self): - self.client = docker.from_env() + self.client = self.get_docker_client() def get_name(self) -> str: return "volume" @@ -26,18 +25,12 @@ class VolumeCollector(BaseCollector): metrics = [] timestamp = int(time.time()) - try: - volumes = self.client.volumes.list() - - for volume in volumes: + volumes = self.client.volumes.list() + + for volume in volumes: + try: volume_name = volume.name - # Get volume details - volume_attrs = volume.attrs - - # Driver info - driver = volume_attrs.get('Driver', 'unknown') - # Count containers using this volume containers_using = self._get_containers_using_volume(volume_name) @@ -47,34 +40,28 @@ class VolumeCollector(BaseCollector): 'timestamp': timestamp }) - # Volume labels count (useful for tracking metadata complexity) - labels = volume_attrs.get('Labels', {}) or {} + # Volume labels count + labels = volume.attrs.get('Labels', {}) or {} metrics.append({ 'name': f'volumes.{volume_name}.labels_count', 'value': len(labels), 'timestamp': timestamp }) - - except Exception as e: - print(f"Error collecting volume metrics: {e}") + except Exception as e: + print(f" Warning: Error collecting metrics for volume {volume.name}: {e}") return metrics def _get_containers_using_volume(self, volume_name: str) -> List[str]: """Find all containers using a specific volume.""" containers_using = [] + all_containers = self.client.containers.list(all=True) - try: - all_containers = self.client.containers.list(all=True) - - for container in all_containers: - mounts = container.attrs.get('Mounts', []) - for mount in mounts: - if mount.get('Type') == 'volume' and mount.get('Name') == volume_name: - containers_using.append(container.name) - break - - except Exception as e: - print(f"Warning: Error finding containers for volume {volume_name}: {e}") - + for container in all_containers: + mounts = container.attrs.get('Mounts', []) + for mount in mounts: + if mount.get('Type') == 'volume' and mount.get('Name') == volume_name: + containers_using.append(container.name) + break + return containers_using diff --git a/src/exporters/graphite_exporter.py b/src/exporters/graphite_exporter.py index 2d4a6c4..cd05d4b 100644 --- a/src/exporters/graphite_exporter.py +++ b/src/exporters/graphite_exporter.py @@ -4,6 +4,7 @@ Graphite exporter - sends metrics to Graphite in plaintext protocol format. import socket import time from typing import List, Dict, Any +from utils import sanitize_metric_name class GraphiteExporter: @@ -54,22 +55,15 @@ class GraphiteExporter: try: # Format metrics in Graphite plaintext protocol - # Format: metric_path value timestamp\n lines = [] for metric in metrics: - name = metric.get('name', '') + name = sanitize_metric_name(metric.get('name', '')) value = metric.get('value', 0) timestamp = metric.get('timestamp', int(time.time())) - # Sanitize metric name - name = self._sanitize_metric_name(name) - - # Build full metric path + # Build full metric path and format: metric_path value timestamp full_name = f"{self.prefix}.{name}" - - # Format: metric_path value timestamp - line = f"{full_name} {value} {timestamp}\n" - lines.append(line) + lines.append(f"{full_name} {value} {timestamp}\n") message = ''.join(lines) @@ -80,26 +74,6 @@ class GraphiteExporter: print(f"Error exporting metrics to Graphite: {e}") return False - def _sanitize_metric_name(self, name: str) -> str: - """ - Sanitize metric name for Graphite. - Replace invalid characters with underscores. - """ - # Replace spaces and special characters - sanitized = name.replace(' ', '_') - sanitized = ''.join(c if c.isalnum() or c in '.-_' else '_' for c in sanitized) - - # Remove consecutive dots or underscores - while '..' in sanitized: - sanitized = sanitized.replace('..', '.') - while '__' in sanitized: - sanitized = sanitized.replace('__', '_') - - # Remove leading/trailing dots or underscores - sanitized = sanitized.strip('._') - - return sanitized - def _send_to_graphite(self, message: str) -> bool: """Send message to Graphite via TCP socket.""" sock = None diff --git a/src/main.py b/src/main.py index 753a6fe..6e76ca0 100644 --- a/src/main.py +++ b/src/main.py @@ -95,30 +95,24 @@ class DockerMetricsCollector: # Collect from all collectors all_metrics = [] for collector in self.collectors: + collector_name = collector.get_name() try: - collector_name = collector.get_name() metrics = collector.collect() all_metrics.extend(metrics) print(f" - {collector_name}: {len(metrics)} metrics") - - # Track collector success self.self_metrics.record_collector_success(collector_name) except Exception as e: - print(f" - Error in {collector.get_name()} collector: {e}") - # Track collector error - self.self_metrics.record_collector_error(collector.get_name()) + print(f" - {collector_name} error: {e}") + self.self_metrics.record_collector_error(collector_name) # Aggregate metrics - try: - aggregated_metrics = self.aggregator.aggregate(all_metrics) - added_count = len(aggregated_metrics) - len(all_metrics) - if added_count > 0: - print(f" - aggregator: {added_count} additional metrics") - all_metrics = aggregated_metrics - except Exception as e: - print(f" - Error in aggregator: {e}") + aggregated_metrics = self.aggregator.aggregate(all_metrics) + added_count = len(aggregated_metrics) - len(all_metrics) + if added_count > 0: + print(f" - aggregator: {added_count} metrics") + all_metrics = aggregated_metrics - print(f" Total: {len(all_metrics)} metrics collected") + print(f" Total: {len(all_metrics)} metrics") # Export to all exporters for exporter in self.exporters: @@ -126,7 +120,7 @@ class DockerMetricsCollector: exporter.export(all_metrics) self.self_metrics.record_export_success() except Exception as e: - print(f" - Error exporting: {e}") + print(f" - Export error: {e}") self.self_metrics.record_export_error() elapsed = time.time() - start_time diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..9cc6b92 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,126 @@ +""" +Common utilities for Docker metrics collection. +""" +import re +from typing import Any, Callable +from functools import wraps + + +def sanitize_metric_name(name: str) -> str: + """ + Sanitize metric name for Graphite/monitoring systems. + Replace invalid characters with underscores. + + Args: + name: Raw metric name + + Returns: + Sanitized metric name + """ + # Replace spaces and special characters + sanitized = name.replace(' ', '_') + sanitized = ''.join(c if c.isalnum() or c in '.-_' else '_' for c in sanitized) + + # Remove consecutive dots or underscores + while '..' in sanitized: + sanitized = sanitized.replace('..', '.') + while '__' in sanitized: + sanitized = sanitized.replace('__', '_') + + # Remove leading/trailing dots or underscores + return sanitized.strip('._') + + +def parse_size_string(size_str: str) -> int: + """ + Convert size string (e.g., '1.33GB', '443MB') to bytes. + + Args: + size_str: Size string with unit + + Returns: + Size in bytes, or 0 if parsing fails + """ + if not size_str or size_str == '0B': + return 0 + + try: + size_str = size_str.strip().upper() + + units = { + 'B': 1, + 'KB': 1024, + 'MB': 1024 ** 2, + 'GB': 1024 ** 3, + 'TB': 1024 ** 4 + } + + # Extract number and unit + match = re.match(r'(\d+(?:\.\d+)?)\s*([KMGT]?B)', size_str) + if match: + number = float(match.group(1)) + unit = match.group(2) + return int(number * units.get(unit, 1)) + + except Exception: + pass + + return 0 + + +def parse_size_from_line(line: str) -> int: + """ + Extract the largest size in bytes from a line containing size information. + Looks for patterns like "1.33GB", "443MB", "23.98kB". + + Args: + line: Text line containing size information + + Returns: + Largest size found in bytes, or 0 if none found + """ + size_pattern = r'(\d+(?:\.\d+)?)\s*(GB|MB|KB|kB|B)' + matches = re.findall(size_pattern, line, re.IGNORECASE) + + if matches: + sizes = [parse_size_string(f"{num}{unit}") for num, unit in matches] + return max(sizes) if sizes else 0 + + return 0 + + +def safe_int(value: Any, default: int = 0) -> int: + """ + Safely convert value to int, returning default on failure. + + Args: + value: Value to convert + default: Default value if conversion fails + + Returns: + Integer value or default + """ + try: + return int(value) + except (ValueError, TypeError): + return default + + +def handle_collector_errors(collector_name: str = None): + """ + Decorator to handle errors in collector methods gracefully. + + Args: + collector_name: Name of the collector for error messages + """ + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + name = collector_name or func.__name__ + print(f"Warning: Error in {name}: {e}") + return [] if func.__name__ == 'collect' else None + return wrapper + return decorator