From 6ce1ee7f6202c73f5b356d6cca4f38071fc18bb2 Mon Sep 17 00:00:00 2001 From: Simon Gruber Date: Sun, 14 Dec 2025 20:59:43 +0100 Subject: [PATCH] Added optimizations and configuration options --- compose.yml | 24 ++++ src/aggregator.py | 10 +- src/collectors/base.py | 31 +++++ src/collectors/container_collector.py | 174 +++++++++++++++----------- src/collectors/self_collector.py | 1 + src/collectors/system_collector.py | 78 +++++++----- src/collectors/volume_collector.py | 52 +++++--- src/config.py | 103 +++++++++++++++ src/main.py | 131 ++++++++++++------- 9 files changed, 443 insertions(+), 161 deletions(-) create mode 100644 src/config.py diff --git a/compose.yml b/compose.yml index 93aee50..49d024a 100644 --- a/compose.yml +++ b/compose.yml @@ -6,10 +6,34 @@ services: container_name: simple-docker-metrics user: "0:0" environment: + # Connection settings - GRAPHITE_ENDPOINT=graphite:2003 - GRAPHITE_PREFIX=docker-metrics - INTERVAL_SECONDS=10 - DEBUG=false + + # Performance settings + - PARALLEL_COLLECTION=true # Enable parallel metric collection + - MAX_WORKERS=4 # Number of parallel workers + - CACHE_TTL_SECONDS=300 # Cache duration for expensive operations (5 min) + + # Enable/disable collectors + - COLLECT_CONTAINER_METRICS=true + - COLLECT_VOLUME_METRICS=true + - COLLECT_SYSTEM_METRICS=true + - COLLECT_SELF_METRICS=true + + # Container metric labels (comma-separated: cpu,memory,network,blkio,state,health,restart_count,disk) + # - CONTAINER_LABELS=cpu,memory,state # Uncomment to only collect specific labels + + # Volume metric labels (comma-separated: container_count,labels_count) + # - VOLUME_LABELS=container_count + + # System metric labels (comma-separated: containers,images,storage) + # - SYSTEM_LABELS=containers,images + + # Aggregations + - ENABLE_AGGREGATIONS=true volumes: - /var/run/docker.sock:/var/run/docker.sock:ro restart: unless-stopped diff --git a/src/aggregator.py b/src/aggregator.py index 2973c27..00705b0 100644 --- a/src/aggregator.py +++ b/src/aggregator.py @@ -1,13 +1,17 @@ """ Metrics aggregator - combines and processes metrics from multiple collectors. """ -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional from collections import defaultdict class MetricsAggregator: """Aggregates and processes metrics from multiple sources.""" + def __init__(self, config=None): + """Initialize aggregator with optional config.""" + self.config = config + def aggregate(self, all_metrics: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Aggregate metrics and add computed aggregations. @@ -21,6 +25,10 @@ class MetricsAggregator: if not all_metrics: return all_metrics + # Check if aggregations are enabled + if self.config and not self.config.enable_aggregations: + return all_metrics + aggregated = list(all_metrics) # Start with original metrics timestamp = all_metrics[0].get('timestamp', 0) diff --git a/src/collectors/base.py b/src/collectors/base.py index 31237b4..53a79b6 100644 --- a/src/collectors/base.py +++ b/src/collectors/base.py @@ -4,6 +4,7 @@ Base collector interface for Docker metrics collection. from abc import ABC, abstractmethod from typing import Dict, List, Any, Optional import docker +import time class BaseCollector(ABC): @@ -12,6 +13,13 @@ class BaseCollector(ABC): # Shared Docker client instance across all collectors _shared_client: Optional[docker.DockerClient] = None + # Cache for expensive operations + _cache: Dict[str, Dict[str, Any]] = {} + + def __init__(self): + """Initialize collector with optional config.""" + self.config = None # Will be set by main collector + @classmethod def get_docker_client(cls) -> docker.DockerClient: """Get or create shared Docker client instance.""" @@ -19,6 +27,29 @@ class BaseCollector(ABC): cls._shared_client = docker.from_env() return cls._shared_client + def set_config(self, config): + """Set collector configuration.""" + self.config = config + + def _get_cached(self, key: str, ttl_seconds: int = 300) -> Optional[Any]: + """Get cached value if not expired.""" + if key in self._cache: + cached_data = self._cache[key] + if time.time() - cached_data['timestamp'] < ttl_seconds: + return cached_data['value'] + return None + + def _set_cache(self, key: str, value: Any): + """Set cache value with current timestamp.""" + self._cache[key] = { + 'value': value, + 'timestamp': time.time() + } + + def _clear_cache(self): + """Clear all cached data.""" + self._cache.clear() + @abstractmethod def collect(self) -> List[Dict[str, Any]]: """ diff --git a/src/collectors/container_collector.py b/src/collectors/container_collector.py index 8cf7179..4008f62 100644 --- a/src/collectors/container_collector.py +++ b/src/collectors/container_collector.py @@ -10,6 +10,7 @@ class ContainerCollector(BaseCollector): """Collects metrics from Docker containers.""" def __init__(self): + super().__init__() self.client = self.get_docker_client() def get_name(self) -> str: @@ -25,93 +26,116 @@ class ContainerCollector(BaseCollector): metrics = [] timestamp = int(time.time()) + # Check if container metrics are enabled + if self.config and not self.config.collect_container_metrics: + return metrics + containers = self.client.containers.list(all=True) + # Cache disk usage data if enabled + df_data = None + cache_ttl = self.config.cache_ttl_seconds if self.config else 300 + + if self._should_collect_label('disk'): + cached_df = self._get_cached('df_data', cache_ttl) + if cached_df is not None: + df_data = cached_df + else: + df_data = self.client.df() + self._set_cache('df_data', df_data) + for container in containers: try: container_name = container.name + state = container.status # Basic state metrics - state = container.status - state_value = self._state_to_value(state) - - metrics.append({ - 'name': f'containers.{container_name}.state', - 'value': state_value, - 'timestamp': timestamp - }) + if self._should_collect_label('state'): + state_value = self._state_to_value(state) + metrics.append({ + 'name': f'containers.{container_name}.state', + 'value': state_value, + 'timestamp': timestamp + }) # Health status if available - health_status = self._get_health_status(container) - if health_status is not None: - metrics.append({ - 'name': f'containers.{container_name}.health', - 'value': health_status, - 'timestamp': timestamp - }) + if self._should_collect_label('health'): + health_status = self._get_health_status(container) + if health_status is not None: + metrics.append({ + 'name': f'containers.{container_name}.health', + 'value': health_status, + 'timestamp': timestamp + }) # Restart count - restart_count = container.attrs.get('RestartCount', 0) - metrics.append({ - 'name': f'containers.{container_name}.restart_count', - 'value': restart_count, - 'timestamp': timestamp - }) - - # Disk usage metrics (available for all containers) - disk_usage = self._get_container_disk_usage(container) - if disk_usage is not None: + if self._should_collect_label('restart_count'): + restart_count = container.attrs.get('RestartCount', 0) metrics.append({ - 'name': f'containers.{container_name}.disk_usage_bytes', - 'value': disk_usage, + 'name': f'containers.{container_name}.restart_count', + 'value': restart_count, 'timestamp': timestamp }) + # Disk usage metrics (available for all containers) + if self._should_collect_label('disk') and df_data: + disk_usage = self._get_container_disk_usage_from_df(container, df_data) + if disk_usage is not None: + metrics.append({ + 'name': f'containers.{container_name}.disk_usage_bytes', + 'value': disk_usage, + 'timestamp': timestamp + }) + # Only collect resource metrics for running containers if state == 'running': stats = container.stats(stream=False) # CPU metrics - cpu_percent = self._calculate_cpu_percent(stats) - metrics.append({ - 'name': f'containers.{container_name}.cpu_percent', - 'value': cpu_percent, - 'timestamp': timestamp - }) + if self._should_collect_label('cpu'): + cpu_percent = self._calculate_cpu_percent(stats) + metrics.append({ + 'name': f'containers.{container_name}.cpu_percent', + 'value': cpu_percent, + 'timestamp': timestamp + }) # Memory metrics - memory_stats = stats.get('memory_stats', {}) - memory_usage = memory_stats.get('usage', 0) - memory_limit = memory_stats.get('limit', 1) - memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0 - - metrics.append({ - 'name': f'containers.{container_name}.memory_bytes', - 'value': memory_usage, - 'timestamp': timestamp - }) - - metrics.append({ - 'name': f'containers.{container_name}.memory_percent', - 'value': round(memory_percent, 2), - 'timestamp': timestamp - }) + if self._should_collect_label('memory'): + memory_stats = stats.get('memory_stats', {}) + memory_usage = memory_stats.get('usage', 0) + memory_limit = memory_stats.get('limit', 1) + memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0 + + metrics.append({ + 'name': f'containers.{container_name}.memory_bytes', + 'value': memory_usage, + 'timestamp': timestamp + }) + + metrics.append({ + 'name': f'containers.{container_name}.memory_percent', + 'value': round(memory_percent, 2), + 'timestamp': timestamp + }) # Network metrics - for net_metric in self._get_network_metrics(stats): - metrics.append({ - 'name': f'containers.{container_name}.{net_metric["name"]}', - 'value': net_metric['value'], - 'timestamp': timestamp - }) + if self._should_collect_label('network'): + for net_metric in self._get_network_metrics(stats): + metrics.append({ + 'name': f'containers.{container_name}.{net_metric["name"]}', + 'value': net_metric['value'], + 'timestamp': timestamp + }) # Block I/O metrics - for io_metric in self._get_io_metrics(stats): - metrics.append({ - 'name': f'containers.{container_name}.{io_metric["name"]}', - 'value': io_metric['value'], - 'timestamp': timestamp - }) + if self._should_collect_label('blkio'): + for io_metric in self._get_io_metrics(stats): + metrics.append({ + 'name': f'containers.{container_name}.{io_metric["name"]}', + 'value': io_metric['value'], + 'timestamp': timestamp + }) except Exception as e: print(f" Warning: Error collecting metrics for {container.name}: {e}") @@ -120,12 +144,12 @@ class ContainerCollector(BaseCollector): def _state_to_value(self, state: str) -> int: """Convert container state to numeric value.""" state_map = { - 'running': 2, - 'paused': 1, - 'restarting': 1, - 'exited': 0, - 'dead': 0, - 'created': 0 + 'running': 5, + 'paused': 4, + 'restarting': 3, + 'created': 2, + 'exited': 1, + 'dead': 0 } return state_map.get(state.lower(), 0) @@ -208,9 +232,14 @@ class ContainerCollector(BaseCollector): {'name': 'blkio.write_bytes', 'value': write_bytes} ] - def _get_container_disk_usage(self, container) -> int: - """Get the disk usage for a container in bytes using the Docker system df API.""" - df_data = self.client.df() + def _should_collect_label(self, label: str) -> bool: + """Check if a specific label should be collected.""" + if not self.config: + return True + return self.config.is_label_enabled('container', label) + + def _get_container_disk_usage_from_df(self, container, df_data) -> int: + """Get the disk usage for a container from pre-fetched df data.""" containers_info = df_data.get('Containers', []) # Find the matching container by ID @@ -222,3 +251,8 @@ class ContainerCollector(BaseCollector): return size_rootfs if size_rootfs > 0 else size_rw return 0 + + def _get_container_disk_usage(self, container) -> int: + """Get the disk usage for a container in bytes using the Docker system df API.""" + df_data = self.client.df() + return self._get_container_disk_usage_from_df(container, df_data) diff --git a/src/collectors/self_collector.py b/src/collectors/self_collector.py index 7b6c302..1778303 100644 --- a/src/collectors/self_collector.py +++ b/src/collectors/self_collector.py @@ -13,6 +13,7 @@ class SelfMetricsCollector(BaseCollector): def __init__(self): """Initialize the self-metrics collector.""" + super().__init__() self.process = psutil.Process(os.getpid()) self.start_time = time.time() self.iteration_count = 0 diff --git a/src/collectors/system_collector.py b/src/collectors/system_collector.py index 4805f60..d719008 100644 --- a/src/collectors/system_collector.py +++ b/src/collectors/system_collector.py @@ -12,6 +12,7 @@ class SystemCollector(BaseCollector): """Collects system-level Docker metrics using 'docker system df'.""" def __init__(self): + super().__init__() self.client = self.get_docker_client() def get_name(self) -> str: @@ -27,44 +28,51 @@ class SystemCollector(BaseCollector): metrics = [] timestamp = int(time.time()) + # Check if system metrics are enabled + if self.config and not self.config.collect_system_metrics: + return metrics + # Get Docker info info = self.client.info() # System-wide container counts - metrics.append({ - 'name': 'system.containers.total', - 'value': info.get('Containers', 0), - 'timestamp': timestamp - }) - - metrics.append({ - 'name': 'system.containers.running', - 'value': info.get('ContainersRunning', 0), - 'timestamp': timestamp - }) - - metrics.append({ - 'name': 'system.containers.paused', - 'value': info.get('ContainersPaused', 0), - 'timestamp': timestamp - }) - - metrics.append({ - 'name': 'system.containers.stopped', - 'value': info.get('ContainersStopped', 0), - 'timestamp': timestamp - }) + if self._should_collect_label('containers'): + metrics.append({ + 'name': 'system.containers.total', + 'value': info.get('Containers', 0), + 'timestamp': timestamp + }) + + metrics.append({ + 'name': 'system.containers.running', + 'value': info.get('ContainersRunning', 0), + 'timestamp': timestamp + }) + + metrics.append({ + 'name': 'system.containers.paused', + 'value': info.get('ContainersPaused', 0), + 'timestamp': timestamp + }) + + metrics.append({ + 'name': 'system.containers.stopped', + 'value': info.get('ContainersStopped', 0), + 'timestamp': timestamp + }) # Image count - metrics.append({ - 'name': 'system.images.total', - 'value': info.get('Images', 0), - 'timestamp': timestamp - }) + if self._should_collect_label('images'): + metrics.append({ + 'name': 'system.images.total', + 'value': info.get('Images', 0), + 'timestamp': timestamp + }) # Parse docker system df -v output for detailed metrics - df_metrics = self._parse_docker_df() - metrics.extend(df_metrics) + if self._should_collect_label('storage'): + df_metrics = self._parse_docker_df() + metrics.extend(df_metrics) return metrics @@ -242,10 +250,16 @@ class SystemCollector(BaseCollector): 'name': 'system.volumes.count', 'value': volume_count }) - + except Exception as e: - print(f"Warning: Error parsing volumes section: {e}") + print(f" Warning: Error parsing Local Volumes section: {e}") return metrics + def _should_collect_label(self, label: str) -> bool: + """Check if a specific label should be collected.""" + if not self.config: + return True + return self.config.is_label_enabled('system', label) + diff --git a/src/collectors/volume_collector.py b/src/collectors/volume_collector.py index b217f47..1368488 100644 --- a/src/collectors/volume_collector.py +++ b/src/collectors/volume_collector.py @@ -10,6 +10,7 @@ class VolumeCollector(BaseCollector): """Collects metrics from Docker volumes.""" def __init__(self): + super().__init__() self.client = self.get_docker_client() def get_name(self) -> str: @@ -25,37 +26,58 @@ class VolumeCollector(BaseCollector): metrics = [] timestamp = int(time.time()) + # Check if volume metrics are enabled + if self.config and not self.config.collect_volume_metrics: + return metrics + volumes = self.client.volumes.list() + # Cache container list for volume lookups + cache_ttl = self.config.cache_ttl_seconds if self.config else 300 + all_containers = self._get_cached('all_containers', cache_ttl) + if all_containers is None: + all_containers = self.client.containers.list(all=True) + self._set_cache('all_containers', all_containers) + for volume in volumes: try: volume_name = volume.name # Count containers using this volume - containers_using = self._get_containers_using_volume(volume_name) - - metrics.append({ - 'name': f'volumes.{volume_name}.container_count', - 'value': len(containers_using), - 'timestamp': timestamp - }) + if self._should_collect_label('container_count'): + containers_using = self._get_containers_using_volume(volume_name, all_containers) + + metrics.append({ + 'name': f'volumes.{volume_name}.container_count', + 'value': len(containers_using), + 'timestamp': timestamp + }) # Volume labels count - labels = volume.attrs.get('Labels', {}) or {} - metrics.append({ - 'name': f'volumes.{volume_name}.labels_count', - 'value': len(labels), - 'timestamp': timestamp - }) + if self._should_collect_label('labels_count'): + labels = volume.attrs.get('Labels', {}) or {} + metrics.append({ + 'name': f'volumes.{volume_name}.labels_count', + 'value': len(labels), + 'timestamp': timestamp + }) except Exception as e: print(f" Warning: Error collecting metrics for volume {volume.name}: {e}") return metrics - def _get_containers_using_volume(self, volume_name: str) -> List[str]: + def _should_collect_label(self, label: str) -> bool: + """Check if a specific label should be collected.""" + if not self.config: + return True + return self.config.is_label_enabled('volume', label) + + def _get_containers_using_volume(self, volume_name: str, all_containers=None) -> List[str]: """Find all containers using a specific volume.""" containers_using = [] - all_containers = self.client.containers.list(all=True) + + if all_containers is None: + all_containers = self.client.containers.list(all=True) for container in all_containers: mounts = container.attrs.get('Mounts', []) diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..2f17733 --- /dev/null +++ b/src/config.py @@ -0,0 +1,103 @@ +""" +Configuration management for Docker metrics collector. +""" +import os +from typing import Set + + +class CollectorConfig: + """Configuration for metric collection behavior.""" + + def __init__(self): + # General settings + self.graphite_endpoint = os.getenv('GRAPHITE_ENDPOINT', 'http://localhost:2003') + self.graphite_prefix = os.getenv('GRAPHITE_PREFIX', 'docker-metrics') + self.interval_seconds = int(os.getenv('INTERVAL_SECONDS', '60')) + self.debug = os.getenv('DEBUG', 'false').lower() == 'true' + + # Performance settings + self.parallel_collection = os.getenv('PARALLEL_COLLECTION', 'true').lower() == 'true' + self.max_workers = int(os.getenv('MAX_WORKERS', '4')) + + # Cache settings + self.cache_ttl_seconds = int(os.getenv('CACHE_TTL_SECONDS', '300')) # 5 minutes default + + # Label/metric filtering - disable specific metric types + self.collect_container_metrics = self._parse_bool('COLLECT_CONTAINER_METRICS', True) + self.collect_volume_metrics = self._parse_bool('COLLECT_VOLUME_METRICS', True) + self.collect_system_metrics = self._parse_bool('COLLECT_SYSTEM_METRICS', True) + self.collect_self_metrics = self._parse_bool('COLLECT_SELF_METRICS', True) + + # Container metric labels - granular control + self.container_labels = self._parse_label_set('CONTAINER_LABELS', { + 'cpu', 'memory', 'network', 'blkio', 'state', 'health', 'restart_count', 'disk' + }) + + # Volume metric labels + self.volume_labels = self._parse_label_set('VOLUME_LABELS', { + 'container_count', 'labels_count' + }) + + # System metric labels + self.system_labels = self._parse_label_set('SYSTEM_LABELS', { + 'containers', 'images', 'storage' + }) + + # Aggregation settings + self.enable_aggregations = self._parse_bool('ENABLE_AGGREGATIONS', True) + + def _parse_bool(self, env_var: str, default: bool) -> bool: + """Parse boolean environment variable.""" + value = os.getenv(env_var, str(default)).lower() + return value in ('true', '1', 'yes', 'on') + + def _parse_label_set(self, env_var: str, default: Set[str]) -> Set[str]: + """ + Parse comma-separated label list from environment. + Returns set of enabled labels. + """ + value = os.getenv(env_var, '') + if not value: + return default + + # Split by comma and clean whitespace + labels = {label.strip().lower() for label in value.split(',') if label.strip()} + return labels if labels else default + + def is_label_enabled(self, category: str, label: str) -> bool: + """Check if a specific label is enabled for a category.""" + label = label.lower() + + if category == 'container': + return label in self.container_labels + elif category == 'volume': + return label in self.volume_labels + elif category == 'system': + return label in self.system_labels + + return True + + def print_config(self): + """Print current configuration.""" + print("="*60) + print("Configuration:") + print("="*60) + print(f"Graphite Endpoint: {self.graphite_endpoint}") + print(f"Metric Prefix: {self.graphite_prefix}") + print(f"Collection Interval: {self.interval_seconds}s") + print(f"Debug Mode: {self.debug}") + print(f"Parallel Collection: {self.parallel_collection}") + print(f"Max Workers: {self.max_workers}") + print(f"Cache TTL: {self.cache_ttl_seconds}s") + print() + print("Enabled Collectors:") + print(f" - Containers: {self.collect_container_metrics}") + print(f" - Volumes: {self.collect_volume_metrics}") + print(f" - System: {self.collect_system_metrics}") + print(f" - Self-metrics: {self.collect_self_metrics}") + print() + print("Container Labels:", ', '.join(sorted(self.container_labels)) or 'none') + print("Volume Labels:", ', '.join(sorted(self.volume_labels)) or 'none') + print("System Labels:", ', '.join(sorted(self.system_labels)) or 'none') + print("Aggregations:", self.enable_aggregations) + print("="*60) diff --git a/src/main.py b/src/main.py index 6e76ca0..034a94a 100644 --- a/src/main.py +++ b/src/main.py @@ -4,10 +4,12 @@ import sys import time import signal from typing import List +from concurrent.futures import ThreadPoolExecutor, as_completed from collectors import ContainerCollector, VolumeCollector, SystemCollector, SelfMetricsCollector from exporters import GraphiteExporter, ConsoleExporter from aggregator import MetricsAggregator +from config import CollectorConfig class DockerMetricsCollector: @@ -15,51 +17,56 @@ class DockerMetricsCollector: def __init__(self): self.running = True - self.config = self._load_config() + self.config = CollectorConfig() # Initialize self-metrics collector first self.self_metrics = SelfMetricsCollector() + self.self_metrics.set_config(self.config) - # Initialize collectors - self.collectors = [ - ContainerCollector(), - VolumeCollector(), - SystemCollector(), - self.self_metrics # Include self-metrics in collection - ] + # Initialize collectors based on config + self.collectors = [] + + if self.config.collect_container_metrics: + container_collector = ContainerCollector() + container_collector.set_config(self.config) + self.collectors.append(container_collector) + + if self.config.collect_volume_metrics: + volume_collector = VolumeCollector() + volume_collector.set_config(self.config) + self.collectors.append(volume_collector) + + if self.config.collect_system_metrics: + system_collector = SystemCollector() + system_collector.set_config(self.config) + self.collectors.append(system_collector) + + if self.config.collect_self_metrics: + self.collectors.append(self.self_metrics) # Initialize aggregator - self.aggregator = MetricsAggregator() + self.aggregator = MetricsAggregator(self.config) # Initialize exporters self.exporters = [] # Add Graphite exporter - if self.config['graphite_endpoint']: + if self.config.graphite_endpoint: self.exporters.append( GraphiteExporter( - endpoint=self.config['graphite_endpoint'], - prefix=self.config['graphite_prefix'] + endpoint=self.config.graphite_endpoint, + prefix=self.config.graphite_prefix ) ) # Add console exporter in debug mode - if self.config['debug']: + if self.config.debug: self.exporters.append(ConsoleExporter(pretty_print=False)) # Setup signal handlers signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) - def _load_config(self) -> dict: - """Load configuration from environment variables.""" - return { - 'graphite_endpoint': os.getenv('GRAPHITE_ENDPOINT', 'http://localhost:2003'), - 'graphite_prefix': os.getenv('GRAPHITE_PREFIX', 'docker-metrics'), - 'interval_seconds': int(os.getenv('INTERVAL_SECONDS', '60')), - 'debug': os.getenv('DEBUG', 'false').lower() == 'true' - } - def _signal_handler(self, signum, frame): """Handle shutdown signals gracefully.""" print(f"\nReceived signal {signum}, shutting down...") @@ -70,11 +77,7 @@ class DockerMetricsCollector: print("="*60) print("Simple Docker Metrics Collector") print("="*60) - print(f"Graphite Endpoint: {self.config['graphite_endpoint']}") - print(f"Metric Prefix: {self.config['graphite_prefix']}") - print(f"Collection Interval: {self.config['interval_seconds']}s") - print(f"Debug Mode: {self.config['debug']}") - print("="*60) + self.config.print_config() # Test Graphite connection if self.exporters: @@ -92,18 +95,11 @@ class DockerMetricsCollector: print(f"[Iteration {iteration}] Collecting metrics...") start_time = time.time() - # Collect from all collectors - all_metrics = [] - for collector in self.collectors: - collector_name = collector.get_name() - try: - metrics = collector.collect() - all_metrics.extend(metrics) - print(f" - {collector_name}: {len(metrics)} metrics") - self.self_metrics.record_collector_success(collector_name) - except Exception as e: - print(f" - {collector_name} error: {e}") - self.self_metrics.record_collector_error(collector_name) + # Collect from all collectors (parallel or sequential) + if self.config.parallel_collection and len(self.collectors) > 1: + all_metrics = self._collect_parallel() + else: + all_metrics = self._collect_sequential() # Aggregate metrics aggregated_metrics = self.aggregator.aggregate(all_metrics) @@ -118,20 +114,23 @@ class DockerMetricsCollector: for exporter in self.exporters: try: exporter.export(all_metrics) - self.self_metrics.record_export_success() + if self.config.collect_self_metrics: + self.self_metrics.record_export_success() except Exception as e: print(f" - Export error: {e}") - self.self_metrics.record_export_error() + if self.config.collect_self_metrics: + self.self_metrics.record_export_error() elapsed = time.time() - start_time # Record iteration metrics (excluding self-metrics from count to avoid recursion) - self.self_metrics.record_iteration(elapsed, len(all_metrics)) + if self.config.collect_self_metrics: + self.self_metrics.record_iteration(elapsed, len(all_metrics)) print(f" Collection completed in {elapsed:.2f}s\n") # Sleep until next iteration - sleep_time = max(0, self.config['interval_seconds'] - elapsed) + sleep_time = max(0, self.config.interval_seconds - elapsed) if sleep_time > 0 and self.running: time.sleep(sleep_time) @@ -141,6 +140,52 @@ class DockerMetricsCollector: time.sleep(10) # Brief pause before retrying print("\nShutdown complete.") + + def _collect_sequential(self) -> List: + """Collect metrics sequentially from all collectors.""" + all_metrics = [] + for collector in self.collectors: + collector_name = collector.get_name() + try: + metrics = collector.collect() + all_metrics.extend(metrics) + print(f" - {collector_name}: {len(metrics)} metrics") + if self.config.collect_self_metrics: + self.self_metrics.record_collector_success(collector_name) + except Exception as e: + print(f" - {collector_name} error: {e}") + if self.config.collect_self_metrics: + self.self_metrics.record_collector_error(collector_name) + return all_metrics + + def _collect_parallel(self) -> List: + """Collect metrics in parallel from all collectors using ThreadPoolExecutor.""" + all_metrics = [] + + with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor: + # Submit all collection tasks + future_to_collector = { + executor.submit(collector.collect): collector + for collector in self.collectors + } + + # Gather results as they complete + for future in as_completed(future_to_collector): + collector = future_to_collector[future] + collector_name = collector.get_name() + + try: + metrics = future.result() + all_metrics.extend(metrics) + print(f" - {collector_name}: {len(metrics)} metrics") + if self.config.collect_self_metrics: + self.self_metrics.record_collector_success(collector_name) + except Exception as e: + print(f" - {collector_name} error: {e}") + if self.config.collect_self_metrics: + self.self_metrics.record_collector_error(collector_name) + + return all_metrics def main():