Added optimizations and configuration options

This commit is contained in:
Simon Gruber
2025-12-14 20:59:43 +01:00
parent 49d3cc4772
commit 6ce1ee7f62
9 changed files with 443 additions and 161 deletions

View File

@@ -6,10 +6,34 @@ services:
container_name: simple-docker-metrics
user: "0:0"
environment:
# Connection settings
- GRAPHITE_ENDPOINT=graphite:2003
- GRAPHITE_PREFIX=docker-metrics
- INTERVAL_SECONDS=10
- DEBUG=false
# Performance settings
- PARALLEL_COLLECTION=true # Enable parallel metric collection
- MAX_WORKERS=4 # Number of parallel workers
- CACHE_TTL_SECONDS=300 # Cache duration for expensive operations (5 min)
# Enable/disable collectors
- COLLECT_CONTAINER_METRICS=true
- COLLECT_VOLUME_METRICS=true
- COLLECT_SYSTEM_METRICS=true
- COLLECT_SELF_METRICS=true
# Container metric labels (comma-separated: cpu,memory,network,blkio,state,health,restart_count,disk)
# - CONTAINER_LABELS=cpu,memory,state # Uncomment to only collect specific labels
# Volume metric labels (comma-separated: container_count,labels_count)
# - VOLUME_LABELS=container_count
# System metric labels (comma-separated: containers,images,storage)
# - SYSTEM_LABELS=containers,images
# Aggregations
- ENABLE_AGGREGATIONS=true
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
restart: unless-stopped

View File

@@ -1,13 +1,17 @@
"""
Metrics aggregator - combines and processes metrics from multiple collectors.
"""
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
from collections import defaultdict
class MetricsAggregator:
"""Aggregates and processes metrics from multiple sources."""
def __init__(self, config=None):
    """Initialize aggregator with optional config.

    Args:
        config: Optional CollectorConfig-like object. When provided,
            its ``enable_aggregations`` flag gates whether aggregate()
            adds computed metrics; None means aggregations always run.
    """
    self.config = config
def aggregate(self, all_metrics: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Aggregate metrics and add computed aggregations.
@@ -21,6 +25,10 @@ class MetricsAggregator:
if not all_metrics:
return all_metrics
# Check if aggregations are enabled
if self.config and not self.config.enable_aggregations:
return all_metrics
aggregated = list(all_metrics) # Start with original metrics
timestamp = all_metrics[0].get('timestamp', 0)

View File

@@ -4,6 +4,7 @@ Base collector interface for Docker metrics collection.
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
import docker
import time
class BaseCollector(ABC):
@@ -12,6 +13,13 @@ class BaseCollector(ABC):
# Shared Docker client instance across all collectors
_shared_client: Optional[docker.DockerClient] = None
# Cache for expensive operations
_cache: Dict[str, Dict[str, Any]] = {}
def __init__(self):
"""Initialize collector with optional config."""
self.config = None # Will be set by main collector
@classmethod
def get_docker_client(cls) -> docker.DockerClient:
"""Get or create shared Docker client instance."""
@@ -19,6 +27,29 @@ class BaseCollector(ABC):
cls._shared_client = docker.from_env()
return cls._shared_client
def set_config(self, config):
    """Store the collector configuration on this instance.

    Called by the main collector after construction; subclasses read
    ``self.config`` to decide which metric categories/labels to gather.
    """
    self.config = config
def _get_cached(self, key: str, ttl_seconds: int = 300) -> Optional[Any]:
"""Get cached value if not expired."""
if key in self._cache:
cached_data = self._cache[key]
if time.time() - cached_data['timestamp'] < ttl_seconds:
return cached_data['value']
return None
def _set_cache(self, key: str, value: Any):
"""Set cache value with current timestamp."""
self._cache[key] = {
'value': value,
'timestamp': time.time()
}
def _clear_cache(self):
    """Clear all cached data.

    Note: ``_cache`` is a class attribute shared across collectors, so
    this wipes entries for every collector instance, not just this one.
    """
    self._cache.clear()
@abstractmethod
def collect(self) -> List[Dict[str, Any]]:
"""

View File

@@ -10,6 +10,7 @@ class ContainerCollector(BaseCollector):
"""Collects metrics from Docker containers."""
def __init__(self):
    """Initialize the container collector with the shared Docker client."""
    super().__init__()
    # Reuse the process-wide client from BaseCollector instead of
    # opening a new connection per collector.
    self.client = self.get_docker_client()
def get_name(self) -> str:
@@ -25,93 +26,116 @@ class ContainerCollector(BaseCollector):
metrics = []
timestamp = int(time.time())
# Check if container metrics are enabled
if self.config and not self.config.collect_container_metrics:
return metrics
containers = self.client.containers.list(all=True)
# Cache disk usage data if enabled
df_data = None
cache_ttl = self.config.cache_ttl_seconds if self.config else 300
if self._should_collect_label('disk'):
cached_df = self._get_cached('df_data', cache_ttl)
if cached_df is not None:
df_data = cached_df
else:
df_data = self.client.df()
self._set_cache('df_data', df_data)
for container in containers:
try:
container_name = container.name
state = container.status
# Basic state metrics
state = container.status
state_value = self._state_to_value(state)
metrics.append({
'name': f'containers.{container_name}.state',
'value': state_value,
'timestamp': timestamp
})
if self._should_collect_label('state'):
state_value = self._state_to_value(state)
metrics.append({
'name': f'containers.{container_name}.state',
'value': state_value,
'timestamp': timestamp
})
# Health status if available
health_status = self._get_health_status(container)
if health_status is not None:
metrics.append({
'name': f'containers.{container_name}.health',
'value': health_status,
'timestamp': timestamp
})
if self._should_collect_label('health'):
health_status = self._get_health_status(container)
if health_status is not None:
metrics.append({
'name': f'containers.{container_name}.health',
'value': health_status,
'timestamp': timestamp
})
# Restart count
restart_count = container.attrs.get('RestartCount', 0)
metrics.append({
'name': f'containers.{container_name}.restart_count',
'value': restart_count,
'timestamp': timestamp
})
# Disk usage metrics (available for all containers)
disk_usage = self._get_container_disk_usage(container)
if disk_usage is not None:
if self._should_collect_label('restart_count'):
restart_count = container.attrs.get('RestartCount', 0)
metrics.append({
'name': f'containers.{container_name}.disk_usage_bytes',
'value': disk_usage,
'name': f'containers.{container_name}.restart_count',
'value': restart_count,
'timestamp': timestamp
})
# Disk usage metrics (available for all containers)
if self._should_collect_label('disk') and df_data:
disk_usage = self._get_container_disk_usage_from_df(container, df_data)
if disk_usage is not None:
metrics.append({
'name': f'containers.{container_name}.disk_usage_bytes',
'value': disk_usage,
'timestamp': timestamp
})
# Only collect resource metrics for running containers
if state == 'running':
stats = container.stats(stream=False)
# CPU metrics
cpu_percent = self._calculate_cpu_percent(stats)
metrics.append({
'name': f'containers.{container_name}.cpu_percent',
'value': cpu_percent,
'timestamp': timestamp
})
if self._should_collect_label('cpu'):
cpu_percent = self._calculate_cpu_percent(stats)
metrics.append({
'name': f'containers.{container_name}.cpu_percent',
'value': cpu_percent,
'timestamp': timestamp
})
# Memory metrics
memory_stats = stats.get('memory_stats', {})
memory_usage = memory_stats.get('usage', 0)
memory_limit = memory_stats.get('limit', 1)
memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0
metrics.append({
'name': f'containers.{container_name}.memory_bytes',
'value': memory_usage,
'timestamp': timestamp
})
metrics.append({
'name': f'containers.{container_name}.memory_percent',
'value': round(memory_percent, 2),
'timestamp': timestamp
})
if self._should_collect_label('memory'):
memory_stats = stats.get('memory_stats', {})
memory_usage = memory_stats.get('usage', 0)
memory_limit = memory_stats.get('limit', 1)
memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0
metrics.append({
'name': f'containers.{container_name}.memory_bytes',
'value': memory_usage,
'timestamp': timestamp
})
metrics.append({
'name': f'containers.{container_name}.memory_percent',
'value': round(memory_percent, 2),
'timestamp': timestamp
})
# Network metrics
for net_metric in self._get_network_metrics(stats):
metrics.append({
'name': f'containers.{container_name}.{net_metric["name"]}',
'value': net_metric['value'],
'timestamp': timestamp
})
if self._should_collect_label('network'):
for net_metric in self._get_network_metrics(stats):
metrics.append({
'name': f'containers.{container_name}.{net_metric["name"]}',
'value': net_metric['value'],
'timestamp': timestamp
})
# Block I/O metrics
for io_metric in self._get_io_metrics(stats):
metrics.append({
'name': f'containers.{container_name}.{io_metric["name"]}',
'value': io_metric['value'],
'timestamp': timestamp
})
if self._should_collect_label('blkio'):
for io_metric in self._get_io_metrics(stats):
metrics.append({
'name': f'containers.{container_name}.{io_metric["name"]}',
'value': io_metric['value'],
'timestamp': timestamp
})
except Exception as e:
print(f" Warning: Error collecting metrics for {container.name}: {e}")
@@ -120,12 +144,12 @@ class ContainerCollector(BaseCollector):
def _state_to_value(self, state: str) -> int:
"""Convert container state to numeric value."""
state_map = {
'running': 2,
'paused': 1,
'restarting': 1,
'exited': 0,
'dead': 0,
'created': 0
'running': 5,
'paused': 4,
'restarting': 3,
'created': 2,
'exited': 1,
'dead': 0
}
return state_map.get(state.lower(), 0)
@@ -208,9 +232,14 @@ class ContainerCollector(BaseCollector):
{'name': 'blkio.write_bytes', 'value': write_bytes}
]
def _get_container_disk_usage(self, container) -> int:
"""Get the disk usage for a container in bytes using the Docker system df API."""
df_data = self.client.df()
def _should_collect_label(self, label: str) -> bool:
"""Check if a specific label should be collected."""
if not self.config:
return True
return self.config.is_label_enabled('container', label)
def _get_container_disk_usage_from_df(self, container, df_data) -> int:
"""Get the disk usage for a container from pre-fetched df data."""
containers_info = df_data.get('Containers', [])
# Find the matching container by ID
@@ -222,3 +251,8 @@ class ContainerCollector(BaseCollector):
return size_rootfs if size_rootfs > 0 else size_rw
return 0
def _get_container_disk_usage(self, container) -> int:
"""Get the disk usage for a container in bytes using the Docker system df API."""
df_data = self.client.df()
return self._get_container_disk_usage_from_df(container, df_data)

View File

@@ -13,6 +13,7 @@ class SelfMetricsCollector(BaseCollector):
def __init__(self):
    """Initialize the self-metrics collector.

    Captures a psutil handle to this process and records the
    construction time; the iteration counter starts at zero.
    """
    super().__init__()
    self.process = psutil.Process(os.getpid())  # handle to our own process
    # presumably used for uptime reporting in collect() — confirm there
    self.start_time = time.time()
    self.iteration_count = 0

View File

@@ -12,6 +12,7 @@ class SystemCollector(BaseCollector):
"""Collects system-level Docker metrics using 'docker system df'."""
def __init__(self):
    """Initialize the system collector with the shared Docker client."""
    super().__init__()
    # Shared client from BaseCollector — avoids one connection per collector.
    self.client = self.get_docker_client()
def get_name(self) -> str:
@@ -27,44 +28,51 @@ class SystemCollector(BaseCollector):
metrics = []
timestamp = int(time.time())
# Check if system metrics are enabled
if self.config and not self.config.collect_system_metrics:
return metrics
# Get Docker info
info = self.client.info()
# System-wide container counts
metrics.append({
'name': 'system.containers.total',
'value': info.get('Containers', 0),
'timestamp': timestamp
})
metrics.append({
'name': 'system.containers.running',
'value': info.get('ContainersRunning', 0),
'timestamp': timestamp
})
metrics.append({
'name': 'system.containers.paused',
'value': info.get('ContainersPaused', 0),
'timestamp': timestamp
})
metrics.append({
'name': 'system.containers.stopped',
'value': info.get('ContainersStopped', 0),
'timestamp': timestamp
})
if self._should_collect_label('containers'):
metrics.append({
'name': 'system.containers.total',
'value': info.get('Containers', 0),
'timestamp': timestamp
})
metrics.append({
'name': 'system.containers.running',
'value': info.get('ContainersRunning', 0),
'timestamp': timestamp
})
metrics.append({
'name': 'system.containers.paused',
'value': info.get('ContainersPaused', 0),
'timestamp': timestamp
})
metrics.append({
'name': 'system.containers.stopped',
'value': info.get('ContainersStopped', 0),
'timestamp': timestamp
})
# Image count
metrics.append({
'name': 'system.images.total',
'value': info.get('Images', 0),
'timestamp': timestamp
})
if self._should_collect_label('images'):
metrics.append({
'name': 'system.images.total',
'value': info.get('Images', 0),
'timestamp': timestamp
})
# Parse docker system df -v output for detailed metrics
df_metrics = self._parse_docker_df()
metrics.extend(df_metrics)
if self._should_collect_label('storage'):
df_metrics = self._parse_docker_df()
metrics.extend(df_metrics)
return metrics
@@ -242,10 +250,16 @@ class SystemCollector(BaseCollector):
'name': 'system.volumes.count',
'value': volume_count
})
except Exception as e:
print(f"Warning: Error parsing volumes section: {e}")
print(f" Warning: Error parsing Local Volumes section: {e}")
return metrics
def _should_collect_label(self, label: str) -> bool:
"""Check if a specific label should be collected."""
if not self.config:
return True
return self.config.is_label_enabled('system', label)

View File

@@ -10,6 +10,7 @@ class VolumeCollector(BaseCollector):
"""Collects metrics from Docker volumes."""
def __init__(self):
    """Initialize the volume collector with the shared Docker client."""
    super().__init__()
    # Shared client from BaseCollector (single connection for all collectors).
    self.client = self.get_docker_client()
def get_name(self) -> str:
@@ -25,37 +26,58 @@ class VolumeCollector(BaseCollector):
metrics = []
timestamp = int(time.time())
# Check if volume metrics are enabled
if self.config and not self.config.collect_volume_metrics:
return metrics
volumes = self.client.volumes.list()
# Cache container list for volume lookups
cache_ttl = self.config.cache_ttl_seconds if self.config else 300
all_containers = self._get_cached('all_containers', cache_ttl)
if all_containers is None:
all_containers = self.client.containers.list(all=True)
self._set_cache('all_containers', all_containers)
for volume in volumes:
try:
volume_name = volume.name
# Count containers using this volume
containers_using = self._get_containers_using_volume(volume_name)
metrics.append({
'name': f'volumes.{volume_name}.container_count',
'value': len(containers_using),
'timestamp': timestamp
})
if self._should_collect_label('container_count'):
containers_using = self._get_containers_using_volume(volume_name, all_containers)
metrics.append({
'name': f'volumes.{volume_name}.container_count',
'value': len(containers_using),
'timestamp': timestamp
})
# Volume labels count
labels = volume.attrs.get('Labels', {}) or {}
metrics.append({
'name': f'volumes.{volume_name}.labels_count',
'value': len(labels),
'timestamp': timestamp
})
if self._should_collect_label('labels_count'):
labels = volume.attrs.get('Labels', {}) or {}
metrics.append({
'name': f'volumes.{volume_name}.labels_count',
'value': len(labels),
'timestamp': timestamp
})
except Exception as e:
print(f" Warning: Error collecting metrics for volume {volume.name}: {e}")
return metrics
def _get_containers_using_volume(self, volume_name: str) -> List[str]:
def _should_collect_label(self, label: str) -> bool:
    """Check if a specific volume metric label should be collected.

    Returns True when no config has been injected via set_config()
    (i.e. collect everything); otherwise defers to the config's
    'volume' label set.
    """
    if not self.config:
        return True
    return self.config.is_label_enabled('volume', label)
def _get_containers_using_volume(self, volume_name: str, all_containers=None) -> List[str]:
"""Find all containers using a specific volume."""
containers_using = []
all_containers = self.client.containers.list(all=True)
if all_containers is None:
all_containers = self.client.containers.list(all=True)
for container in all_containers:
mounts = container.attrs.get('Mounts', [])

103
src/config.py Normal file
View File

@@ -0,0 +1,103 @@
"""
Configuration management for Docker metrics collector.
"""
import os
from typing import Set
class CollectorConfig:
    """Configuration for metric collection behavior.

    All settings are read once from environment variables at
    construction time. Boolean flags accept 'true', '1', 'yes' or 'on'
    (case-insensitive); label sets are comma-separated lists that fall
    back to collecting everything when unset or empty.
    """

    def __init__(self):
        # General settings
        self.graphite_endpoint = os.getenv('GRAPHITE_ENDPOINT', 'http://localhost:2003')
        self.graphite_prefix = os.getenv('GRAPHITE_PREFIX', 'docker-metrics')
        self.interval_seconds = int(os.getenv('INTERVAL_SECONDS', '60'))
        # Consistency fix: DEBUG previously only accepted the literal
        # 'true' (so DEBUG=1 silently disabled debug) while every other
        # flag went through _parse_bool. Route DEBUG and
        # PARALLEL_COLLECTION through _parse_bool so all boolean flags
        # accept the same spellings.
        self.debug = self._parse_bool('DEBUG', False)

        # Performance settings
        self.parallel_collection = self._parse_bool('PARALLEL_COLLECTION', True)
        self.max_workers = int(os.getenv('MAX_WORKERS', '4'))

        # Cache settings
        self.cache_ttl_seconds = int(os.getenv('CACHE_TTL_SECONDS', '300'))  # 5 minutes default

        # Label/metric filtering - disable whole metric categories
        self.collect_container_metrics = self._parse_bool('COLLECT_CONTAINER_METRICS', True)
        self.collect_volume_metrics = self._parse_bool('COLLECT_VOLUME_METRICS', True)
        self.collect_system_metrics = self._parse_bool('COLLECT_SYSTEM_METRICS', True)
        self.collect_self_metrics = self._parse_bool('COLLECT_SELF_METRICS', True)

        # Container metric labels - granular control
        self.container_labels = self._parse_label_set('CONTAINER_LABELS', {
            'cpu', 'memory', 'network', 'blkio', 'state', 'health', 'restart_count', 'disk'
        })
        # Volume metric labels
        self.volume_labels = self._parse_label_set('VOLUME_LABELS', {
            'container_count', 'labels_count'
        })
        # System metric labels
        self.system_labels = self._parse_label_set('SYSTEM_LABELS', {
            'containers', 'images', 'storage'
        })

        # Aggregation settings
        self.enable_aggregations = self._parse_bool('ENABLE_AGGREGATIONS', True)

    def _parse_bool(self, env_var: str, default: bool) -> bool:
        """Parse a boolean environment variable.

        'true', '1', 'yes' and 'on' (case-insensitive) are True;
        anything else is False. Falls back to *default* when unset
        (str(True) -> 'true' keeps the default path consistent).
        """
        value = os.getenv(env_var, str(default)).lower()
        return value in ('true', '1', 'yes', 'on')

    def _parse_label_set(self, env_var: str, default: Set[str]) -> Set[str]:
        """Parse a comma-separated label list from the environment.

        Returns the set of enabled labels, normalized to lowercase with
        whitespace stripped; an unset or effectively-empty variable
        yields *default* (i.e. everything enabled).
        """
        value = os.getenv(env_var, '')
        if not value:
            return default
        labels = {label.strip().lower() for label in value.split(',') if label.strip()}
        return labels if labels else default

    def is_label_enabled(self, category: str, label: str) -> bool:
        """Check if *label* is enabled for *category*.

        Unknown categories are treated as always enabled so future
        collectors default to collecting everything.
        """
        label = label.lower()
        category_sets = {
            'container': self.container_labels,
            'volume': self.volume_labels,
            'system': self.system_labels,
        }
        enabled = category_sets.get(category)
        return True if enabled is None else label in enabled

    def print_config(self):
        """Print the current configuration to stdout."""
        print("=" * 60)
        print("Configuration:")
        print("=" * 60)
        print(f"Graphite Endpoint: {self.graphite_endpoint}")
        print(f"Metric Prefix: {self.graphite_prefix}")
        print(f"Collection Interval: {self.interval_seconds}s")
        print(f"Debug Mode: {self.debug}")
        print(f"Parallel Collection: {self.parallel_collection}")
        print(f"Max Workers: {self.max_workers}")
        print(f"Cache TTL: {self.cache_ttl_seconds}s")
        print()
        print("Enabled Collectors:")
        print(f" - Containers: {self.collect_container_metrics}")
        print(f" - Volumes: {self.collect_volume_metrics}")
        print(f" - System: {self.collect_system_metrics}")
        print(f" - Self-metrics: {self.collect_self_metrics}")
        print()
        print("Container Labels:", ', '.join(sorted(self.container_labels)) or 'none')
        print("Volume Labels:", ', '.join(sorted(self.volume_labels)) or 'none')
        print("System Labels:", ', '.join(sorted(self.system_labels)) or 'none')
        print("Aggregations:", self.enable_aggregations)
        print("=" * 60)

View File

@@ -4,10 +4,12 @@ import sys
import time
import signal
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed
from collectors import ContainerCollector, VolumeCollector, SystemCollector, SelfMetricsCollector
from exporters import GraphiteExporter, ConsoleExporter
from aggregator import MetricsAggregator
from config import CollectorConfig
class DockerMetricsCollector:
@@ -15,51 +17,56 @@ class DockerMetricsCollector:
def __init__(self):
self.running = True
self.config = self._load_config()
self.config = CollectorConfig()
# Initialize self-metrics collector first
self.self_metrics = SelfMetricsCollector()
self.self_metrics.set_config(self.config)
# Initialize collectors
self.collectors = [
ContainerCollector(),
VolumeCollector(),
SystemCollector(),
self.self_metrics # Include self-metrics in collection
]
# Initialize collectors based on config
self.collectors = []
if self.config.collect_container_metrics:
container_collector = ContainerCollector()
container_collector.set_config(self.config)
self.collectors.append(container_collector)
if self.config.collect_volume_metrics:
volume_collector = VolumeCollector()
volume_collector.set_config(self.config)
self.collectors.append(volume_collector)
if self.config.collect_system_metrics:
system_collector = SystemCollector()
system_collector.set_config(self.config)
self.collectors.append(system_collector)
if self.config.collect_self_metrics:
self.collectors.append(self.self_metrics)
# Initialize aggregator
self.aggregator = MetricsAggregator()
self.aggregator = MetricsAggregator(self.config)
# Initialize exporters
self.exporters = []
# Add Graphite exporter
if self.config['graphite_endpoint']:
if self.config.graphite_endpoint:
self.exporters.append(
GraphiteExporter(
endpoint=self.config['graphite_endpoint'],
prefix=self.config['graphite_prefix']
endpoint=self.config.graphite_endpoint,
prefix=self.config.graphite_prefix
)
)
# Add console exporter in debug mode
if self.config['debug']:
if self.config.debug:
self.exporters.append(ConsoleExporter(pretty_print=False))
# Setup signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _load_config(self) -> dict:
"""Load configuration from environment variables."""
return {
'graphite_endpoint': os.getenv('GRAPHITE_ENDPOINT', 'http://localhost:2003'),
'graphite_prefix': os.getenv('GRAPHITE_PREFIX', 'docker-metrics'),
'interval_seconds': int(os.getenv('INTERVAL_SECONDS', '60')),
'debug': os.getenv('DEBUG', 'false').lower() == 'true'
}
def _signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully."""
print(f"\nReceived signal {signum}, shutting down...")
@@ -70,11 +77,7 @@ class DockerMetricsCollector:
print("="*60)
print("Simple Docker Metrics Collector")
print("="*60)
print(f"Graphite Endpoint: {self.config['graphite_endpoint']}")
print(f"Metric Prefix: {self.config['graphite_prefix']}")
print(f"Collection Interval: {self.config['interval_seconds']}s")
print(f"Debug Mode: {self.config['debug']}")
print("="*60)
self.config.print_config()
# Test Graphite connection
if self.exporters:
@@ -92,18 +95,11 @@ class DockerMetricsCollector:
print(f"[Iteration {iteration}] Collecting metrics...")
start_time = time.time()
# Collect from all collectors
all_metrics = []
for collector in self.collectors:
collector_name = collector.get_name()
try:
metrics = collector.collect()
all_metrics.extend(metrics)
print(f" - {collector_name}: {len(metrics)} metrics")
self.self_metrics.record_collector_success(collector_name)
except Exception as e:
print(f" - {collector_name} error: {e}")
self.self_metrics.record_collector_error(collector_name)
# Collect from all collectors (parallel or sequential)
if self.config.parallel_collection and len(self.collectors) > 1:
all_metrics = self._collect_parallel()
else:
all_metrics = self._collect_sequential()
# Aggregate metrics
aggregated_metrics = self.aggregator.aggregate(all_metrics)
@@ -118,20 +114,23 @@ class DockerMetricsCollector:
for exporter in self.exporters:
try:
exporter.export(all_metrics)
self.self_metrics.record_export_success()
if self.config.collect_self_metrics:
self.self_metrics.record_export_success()
except Exception as e:
print(f" - Export error: {e}")
self.self_metrics.record_export_error()
if self.config.collect_self_metrics:
self.self_metrics.record_export_error()
elapsed = time.time() - start_time
# Record iteration metrics (excluding self-metrics from count to avoid recursion)
self.self_metrics.record_iteration(elapsed, len(all_metrics))
if self.config.collect_self_metrics:
self.self_metrics.record_iteration(elapsed, len(all_metrics))
print(f" Collection completed in {elapsed:.2f}s\n")
# Sleep until next iteration
sleep_time = max(0, self.config['interval_seconds'] - elapsed)
sleep_time = max(0, self.config.interval_seconds - elapsed)
if sleep_time > 0 and self.running:
time.sleep(sleep_time)
@@ -141,6 +140,52 @@ class DockerMetricsCollector:
time.sleep(10) # Brief pause before retrying
print("\nShutdown complete.")
def _collect_sequential(self) -> List:
"""Collect metrics sequentially from all collectors."""
all_metrics = []
for collector in self.collectors:
collector_name = collector.get_name()
try:
metrics = collector.collect()
all_metrics.extend(metrics)
print(f" - {collector_name}: {len(metrics)} metrics")
if self.config.collect_self_metrics:
self.self_metrics.record_collector_success(collector_name)
except Exception as e:
print(f" - {collector_name} error: {e}")
if self.config.collect_self_metrics:
self.self_metrics.record_collector_error(collector_name)
return all_metrics
def _collect_parallel(self) -> List:
"""Collect metrics in parallel from all collectors using ThreadPoolExecutor."""
all_metrics = []
with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
# Submit all collection tasks
future_to_collector = {
executor.submit(collector.collect): collector
for collector in self.collectors
}
# Gather results as they complete
for future in as_completed(future_to_collector):
collector = future_to_collector[future]
collector_name = collector.get_name()
try:
metrics = future.result()
all_metrics.extend(metrics)
print(f" - {collector_name}: {len(metrics)} metrics")
if self.config.collect_self_metrics:
self.self_metrics.record_collector_success(collector_name)
except Exception as e:
print(f" - {collector_name} error: {e}")
if self.config.collect_self_metrics:
self.self_metrics.record_collector_error(collector_name)
return all_metrics
def main():