VitalPBX Live Call Quality Monitor

This script monitors live call quality on PJSIP extensions and trunks on a VitalPBX server, accessed via SSH.

Real-Time Call Quality Monitoring for VitalPBX Multi-Tenant Systems

A powerful, efficient Python-based monitoring solution that provides instant visibility into call quality metrics across your entire VitalPBX infrastructure.

1. Real-Time Dashboard

System Health Display

  • Server Uptime: Current system uptime displayed in header
  • CPU Usage: Real-time CPU percentage with 1-minute load average
  • Memory Usage: Current memory utilization percentage
  • Active Call Count: Total active calls with breakdown by type (Inbound/Outbound/Extensions)
  • Logging Status: Visual indicator showing if call logging is enabled

Live Call Display

For each active PJSIP call, displays:

  • Call Duration: Live counter showing connection time (HH:MM:SS)
  • Codec Detection: Active codec in use (G722, G711/ulaw/alaw, G729, Opus, etc.)
  • Round Trip Time (RTT): Current and peak values in milliseconds
  • Jitter Metrics: TX and RX jitter in milliseconds
  • Packet Loss: TX and RX loss percentages with packet counts (lost/total)
  • Quality Score: Calculated health percentage (0-100%) – instant and average
  • MOS Score: Mean Opinion Score (1.0-4.5) – instant and average
  • Jitter Spikes: Count and percentage of samples exceeding threshold (JS: 5/100 (5%))

Visual Quality Indicators

  • 🟢 Green: Good quality (≥90% or configurable)
  • 🟡 Yellow: Warning level (70-89% or configurable)
  • 🔴 Red: Critical issues (<70% or configurable)
  • 🟠 Orange: Outbound call direction indicator
  • Color-coded metrics highlight problems instantly

Display Optimization

  • Flicker-Free Updates: Uses terminal double-buffering and alternate screen buffer
  • Configurable Refresh: Default 1-second updates (adjustable via config)
  • Configurable Call Limit: Shows up to N live calls (default 10) with “plus X more” indicator
  • Clean Layout: Separator lines, bold headers, dimmed secondary information

2. Intelligent Call Logic

Bridge Linking & Call Correlation

  • Automatic Bridge Matching: Correlates Trunks to Extensions by matching Bridge UUIDs
  • Local Channel Proxy Support: Handles bridge linking across Local channel proxies (common in Queues and Ring Groups)
  • Cross-Call Correlation: Extension and Trunk log entries share the same Bridge ID for easy matching

Direction Detection

  • Heuristic Analysis: Determines [Inbound] or [Outbound] based on:
    • Dial Context patterns (pstn, inbound, did, outbound, outrt, etc.)
    • Application type (Dial vs Queue vs AppDial)
    • Channel naming conventions
  • Persistent Direction Tracking: Stores detected direction per channel for consistency
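A minimal sketch of this heuristic (the function name and exact pattern lists are illustrative assumptions, not the monitor's verbatim logic):

```python
def guess_direction(context: str, app: str, channel: str) -> str:
    """Illustrative direction heuristic: dial context first, then app/channel."""
    ctx = context.lower()
    # Contexts for calls arriving from the PSTN or a DID
    if any(p in ctx for p in ("pstn", "inbound", "did")):
        return "inbound"
    # Outbound routing contexts
    if any(p in ctx for p in ("outbound", "outrt")):
        return "outbound"
    # AppDial on a trunk channel usually means the PBX dialed out
    if app == "AppDial" and channel.lower().startswith("pjsip/trk"):
        return "outbound"
    return "unknown"
```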

Multi-Tenant Support

  • Tenant ID Resolution: Extracts tenant from context patterns (e.g., T2_, trk-2-)
  • Company Name Mapping: Resolves Tenant IDs to human-readable names via lookup file (/usr/local/etc/tenant-lookup.txt)
  • Extension-Based Tenant Detection: For trunk calls, looks up tenant from linked extension
  • Outbound Tenant Override: Correctly attributes outbound trunk calls to originating tenant
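For illustration, the context/channel patterns mentioned above can be captured with two regular expressions (the function name and the fallback to tenant 1 are assumptions for this sketch):

```python
import re

def extract_tenant(context: str, channel: str) -> str:
    """Illustrative tenant extraction from VitalPBX multi-tenant naming.

    Assumes contexts like 'T2_cos-all' and trunk channels like
    'PJSIP/trk-2-provider'; falls back to '1' (default tenant).
    """
    m = re.match(r'T(\d+)_', context)       # e.g. T2_cos-all -> tenant 2
    if m:
        return m.group(1)
    m = re.search(r'trk-(\d+)-', channel)   # e.g. PJSIP/trk-2-... -> tenant 2
    if m:
        return m.group(1)
    return "1"
```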

Call Filtering

  • IVR/System Call Exclusion: Automatically excludes non-real calls from logging:
    • Voicemail greetings and playbacks
    • IVR announcements
    • Music on hold
    • Parking lots, Conference bridges
    • Page/Intercom, System recordings
  • Validation Requirements: Only logs calls with valid bridge ID, minimum 2-second duration, real codec, and actual packet activity

3. Historical Statistics (On-Screen)

Insights Panel

Displays analytics for recent calls (configurable, default last 50):

  • Short Call Detection: Percentage of calls under 10 seconds with warning indicator
  • Short Call Leaders: Identifies extensions/tenants with most short calls
  • Top Outbound: Most active outbound caller by company
  • Top Inbound: Most active inbound destination by company
  • Codec Distribution: Breakdown of all codecs in use with counts
  • Loss Leaders: Extensions experiencing packet loss

Per-Category Statistics

Separate statistics for each call type:

  • Extension Calls: Internal/extension-to-extension calls
  • Inbound Trunks: Calls received from external sources
  • Outbound Trunks: Calls placed to external destinations

Each category shows:

  • Average Quality Score with color coding
  • Total call count
  • Average and total call duration
  • RTT statistics: Min, Max, and Average
  • Jitter statistics: TX and RX averages
  • Packet loss percentages

Worst 3 Calls Display

  • Automatic Identification: Lists the 3 lowest quality calls from recent history
  • Caller ID Lookup: For extension calls, displays the actual external caller ID (looked up from trunk via bridge ID)
  • Full Metrics Display: Shows time, quality, duration, codec, RTT, jitter spikes, loss percentages, and packet counts
  • Quick Troubleshooting: Helps spot recurring problem sources immediately

Fuzzy Matching & Self-Learning

  • Relations Table: Builds mapping of Bridge IDs to Extensions and Companies
  • CID Mapping: Associates caller IDs with company names for improved display
  • Time-Based Correlation: Matches calls by timestamp and CID when bridge linking unavailable
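A sketch of the time-based fallback (the call data shape and the 5-second window are illustrative assumptions):

```python
def correlate_by_time(ext_call, trunk_calls, window_secs=5):
    """Illustrative fallback correlation: when no shared Bridge ID exists,
    pair an extension call with the trunk call whose start time is closest
    and within window_secs, preferring an exact caller-ID match.
    Each call is a dict with 'start' (datetime) and 'cid' (str)."""
    best, best_delta = None, window_secs + 1
    for tc in trunk_calls:
        delta = abs((tc['start'] - ext_call['start']).total_seconds())
        if delta <= window_secs:
            # A caller-ID match wins outright; otherwise closest in time wins
            if tc['cid'] == ext_call['cid']:
                return tc
            if delta < best_delta:
                best, best_delta = tc, delta
    return best
```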

4. Logging & Alerting

Visual Alerting

  • ANSI Color Coding: Green/Yellow/Red based on configurable thresholds
  • Configurable Thresholds:
    • Warning Score: Default 90% (Yellow)
    • Critical Score: Default 70% (Red)
  • Instant Visibility: Problem calls stand out immediately in the display

Persistent Call Logging

When each call ends, comprehensive statistics are written to /var/log/vitalpbx_call_quality.log:

  • Timestamp and call direction ([Inbound]/[Outbound])
  • Extension number and Company name
  • Channel identifier
  • Caller ID and Bridge ID (for correlation)
  • Duration and Codec
  • Quality Score: Average and Minimum
  • Jitter Spikes: Count/Samples
  • Peak RTT value
  • Peak Jitter: TX and RX
  • Peak Packet Loss: TX and RX percentages
  • Packet Counts: TX lost/total, RX lost/total
  • Context and Application
  • Tenant ID
  • MOS Score: Average and Minimum

Log Sanitization

  • Pipe Character Removal: Cleans data to ensure machine-readable log format
  • ANSI Code Stripping: Removes color codes from logged names
  • Consistent Formatting: Structured format for easy parsing and analysis
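Because each entry is pipe-delimited with Key: Value pairs, a log line can be read back into a dict with a few lines of standard-library Python (a sketch; the field keys follow the layout written at call completion, e.g. CID, Bridge, Dur, Codec):

```python
import re

def parse_log_line(line: str) -> dict:
    """Parse one log entry into a dict.
    Layout: '[timestamp] name | Key: Value | Key: Value | ...'"""
    m = re.match(r'^\[([^\]]+)\] ([^|]+?) \| (.*)$', line.strip())
    if not m:
        return {}
    fields = {'timestamp': m.group(1), 'name': m.group(2)}
    for part in m.group(3).split(' | '):
        if ': ' in part:
            key, value = part.split(': ', 1)
            fields[key] = value
    return fields
```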

Automatic Log Management

  • Line Limit: Trims log when exceeding MAX_LOG_LINES (default: 2000)
  • Age Limit: Removes entries older than LOG_RETENTION_HOURS (default: 12)
  • Whichever Comes First: both limits are checked; whichever is exceeded first triggers trimming
  • Periodic Check: Validates every 60 seconds to minimize disk I/O

5. Performance Optimization

Efficient Design

  • Pure Python: No external process spawning for calculations
  • Native String Operations: All parsing and math done in-memory
  • Minimal Dependencies: Uses only Python standard library
  • Low Memory Footprint: File-based tracking instead of memory accumulation

Smart Caching

  • Asterisk CLI Cache: pjsip show channelstats output cached for 2 seconds
  • Statistics Cache: Historical statistics recalculated only every 15 seconds
  • Direction/Tenant Persistence: Stored in files, loaded once per cycle
  • CPU Calculation Cache: Previous values stored for delta calculation
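The CLI cache can be sketched as a simple mtime check against the cache file (a sketch; the helper name and fetch callback are assumptions, though the cache path matches the script's CLI_CACHE_FILE):

```python
import time
from pathlib import Path

CACHE_FILE = Path("/tmp/vitalpbx_cli_cache")  # same path as the script's CLI_CACHE_FILE

def cached_channelstats(fetch, ttl: float = 2.0) -> str:
    """Reuse the last 'pjsip show channelstats' output while it is fresh
    (younger than ttl seconds); otherwise call fetch() and store the result."""
    if CACHE_FILE.exists() and time.time() - CACHE_FILE.stat().st_mtime < ttl:
        return CACHE_FILE.read_text()
    output = fetch()
    CACHE_FILE.write_text(output)
    return output
```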

Automatic Cleanup

  • Tracking File Management: Old .track, .info, .direction, .tenant files cleaned when:
    • Count exceeds MAX_TRACKING_FILES (default: 100)
    • Age exceeds RETENTION_MINUTES (default: 120)
  • Call End Detection: Tracking files removed immediately when calls end
  • No Manual Maintenance: System is completely self-managing

Resource-Conscious Operation

  • Suitable for Busy Servers: Designed for production PBX environments
  • Non-Blocking Updates: Display updates don’t interfere with call processing
  • Graceful Degradation: Continues operation despite transient errors

6. Technical Specifications

  • Language: Python 3.6+
  • Platform: Debian/Ubuntu with VitalPBX
  • PBX Integration: Asterisk CLI (PJSIP channels)
  • Refresh Rate: 1 second (configurable)
  • Memory Usage: Minimal (file-based tracking)
  • Disk Usage: Self-managing (auto-cleanup)
  • Dependencies: None (Python standard library only)
  • Terminal: Color support required
  • Access: Root (for Asterisk CLI access)

7. Use Cases

  • NOC Monitoring: Real-time visibility into call quality across all tenants
  • Troubleshooting: Quickly identify problematic calls and recurring patterns
  • Capacity Planning: Track codec usage, call volumes, and peak times
  • SLA Compliance: Document and audit call quality metrics
  • Carrier Quality Assessment: Monitor trunk performance by direction
  • Endpoint Diagnostics: Identify extensions with consistent quality problems
  • Quality Trending: Historical log analysis for pattern detection

⚠️ Important Notes & Disclaimers

Status: This is an Experimental Beta Release. While unlikely to interfere with VitalPBX operations, please proceed with standard caution.

  • System Impact: Expect roughly a 1% increase in CPU usage while the script is actively running. This should not impact low-to-medium capacity systems.
  • Best Practice: Always create a system backup before deploying new software on your server.
  • Support: Please send questions or feedback via the contacts page.

Installation & Usage Guide

  1. Connect: Log in to your VitalPBX server via SSH.
  2. Configure: Add the optional tenant-lookup.txt file (if required).
  3. Install: Add the vitalpbx_monitor.py Python script to your chosen directory.
  4. Permissions: Make the script executable with chmod.
  5. First Run (Setup): Execute the following command to initialize the monitor and configure logging and refresh times:

python3 /usr/local/bin/vitalpbx_monitor.py
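Step 4's chmod invocation, assuming the script lives at the path used elsewhere in this guide:

```shell
# Make the monitor executable
chmod +x /usr/local/bin/vitalpbx_monitor.py

# Verify the executable bit is set
ls -l /usr/local/bin/vitalpbx_monitor.py
```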

Logging Details

File Location: /var/log/vitalpbx_call_quality.log

Size Limit: The log is trimmed by line count (MAX_LOG_LINES, default 2000) and by age (LOG_RETENTION_HOURS, default 12 hours); both are user-configurable in the config file.

Purpose: This cap prevents system slowdowns while keeping enough data to copy/paste into AI tools for issue analysis.

Get started by adding the optional tenant lookup table.

nano /usr/local/etc/tenant-lookup.txt

Paste the content below into the optional tenant-lookup.txt file and modify it with your own tenant entries.

# Tenant Lookup Table - OPTIONAL
# Format: tenant_number=Company Name
# Lines starting with # are comments

1=Default
2=Tesla Inc
3=Example Corp

# Add more tenants below:
Next, create the monitor script file:

nano /usr/local/bin/vitalpbx_monitor.py

Paste the code below into the vitalpbx_monitor.py file, then continue with the steps at the very bottom.


#!/usr/bin/env python3
"""
VitalPBX Live Call Quality Monitor (v2.14 - Python itproexpert.com)
Converted from Bash to Python for improved efficiency

Changes in v2.14:
- Quality score now heavily penalizes packet loss (one-way audio situations)
- Log shows average quality only (removed Min quality)
- MOS displays current and average (not minimum)
- Loss leaders based on average quality percentage
- Added "Last 3 Recent Calls" section after statistics
- Improved packet loss detection for quality scoring
"""

import os
import sys
import re
import time
import subprocess
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, List, Tuple, Any
from collections import defaultdict

SCRIPT_DIR = Path(__file__).parent.resolve()
CONFIG_FILE = SCRIPT_DIR / "vitalpbx_monitor.conf"
TENANT_LOOKUP_FILE = Path("/usr/local/etc/tenant-lookup.txt")
CALL_TRACKING_DIR = Path("/tmp/vitalpbx_call_tracking")
CALL_LOG_FILE = Path("/var/log/vitalpbx_call_quality.log")
CPU_PREV_FILE = Path("/tmp/vitalpbx_cpu_prev")
CLI_CACHE_FILE = Path("/tmp/vitalpbx_cli_cache")
STATS_CACHE_FILE = Path("/tmp/vitalpbx_stats_cache")
STATS_TIMESTAMP_FILE = Path("/tmp/vitalpbx_stats_timestamp")

@dataclass
class Config:
    enable_logging: bool = True
    refresh_interval: int = 1
    warning_score: int = 90
    critical_score: int = 70
    retention_minutes: int = 120
    max_tracking_files: int = 100
    log_trim_lines: int = 800
    stats_fetch_lines: int = 50
    max_live_calls_display: int = 10
    max_log_lines: int = 2000
    log_retention_hours: int = 12

class Colors:
    GREEN = '\033[32m'
    RED = '\033[31m'
    YELLOW = '\033[33m'
    ORANGE = '\033[38;5;208m'
    CYAN = '\033[36m'
    WHITE = '\033[97m'
    GREY = '\033[90m'
    RESET = '\033[0m'
    BOLD = '\033[1m'
    DIM = '\033[2m'

@dataclass
class CallStats:
    samples: int = 0
    total_score: int = 0
    peak_rtt: float = 0
    peak_tx_jit: float = 0
    peak_rx_jit: float = 0
    peak_tx_pct: float = 0
    peak_rx_pct: float = 0
    min_score: int = 0
    sum_rtt: float = 0
    sum_jitter: float = 0
    start_time: int = 0
    high_jitter_count: int = 0
    max_tx_cnt: int = 0
    max_tx_lost: int = 0
    max_rx_cnt: int = 0
    max_rx_lost: int = 0
    sum_mos: float = 0
    min_mos: float = 4.5

    def to_string(self) -> str:
        return '|'.join(map(str, [
            self.samples, self.total_score, self.peak_rtt, self.peak_tx_jit,
            self.peak_rx_jit, self.peak_tx_pct, self.peak_rx_pct, self.min_score,
            self.sum_rtt, self.sum_jitter, self.start_time, self.high_jitter_count,
            self.max_tx_cnt, self.max_tx_lost, self.max_rx_cnt, self.max_rx_lost,
            self.sum_mos, self.min_mos
        ]))

    @classmethod
    def from_string(cls, data: str) -> 'CallStats':
        parts = data.strip().split('|')
        if len(parts) < 18:
            parts.extend(['0'] * (18 - len(parts)))
        try:
            return cls(
                samples=int(parts[0] or 0), total_score=int(parts[1] or 0),
                peak_rtt=float(parts[2] or 0), peak_tx_jit=float(parts[3] or 0),
                peak_rx_jit=float(parts[4] or 0), peak_tx_pct=float(parts[5] or 0),
                peak_rx_pct=float(parts[6] or 0), min_score=int(parts[7] or 0),
                sum_rtt=float(parts[8] or 0), sum_jitter=float(parts[9] or 0),
                start_time=int(parts[10] or 0), high_jitter_count=int(parts[11] or 0),
                max_tx_cnt=int(parts[12] or 0), max_tx_lost=int(parts[13] or 0),
                max_rx_cnt=int(parts[14] or 0), max_rx_lost=int(parts[15] or 0),
                sum_mos=float(parts[16] or 0),
                min_mos=float(parts[17] or 4.5) if parts[17] and float(parts[17]) > 0 else 4.5,
            )
        except (ValueError, IndexError):
            return cls()

@dataclass
class ChannelInfo:
    context: str = ""
    exten: str = ""
    caller_id: str = ""
    app: str = ""
    data: str = ""
    state: str = ""
    priority: str = ""
    bridge_id: str = ""

@dataclass
class ChannelMetrics:
    codec: str = "-"
    rtt_ms: float = 0.0
    tx_jit_ms: float = 0
    rx_jit_ms: float = 0
    tx_pct: float = 0
    rx_pct: float = 0
    rx_cnt: int = 0
    rx_lost: int = 0
    tx_cnt: int = 0
    tx_lost: int = 0

def load_config() -> Config:
    config = Config()
    if not CONFIG_FILE.exists():
        save_default_config()
    try:
        with open(CONFIG_FILE, 'r') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                if '=' in line:
                    key, value = line.split('=', 1)
                    key = key.strip().lower()
                    value = value.strip()
                    if key == 'enable_logging': config.enable_logging = value.lower() == 'true'
                    elif key == 'refresh_interval': config.refresh_interval = int(value)
                    elif key == 'warning_score': config.warning_score = int(value)
                    elif key == 'critical_score': config.critical_score = int(value)
                    elif key == 'retention_minutes': config.retention_minutes = int(value)
                    elif key == 'max_tracking_files': config.max_tracking_files = int(value)
                    elif key == 'log_trim_lines': config.log_trim_lines = int(value)
                    elif key == 'stats_fetch_lines': config.stats_fetch_lines = int(value)
                    elif key == 'max_live_calls_display': config.max_live_calls_display = int(value)
                    elif key == 'max_log_lines': config.max_log_lines = int(value)
                    elif key == 'log_retention_hours': config.log_retention_hours = int(value)
    except Exception:
        pass
    return config

def save_default_config():
    content = """# VitalPBX Monitor Configuration
ENABLE_LOGGING=true
REFRESH_INTERVAL=1
WARNING_SCORE=90
CRITICAL_SCORE=70
LOG_TRIM_LINES=800
STATS_FETCH_LINES=50
RETENTION_MINUTES=120
MAX_TRACKING_FILES=100
MAX_LIVE_CALLS_DISPLAY=10
MAX_LOG_LINES=2000
LOG_RETENTION_HOURS=12
"""
    CONFIG_FILE.write_text(content)

def run_asterisk_cmd(cmd: str) -> str:
    try:
        result = subprocess.run(['asterisk', '-rx', cmd], capture_output=True, text=True, timeout=10)
        return result.stdout
    except Exception:
        return ""

def get_terminal_width() -> int:
    try: return os.get_terminal_size().columns
    except Exception: return 80

def parse_duration(duration_str: str) -> int:
    try:
        parts = duration_str.split(':')
        if len(parts) == 3:
            return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
    except Exception: pass
    return 0

def format_duration(seconds: int) -> str:
    return f"{seconds // 3600:02d}:{(seconds % 3600) // 60:02d}:{seconds % 60:02d}"

def safe_int(value: Any, default: int = 0) -> int:
    try:
        if isinstance(value, str): value = value.split('.')[0]
        return int(value)
    except (ValueError, TypeError): return default

def safe_float(value: Any, default: float = 0.0) -> float:
    try: return float(value)
    except (ValueError, TypeError): return default

class TenantManager:
    def __init__(self):
        self.tenant_map: Dict[str, str] = {}
        self._load_tenants()
    
    def _load_tenants(self):
        if TENANT_LOOKUP_FILE.exists():
            try:
                with open(TENANT_LOOKUP_FILE, 'r') as f:
                    for line in f:
                        line = line.strip()
                        if not line or line.startswith('#'): continue
                        if '=' in line:
                            tenant_num, company_name = line.split('=', 1)
                            self.tenant_map[tenant_num.strip()] = company_name.strip()
            except Exception: pass
    
    def get_name(self, tenant_num: str) -> str:
        return self.tenant_map.get(tenant_num, f"Tenant {tenant_num}")

class CallTracker:
    def __init__(self, config: Config):
        self.config = config
        self._last_log_trim_check = 0
        CALL_TRACKING_DIR.mkdir(parents=True, exist_ok=True)
        if not CALL_LOG_FILE.exists():
            CALL_LOG_FILE.touch()
            CALL_LOG_FILE.chmod(0o666)
    
    def cleanup_old_files(self):
        try:
            track_files = list(CALL_TRACKING_DIR.glob("*.track"))
            if len(track_files) > self.config.max_tracking_files:
                retention_secs = self.config.retention_minutes * 60
                current_time = time.time()
                for f in track_files:
                    if current_time - f.stat().st_mtime > retention_secs:
                        f.unlink(missing_ok=True)
                        f.with_suffix('.info').unlink(missing_ok=True)
        except Exception: pass
    
    def trim_log_file(self):
        current_time = time.time()
        if current_time - self._last_log_trim_check < 60: return
        self._last_log_trim_check = current_time
        if not CALL_LOG_FILE.exists(): return
        try:
            with open(CALL_LOG_FILE, 'r') as f: lines = f.readlines()
            if not lines: return
            original_count = len(lines)
            cutoff_time = datetime.now() - timedelta(hours=self.config.log_retention_hours)
            filtered_lines = []
            for line in lines:
                timestamp_match = re.match(r'^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]', line)
                if timestamp_match:
                    try:
                        line_time = datetime.strptime(timestamp_match.group(1), '%Y-%m-%d %H:%M:%S')
                        if line_time >= cutoff_time: filtered_lines.append(line)
                    except ValueError: filtered_lines.append(line)
                else: filtered_lines.append(line)
            if len(filtered_lines) > self.config.max_log_lines:
                filtered_lines = filtered_lines[-self.config.max_log_lines:]
            if len(filtered_lines) < original_count:
                with open(CALL_LOG_FILE, 'w') as f: f.writelines(filtered_lines)
        except Exception: pass
    
    def get_track_file(self, channel: str) -> Path:
        return CALL_TRACKING_DIR / f"{channel.replace('/', '_')}.track"
    def get_info_file(self, channel: str) -> Path:
        return CALL_TRACKING_DIR / f"{channel.replace('/', '_')}.info"
    def get_direction_file(self, channel: str) -> Path:
        return CALL_TRACKING_DIR / f"{channel.replace('/', '_')}.direction"
    def get_tenant_file(self, channel: str) -> Path:
        return CALL_TRACKING_DIR / f"{channel.replace('/', '_')}.tenant"
    
    def load_stats(self, channel: str) -> CallStats:
        track_file = self.get_track_file(channel)
        if track_file.exists():
            try: return CallStats.from_string(track_file.read_text())
            except Exception: pass
        return CallStats()
    
    def save_stats(self, channel: str, stats: CallStats):
        self.get_track_file(channel).write_text(stats.to_string())
    
    def update_tracking(self, channel: str, rtt: float, tx_jit: float, rx_jit: float,
                        tx_pct: float, rx_pct: float, current_score: int,
                        tx_cnt: int, tx_lost: int, rx_cnt: int, rx_lost: int,
                        current_mos: float = 4.5):
        current_time = int(time.time())
        stats = self.load_stats(channel)
        if stats.samples == 0: stats.start_time = current_time
        call_duration = current_time - stats.start_time
        stats.samples += 1
        stats.total_score += current_score
        stats.sum_mos += current_mos
        if current_mos < stats.min_mos: stats.min_mos = current_mos
        stats.max_tx_cnt = max(stats.max_tx_cnt, tx_cnt)
        stats.max_tx_lost = max(stats.max_tx_lost, tx_lost)
        stats.max_rx_cnt = max(stats.max_rx_cnt, rx_cnt)
        stats.max_rx_lost = max(stats.max_rx_lost, rx_lost)
        if tx_jit > 70 or rx_jit > 70: stats.high_jitter_count += 1
        if stats.min_score == 0 or current_score < stats.min_score: stats.min_score = current_score
        if rtt > 0:
            stats.sum_rtt += rtt  # accumulate every sample so sum_rtt yields a true average
            if rtt > stats.peak_rtt:
                stats.peak_rtt = rtt
        if call_duration > 3:
            if 0 < tx_jit <= 3000 and tx_jit > stats.peak_tx_jit: stats.peak_tx_jit = tx_jit
            if 0 < rx_jit <= 3000 and rx_jit not in (999, 1000) and rx_jit > stats.peak_rx_jit: stats.peak_rx_jit = rx_jit
        if tx_pct > stats.peak_tx_pct: stats.peak_tx_pct = tx_pct
        if rx_pct > stats.peak_rx_pct: stats.peak_rx_pct = rx_pct
        self.save_stats(channel, stats)
    
    def get_overall_quality(self, channel: str) -> int:
        stats = self.load_stats(channel)
        if stats.samples == 0: return 100
        avg_score = stats.total_score // stats.samples
        if stats.high_jitter_count > 0:
            jitter_pct = (stats.high_jitter_count * 100) // stats.samples
            if jitter_pct > 20: avg_score -= 25
            elif jitter_pct > 10: avg_score -= 15
            elif jitter_pct > 5: avg_score -= 10
            elif jitter_pct > 2: avg_score -= 5
        return max(0, avg_score)
    
    def save_call_info(self, channel: str, display_name: str, duration: str,
                       codec: str, context: str, bridge_id: str, caller_id: str,
                       direction: str, app: str, tenant: str):
        info_file = self.get_info_file(channel)
        clean_name = re.sub(r'\x1b\[[0-9;]*m', '', display_name).replace('|', '-')
        info = '|'.join([clean_name, duration.replace('|', '-'), codec.replace('|', '-'),
            context.replace('|', '-'), bridge_id.replace('|', '-'), caller_id.replace('|', '-'),
            direction, app.replace('|', '-'), tenant.replace('|', '-')])
        info_file.write_text(info)
    
    def log_call_completion(self, channel: str, display_name: str, duration: str,
                           codec: str, context: str, bridge_id: str, caller_id: str,
                           direction: str, app: str, tenant: str, tenant_mgr: TenantManager):
        if codec in ('-', '') or not self.config.enable_logging: return
        stats = self.load_stats(channel)
        if stats.samples == 0: return
        if not bridge_id or bridge_id.strip() == '': return
        dur_seconds = parse_duration(duration)
        if dur_seconds < 2: return
        skip_contexts = ('sub-extvm', 'vm-', 'voicemail', 'playback', 'ivr', 'announcement', 'moh', 'parking', 'page', 'intercom', 'conf-', 'meetme', 'spy', 'chanspy', 'pickup')
        context_lower = context.lower()
        if any(skip in context_lower for skip in skip_contexts): return
        skip_apps = ('playback', 'background', 'read', 'saydigits', 'saynumber', 'voicemail', 'voicemailmain', 'record', 'mixmonitor', 'chanspy', 'page', 'park', 'parkedcall')
        app_lower = app.lower()
        if any(skip_app == app_lower for skip_app in skip_apps): return
        if stats.max_tx_cnt == 0 and stats.max_rx_cnt == 0: return
        
        avg_quality = stats.total_score // stats.samples
        clean_name = re.sub(r'\x1b\[[0-9;]*m', '', display_name).replace('|', ' ')
        
        if "Extension" in display_name:
            ext_match = re.search(r'\[(\d+)\]', clean_name)
            ext_num = ext_match.group(1) if ext_match else "Unknown"
            comp_matches = re.findall(r'\[([a-zA-Z0-9 ]+)\]', clean_name)
            comp_name = next((m for m in comp_matches if m != "Extension" and m != ext_num), "Unknown")
            final_name = f"[Extension] [{ext_num}] [{comp_name}] {channel}"
        else:
            dir_fmt = {"inbound": "[Inbound]", "outbound": "[Outbound]"}.get(direction, "[Unknown]")
            comp_name = tenant_mgr.get_name(tenant)
            trunk_match = re.search(r'\[Trunk\] \[([^\]]+)', clean_name)
            if trunk_match: comp_name = trunk_match.group(1)
            final_name = f"[Trunk] {dir_fmt} [{comp_name}] {channel}"
        
        caller_id = caller_id.replace('|', '-')
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        avg_mos = stats.sum_mos / stats.samples if stats.samples > 0 else 4.5
        
        log_entry = (
            f"[{timestamp}] {final_name} | CID: {caller_id} | Bridge: {bridge_id} | "
            f"Dur: {duration} | Codec: {codec} | Quality: {avg_quality}% | "
            f"JitterSpikes: {stats.high_jitter_count}/{stats.samples} | RTT: {stats.peak_rtt}ms | "
            f"Jitter: TX {stats.peak_tx_jit}ms RX {stats.peak_rx_jit}ms | "
            f"Loss: TX {stats.peak_tx_pct}% RX {stats.peak_rx_pct}% | "
            f"Pkts: TX {stats.max_tx_lost}/{stats.max_tx_cnt} RX {stats.max_rx_lost}/{stats.max_rx_cnt} | "
            f"Ctx: {context} | App: {app} | Tenant: {tenant} | MOS: {avg_mos:.1f}\n"
        )
        with open(CALL_LOG_FILE, 'a') as f: f.write(log_entry)
    
    def check_ended_calls(self, active_channels: set, tenant_mgr: TenantManager):
        if not self.config.enable_logging: return
        active_normalized = set()
        for c in active_channels:
            normalized = c.replace("PJSIP/", "").replace("pjsip/", "").lower().replace('/', '_')
            active_normalized.add(normalized)
        for track_file in CALL_TRACKING_DIR.glob("*.track"):
            tracked_channel = track_file.stem.lower()
            if tracked_channel not in active_normalized:
                info_file = track_file.with_suffix('.info')
                if info_file.exists():
                    try:
                        parts = info_file.read_text().split('|')
                        if len(parts) >= 9:
                            self.log_call_completion(tracked_channel, parts[0], parts[1], parts[2],
                                parts[3], parts[4], parts[5], parts[6], parts[7], parts[8], tenant_mgr)
                    except Exception: pass
                    info_file.unlink(missing_ok=True)
                track_file.unlink(missing_ok=True)
                self.get_direction_file(tracked_channel).unlink(missing_ok=True)
                self.get_tenant_file(tracked_channel).unlink(missing_ok=True)

def calculate_quality_score(rtt: float, tx_jit: float, rx_jit: float, tx_pct: float, rx_pct: float,
                           tx_cnt: int = 0, tx_lost: int = 0, rx_cnt: int = 0, rx_lost: int = 0) -> int:
    """
    Calculate quality score (0-100) based on metrics.
    
    QUALITY FACTORS SUMMARY:
    ========================
    1. RTT (Round Trip Time) - Network latency
       - >300ms: -30 points, >200ms: -20, >150ms: -15, >100ms: -8
    2. Jitter (TX and RX averaged) - Packet timing variation
       - >200ms: -35 points, >100ms: -25, >70ms: -15, >50ms: -10, >30ms: -5
    3. Packet Loss Percentage (TX and RX averaged) - Lost audio data
       - >15%: -70 points, >10%: -60, >5%: -45, >3%: -30, >1%: -15, >0%: -8
    4. One-Way Audio Detection - Asymmetric packet loss
       - TX loss 100% with RX OK: -50 points (caller can't hear you)
       - RX loss 100% with TX OK: -50 points (you can't hear caller)
       - Severe asymmetric loss: -30 to -40 points
    5. High Jitter Spike Penalty (applied in get_overall_quality)
       - >20% of samples with jitter >70ms: -25, >10%: -15, >5%: -10, >2%: -5
    """
    score = 100
    rtt_int = safe_int(rtt)
    if rtt_int > 300: score -= 30
    elif rtt_int > 200: score -= 20
    elif rtt_int > 150: score -= 15
    elif rtt_int > 100: score -= 8
    
    avg_jitter = (safe_int(tx_jit) + safe_int(rx_jit)) // 2
    if avg_jitter > 200: score -= 35
    elif avg_jitter > 100: score -= 25
    elif avg_jitter > 70: score -= 15
    elif avg_jitter > 50: score -= 10
    elif avg_jitter > 30: score -= 5
    
    avg_loss = (safe_float(tx_pct) + safe_float(rx_pct)) / 2
    if avg_loss > 15: score -= 70
    elif avg_loss > 10: score -= 60
    elif avg_loss > 5: score -= 45
    elif avg_loss > 3: score -= 30
    elif avg_loss > 1: score -= 15
    elif avg_loss > 0: score -= 8
    
    # ONE-WAY AUDIO DETECTION
    if tx_pct >= 99 and rx_pct < 5: score -= 50
    elif rx_pct >= 99 and tx_pct < 5: score -= 50
    elif tx_pct >= 50 and rx_pct < 5: score -= 30
    elif rx_pct >= 50 and tx_pct < 5: score -= 30
    
    if tx_cnt > 100 and rx_cnt > 100:
        actual_tx_loss = (tx_lost / tx_cnt * 100) if tx_cnt > 0 else 0
        actual_rx_loss = (rx_lost / rx_cnt * 100) if rx_cnt > 0 else 0
        if actual_tx_loss > 80 and actual_rx_loss < 10: score -= 40
        elif actual_rx_loss > 80 and actual_tx_loss < 10: score -= 40
        elif actual_tx_loss > 50 and actual_rx_loss < 10: score -= 25
        elif actual_rx_loss > 50 and actual_tx_loss < 10: score -= 25
    
    return max(0, score)

def calculate_mos(rtt: float, tx_jitter: float, rx_jitter: float, tx_loss: float, rx_loss: float) -> float:
    jitter = max(tx_jitter, rx_jitter)
    loss = max(tx_loss, rx_loss)
    r_pen = 0
    if rtt > 200: r_pen = 0.8 + ((rtt - 200) / 100) * 0.5
    elif rtt > 150: r_pen = 0.5 + ((rtt - 150) / 50) * 0.3
    elif rtt > 100: r_pen = 0.2 + ((rtt - 100) / 50) * 0.3
    elif rtt > 50: r_pen = ((rtt - 50) / 50) * 0.2
    j_pen = 0
    if jitter > 100: j_pen = 1.0 + ((jitter - 100) / 100) * 0.5
    elif jitter > 50: j_pen = 0.5 + ((jitter - 50) / 50) * 0.5
    elif jitter > 30: j_pen = 0.2 + ((jitter - 30) / 20) * 0.3
    elif jitter > 10: j_pen = ((jitter - 10) / 20) * 0.2
    l_pen = 0
    if loss > 5: l_pen = 2.0 + ((loss - 5) / 5) * 1.0
    elif loss > 2: l_pen = 1.0 + ((loss - 2) / 3) * 1.0
    elif loss > 1: l_pen = 0.5 + ((loss - 1)) * 0.5
    elif loss > 0.5: l_pen = 0.2 + ((loss - 0.5) / 0.5) * 0.3
    elif loss > 0: l_pen = loss * 0.4
    return max(1.0, min(4.5, round(4.5 - r_pen - j_pen - l_pen, 1)))

def get_quality_display(score: int, config: Config) -> str:
    if score >= config.warning_score: return f"{Colors.GREEN}{score}%{Colors.RESET}"
    elif score >= config.critical_score: return f"{Colors.YELLOW}{score}%{Colors.RESET}"
    else: return f"{Colors.RED}{score}%{Colors.RESET}"

def get_cpu_usage() -> int:
    """Return CPU busy %, derived from the /proc/stat delta since the
    previous call (previous totals are persisted in CPU_PREV_FILE)."""
    try:
        with open('/proc/stat', 'r') as f: line = f.readline()
        parts = line.split()
        user, nice, system, idle, iowait, irq, softirq, steal = map(int, parts[1:9])
        total = user + nice + system + idle + iowait + irq + softirq + steal
        idle_sum = idle + iowait
        if CPU_PREV_FILE.exists():
            prev_data = CPU_PREV_FILE.read_text().split()
            prev_total, prev_idle = int(prev_data[0]), int(prev_data[1])
            diff_total = total - prev_total
            diff_idle = idle_sum - prev_idle
            usage = 100 * (diff_total - diff_idle) // diff_total if diff_total > 0 else 0
        else: usage = 0
        CPU_PREV_FILE.write_text(f"{total} {idle_sum}")
        return usage
    except Exception: return 0

def get_memory_usage() -> Tuple[int, int, float]:
    try:
        result = subprocess.run(['free', '-m'], capture_output=True, text=True)
        lines = result.stdout.strip().split('\n')
        if len(lines) >= 2:
            parts = lines[1].split()
            total, used = int(parts[1]), int(parts[2])
            return used, total, (used / total) * 100 if total > 0 else 0
    except Exception: pass
    return 0, 0, 0.0

def get_load_average() -> Tuple[float, float]:
    try:
        with open('/proc/loadavg', 'r') as f: load_1min = float(f.read().split()[0])
        cores = os.cpu_count() or 1
        return load_1min, (load_1min / cores) * 100
    except Exception: return 0.0, 0.0

def get_uptime() -> str:
    try:
        result = subprocess.run(['uptime', '-p'], capture_output=True, text=True)
        return result.stdout.strip().replace('up ', '')
    except Exception: return "unknown"

class ChannelParser:
    def __init__(self, config: Config, tenant_mgr: TenantManager, tracker: CallTracker):
        self.config = config
        self.tenant_mgr = tenant_mgr
        self.tracker = tracker
        self.channel_info: Dict[str, ChannelInfo] = {}
        self.channel_stats: Dict[str, str] = {}
        self.channel_direction: Dict[str, str] = {}
        self.channel_tenant: Dict[str, str] = {}
        self.bridge_groups: Dict[str, List[str]] = defaultdict(list)
        self.local_link_map: Dict[str, str] = {}
    
    def load_persisted_state(self):
        for f in CALL_TRACKING_DIR.glob("*.direction"):
            self.channel_direction[f.stem] = f.read_text().strip()
        for f in CALL_TRACKING_DIR.glob("*.tenant"):
            self.channel_tenant[f.stem] = f.read_text().strip()
    
    def parse_channels_concise(self, raw_output: str) -> Dict[str, ChannelInfo]:
        local_chans_1, local_chans_2 = {}, {}
        for line in raw_output.strip().split('\n'):
            if not line: continue
            parts = line.split('!')
            if len(parts) < 8: continue
            # Guarded by the len(parts) >= 8 check above; parts[3] is the priority
            chan, context, exten = parts[0], parts[1], parts[2]
            state, app, data = parts[4], parts[5], parts[6]
            caller_id = parts[7]
            bridge_id = ""
            for i in range(len(parts) - 1, -1, -1):
                if re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}', parts[i]):
                    bridge_id = parts[i]
                    break
            if chan.startswith("Local/"):
                base_name = chan.split(';')[0]
                suffix = chan.split(';')[1] if ';' in chan else ""
                if suffix == "1": local_chans_1[base_name] = bridge_id
                elif suffix == "2": local_chans_2[base_name] = bridge_id
            if chan.startswith("PJSIP/"):
                chan_short = chan.replace("PJSIP/", "").lower()
                self.channel_info[chan_short] = ChannelInfo(context=context, exten=exten, caller_id=caller_id, app=app, data=data, state=state, bridge_id=bridge_id)
                if bridge_id: self.bridge_groups[bridge_id].append(chan_short)
                if re.match(r'^admin\d+-[0-9a-f]+|^[a-z][a-z0-9_-]+-[0-9a-f]+', chan_short):
                    detected_dir, detected_tenant = "", ""
                    tenant_match = re.search(r'^T(\d+)_|^trk-(\d+)-', context)
                    if tenant_match: detected_tenant = tenant_match.group(1) or tenant_match.group(2)
                    if re.search(r'(AppDial|Dial)', app) and "(Outgoing Line)" in data: detected_dir = "outbound"
                    elif re.search(r'(Queue|AppQueue)', app): detected_dir = "inbound"
                    if not detected_dir:
                        if re.search(r'(pstn|inbound|did|ivr|ext-group|ringgroups|ext-queues|sub-local-dialing)', context): detected_dir = "inbound"
                        elif re.search(r'(internal|outbound|outrt|macro-dialout)', context): detected_dir = "outbound"
                        elif re.search(r'trk-\d+-in', context): detected_dir = "outbound"
                    if detected_dir and chan_short not in self.channel_direction:
                        self.channel_direction[chan_short] = detected_dir
                        self.tracker.get_direction_file(chan_short).write_text(detected_dir)
                    if detected_tenant and chan_short not in self.channel_tenant:
                        self.channel_tenant[chan_short] = detected_tenant
                        self.tracker.get_tenant_file(chan_short).write_text(detected_tenant)
        for base in local_chans_1:
            b1, b2 = local_chans_1.get(base, ""), local_chans_2.get(base, "")
            if b1 and b2:
                self.local_link_map[b1] = b2
                self.local_link_map[b2] = b1
        return self.channel_info
    
    def get_all_linked_channels(self, bridge_id: str) -> List[str]:
        channels = list(self.bridge_groups.get(bridge_id, []))
        linked_bridge = self.local_link_map.get(bridge_id, "")
        if linked_bridge: channels.extend(self.bridge_groups.get(linked_bridge, []))
        return channels
    
    def get_effective_bridge_id(self, chan_short: str) -> str:
        info = self.channel_info.get(chan_short)
        if not info or not info.bridge_id: return ""
        bridge_id = info.bridge_id
        linked_bridge = self.local_link_map.get(bridge_id, "")
        if linked_bridge and re.match(r'^t\d+_\d+', chan_short):
            for lc in self.bridge_groups.get(linked_bridge, []):
                if re.match(r'^admin\d+-[0-9a-f]+|^[a-z][a-z0-9_-]+-[0-9a-f]+', lc): return linked_bridge
        return bridge_id
    
    def get_tenant_from_linked_extension(self, chan_short: str) -> str:
        info = self.channel_info.get(chan_short)
        if not info or not info.bridge_id: return ""
        for linked_chan in self.get_all_linked_channels(info.bridge_id):
            if linked_chan == chan_short: continue
            ext_match = re.match(r'^t(\d+)_(\d+)', linked_chan)
            if ext_match: return ext_match.group(1)
        return ""
    
    def parse_channel_stats(self, raw_output: str):
        for line in raw_output.strip().split('\n'):
            if not line: continue
            parts = line.split()
            if len(parts) < 4: continue
            col1, col2 = parts[0], parts[1]
            # Two layouts: "ChannelId Time ..." or "BridgeId ChannelId Time ..."
            chan_id = col1 if re.match(r'^\d{2}:\d{2}:\d{2}$', col2) else col2
            if chan_id == "ChannelId" or not chan_id: continue
            self.channel_stats[chan_id.lower()] = line
    
    def get_metrics_for_channel(self, chan_short: str, bridge_id: str) -> ChannelMetrics:
        trunc_key = chan_short[:18].lower()  # channelstats truncates channel names to 18 chars
        # Keys are stored bare in parse_channel_stats, so look up by truncated name only
        stat_line = self.channel_stats.get(trunc_key)
        metrics = ChannelMetrics()
        if not stat_line: return metrics
        parts = stat_line.split()
        col2 = parts[1] if len(parts) > 1 else ""
        try:
            if re.match(r'^\d{2}:\d{2}:\d{2}$', col2):
                # Layout without a leading BridgeId column
                metrics.rx_jit_ms = safe_float(parts[6]) if len(parts) > 6 else 0
                metrics.tx_jit_ms = safe_float(parts[10]) * 1000 if len(parts) > 10 else 0
                metrics.rtt_ms = safe_float(parts[11]) * 1000 if len(parts) > 11 else 0
                metrics.tx_pct = safe_float(parts[9]) if len(parts) > 9 else 0
                metrics.rx_pct = safe_float(parts[5]) if len(parts) > 5 else 0
                metrics.codec = parts[2] if len(parts) > 2 else "-"
                metrics.rx_cnt = safe_int(parts[3]) if len(parts) > 3 else 0
                metrics.rx_lost = safe_int(parts[4]) if len(parts) > 4 else 0
                metrics.tx_cnt = safe_int(parts[7]) if len(parts) > 7 else 0
                metrics.tx_lost = safe_int(parts[8]) if len(parts) > 8 else 0
            else:
                # Layout with a leading BridgeId column (all indices shift by one)
                metrics.rx_jit_ms = safe_float(parts[7]) if len(parts) > 7 else 0
                metrics.tx_jit_ms = safe_float(parts[11]) * 1000 if len(parts) > 11 else 0
                metrics.rtt_ms = safe_float(parts[12]) * 1000 if len(parts) > 12 else 0
                metrics.tx_pct = safe_float(parts[10]) if len(parts) > 10 else 0
                metrics.rx_pct = safe_float(parts[6]) if len(parts) > 6 else 0
                metrics.codec = parts[3] if len(parts) > 3 else "-"
                metrics.rx_cnt = safe_int(parts[4]) if len(parts) > 4 else 0
                metrics.rx_lost = safe_int(parts[5]) if len(parts) > 5 else 0
                metrics.tx_cnt = safe_int(parts[8]) if len(parts) > 8 else 0
                metrics.tx_lost = safe_int(parts[9]) if len(parts) > 9 else 0
        except (IndexError, ValueError): pass
        return metrics

class StatisticsDisplay:
    def __init__(self, config: Config, tenant_mgr: TenantManager):
        self.config = config
        self.tenant_mgr = tenant_mgr
        self.relations_table: Dict[str, str] = {}
        self.global_cid_map: Dict[str, str] = {}
    
    def build_relations_map(self, log_chunk: str):
        for line in log_chunk.split('\n'):
            if "[Extension]" not in line: continue
            bridge_match = re.search(r'Bridge: ([^ |]+)', line)
            b_id = bridge_match.group(1) if bridge_match else ""
            ext_match = re.search(r'\[Extension\] .*?\[(\d+)\]', line)
            e_num = ext_match.group(1) if ext_match else ""
            comp_match = re.search(r'\[Extension\] \[\d+\] \[([^\]]+)\]', line)
            comp = comp_match.group(1) if comp_match else ""
            cid_match = re.search(r'CID: ([^|]+)', line)
            cid = cid_match.group(1).strip().replace('|', '-') if cid_match else ""
            # Normalise E.164 UK numbers (+44...) to national 0-prefixed form for matching
            match_cid = cid.replace('+', '')
            if match_cid.startswith('44'): match_cid = '0' + match_cid[2:]
            if b_id and e_num: self.relations_table[b_id] = f"{comp}|{e_num}"
            if match_cid and comp: self.global_cid_map[match_cid] = comp
    
    def _lookup_cid_by_bridge(self, bridge_id: str, all_calls: List[str], exclude_line: str) -> str:
        if not bridge_id: return ""
        for line in all_calls:
            if line == exclude_line or "[Trunk]" not in line: continue
            bridge_match = re.search(r'Bridge: ([^ |]+)', line)
            if bridge_match and bridge_match.group(1) == bridge_id:
                cid_match = re.search(r'CID: ([^|]+)', line)
                if cid_match: return cid_match.group(1).strip()
        return ""
    
    def calculate_stats(self, call_data: List[str], section_title: str, all_calls: List[str] = None) -> List[str]:
        output = []
        count, sum_quality, min_rtt, max_rtt, sum_rtt = 0, 0, 999999, 0, 0
        sum_tx_jit, sum_rx_jit, sum_tx_loss, sum_rx_loss, total_duration_sec = 0, 0, 0, 0, 0
        worst_calls = []
        if all_calls is None: all_calls = call_data
        
        for line in call_data:
            if not line or "Quality:" not in line: continue
            codec_match = re.search(r'Codec: ([^|]+)', line)
            codec = codec_match.group(1).strip() if codec_match else "-"
            if codec == "-": continue
            count += 1
            # Evaluate each metric regex once instead of twice per field
            def _num(pattern: str) -> int:
                m = re.search(pattern, line)
                return safe_int(m.group(1)) if m else 0
            rtt = _num(r'RTT: ([\d.]+)')
            tx_jit = _num(r'Jitter: TX ([\d.]+)')
            rx_jit = _num(r'Jitter:.*RX ([\d.]+)')
            tx_loss = _num(r'Loss: TX ([\d.]+)')
            rx_loss = _num(r'Loss:.*RX ([\d.]+)')
            dur_match = re.search(r'Dur: ([\d:]+)', line)
            duration = dur_match.group(1) if dur_match else "00:00:00"
            qual_match = re.search(r'Quality: (\d+)', line)
            quality = safe_int(qual_match.group(1) if qual_match else "100")
            sum_quality += quality
            dur_sec = parse_duration(duration)
            total_duration_sec += dur_sec
            sum_rtt += rtt
            if 0 < rtt < min_rtt: min_rtt = rtt
            if rtt > max_rtt: max_rtt = rtt
            sum_tx_jit += tx_jit
            sum_rx_jit += rx_jit
            sum_tx_loss += tx_loss
            sum_rx_loss += rx_loss
            worst_calls.append({'quality': quality, 'line': line, 'duration': duration, 'codec': codec, 'rtt': rtt, 'tx_loss': tx_loss, 'rx_loss': rx_loss})
        
        if count == 0: return output
        avg_quality = sum_quality // count
        avg_rtt = sum_rtt // count
        avg_tx_jit = sum_tx_jit // count
        avg_rx_jit = sum_rx_jit // count
        avg_tx_loss = sum_tx_loss // count
        avg_rx_loss = sum_rx_loss // count
        total_duration = format_duration(total_duration_sec)
        avg_dur_sec = total_duration_sec // count
        avg_dur_fmt = format_duration(avg_dur_sec)
        if min_rtt == 999999: min_rtt = 0
        
        q_col = Colors.GREEN if avg_quality >= self.config.warning_score else Colors.RED
        avg_dur_color = Colors.RED if avg_dur_sec < 10 else Colors.RESET
        output.append(f"{Colors.BOLD}{section_title:<20}{Colors.RESET} Avg Qual: {q_col}{avg_quality}%{Colors.RESET} | Calls: {count} | Avg Dur: {avg_dur_color}{avg_dur_fmt}{Colors.RESET} | Tot Dur: {total_duration}")
        output.append(f"{Colors.DIM}Stats:{Colors.RESET} RTT: {min_rtt}-{max_rtt}ms (Avg {avg_rtt}) | Jitter: TX {avg_tx_jit} RX {avg_rx_jit} (Avg {(avg_tx_jit + avg_rx_jit) // 2}) | Loss: TX {avg_tx_loss}% RX {avg_rx_loss}%")
        
        worst_calls.sort(key=lambda x: x['quality'])
        for i, call in enumerate(worst_calls[:3]):
            line = call['line']
            raw_name = re.sub(r'^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\] ', '', line.split('|')[0])
            time_match = re.search(r'^\[[\d-]+ (\d{2}:\d{2}:\d{2})', line)
            time_val = time_match.group(1) if time_match else ""
            bridge_match = re.search(r'Bridge: ([^ |]+)', line)
            bridge_id = bridge_match.group(1) if bridge_match else ""
            cid_match = re.search(r'CID: ([^|]+)', line)
            raw_cid = cid_match.group(1).strip() if cid_match else ""
            if "[Extension]" in line and raw_cid and re.match(r'^\d{3,5}$', raw_cid):
                trunk_cid = self._lookup_cid_by_bridge(bridge_id, all_calls, line)
                if trunk_cid: raw_cid = trunk_cid
            js_match = re.search(r'JitterSpikes: ([^| ]+)', line)
            js_val = js_match.group(1) if js_match else "0/0"
            tx_pkts_match = re.search(r'Pkts: TX ([^ ]+)', line)
            tx_pkts = tx_pkts_match.group(1) if tx_pkts_match else "0/0"
            rx_pkts_match = re.search(r'Pkts:.*RX ([^ |]+)', line)
            rx_pkts = rx_pkts_match.group(1) if rx_pkts_match else "0/0"
            if "[Extension]" in raw_name:
                ext_match = re.search(r'\[(\d+)\]', raw_name)
                ext_n = ext_match.group(1) if ext_match else ""
                comp_matches = re.findall(r'\[([a-zA-Z0-9 ]+)\]', raw_name)
                comp = next((m for m in comp_matches if m != "Extension" and m != ext_n), "")
                name_part = f"{comp} (Ext {ext_n})" if comp else f"Ext {ext_n}"
            elif "[Trunk]" in raw_name:
                comp_match = re.search(r'\[Trunk\] \[[^\]]+\] \[([^\]]+)', raw_name)
                name_part = comp_match.group(1) if comp_match else "Trunk Call"
            else: name_part = raw_name[:20]
            quality_display = get_quality_display(call['quality'], self.config)
            output.append(f"  {i+1}. {name_part[:35]:<35} ({raw_cid[:13]:<13}) T:{time_val} Q:{quality_display} D:{call['duration']} C:{call['codec']} RTT:{call['rtt']}ms JS:{js_val} L:{call['tx_loss']}/{call['rx_loss']}% TX {tx_pkts} RX {rx_pkts}")
        return output
    
    def get_recent_calls_section(self, recent_lines: List[str], all_calls: List[str]) -> List[str]:
        output = []
        recent_3 = recent_lines[-3:][::-1]
        if not recent_3: return output
        output.append("")
        output.append(f"{Colors.BOLD}Last 3 Recent Calls{Colors.RESET}")
        for i, line in enumerate(recent_3):
            if not line or "Quality:" not in line: continue
            raw_name = re.sub(r'^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\] ', '', line.split('|')[0])
            time_match = re.search(r'^\[[\d-]+ (\d{2}:\d{2}:\d{2})', line)
            time_val = time_match.group(1) if time_match else ""
            bridge_match = re.search(r'Bridge: ([^ |]+)', line)
            bridge_id = bridge_match.group(1) if bridge_match else ""
            cid_match = re.search(r'CID: ([^|]+)', line)
            raw_cid = cid_match.group(1).strip() if cid_match else ""
            if "[Extension]" in line and raw_cid and re.match(r'^\d{3,5}$', raw_cid):
                trunk_cid = self._lookup_cid_by_bridge(bridge_id, all_calls, line)
                if trunk_cid: raw_cid = trunk_cid
            dur_match = re.search(r'Dur: ([\d:]+)', line)
            duration = dur_match.group(1) if dur_match else "00:00:00"
            codec_match = re.search(r'Codec: ([^|]+)', line)
            codec = codec_match.group(1).strip() if codec_match else "-"
            qual_match = re.search(r'Quality: (\d+)', line)
            quality = safe_int(qual_match.group(1) if qual_match else "100")
            rtt_match = re.search(r'RTT: ([\d.]+)', line)
            rtt = safe_int(rtt_match.group(1) if rtt_match else "0")
            js_match = re.search(r'JitterSpikes: ([^| ]+)', line)
            js_val = js_match.group(1) if js_match else "0/0"
            tx_loss_match = re.search(r'Loss: TX ([\d.]+)', line)
            tx_loss = safe_int(tx_loss_match.group(1) if tx_loss_match else "0")
            rx_loss_match = re.search(r'Loss:.*RX ([\d.]+)', line)
            rx_loss = safe_int(rx_loss_match.group(1) if rx_loss_match else "0")
            tx_pkts_match = re.search(r'Pkts: TX ([^ ]+)', line)
            tx_pkts = tx_pkts_match.group(1) if tx_pkts_match else "0/0"
            rx_pkts_match = re.search(r'Pkts:.*RX ([^ |]+)', line)
            rx_pkts = rx_pkts_match.group(1) if rx_pkts_match else "0/0"
            if "[Extension]" in raw_name:
                ext_match = re.search(r'\[(\d+)\]', raw_name)
                ext_n = ext_match.group(1) if ext_match else ""
                comp_matches = re.findall(r'\[([a-zA-Z0-9 ]+)\]', raw_name)
                comp = next((m for m in comp_matches if m != "Extension" and m != ext_n), "")
                name_part = f"{comp} (Ext {ext_n})" if comp else f"Ext {ext_n}"
                call_type = "EXT"
            elif "[Trunk]" in raw_name:
                call_type = "IN" if "[Inbound]" in raw_name else "OUT" if "[Outbound]" in raw_name else "TRK"
                comp_match = re.search(r'\[Trunk\] \[[^\]]+\] \[([^\]]+)', raw_name)
                name_part = comp_match.group(1) if comp_match else "Trunk Call"
            else:
                name_part = raw_name[:20]
                call_type = "???"
            quality_display = get_quality_display(quality, self.config)
            output.append(f"  {i+1}. [{call_type}] {name_part[:30]:<30} ({raw_cid[:13]:<13}) T:{time_val} Q:{quality_display} D:{duration} C:{codec} RTT:{rtt}ms JS:{js_val} L:{tx_loss}/{rx_loss}% TX {tx_pkts} RX {rx_pkts}")
        return output
    
    def get_advanced_stats(self, recent_calls: List[str]) -> List[str]:
        output = []
        total_recent = len(recent_calls)
        short_count = 0
        short_call_map: Dict[str, int] = defaultdict(int)
        for line in recent_calls:
            dur_match = re.search(r'Dur: ([\d:]+)', line)
            if dur_match:
                dur_sec = parse_duration(dur_match.group(1))
                if dur_sec < 10:
                    short_count += 1
                    if "[Extension]" in line:
                        ext_match = re.search(r'\[Extension\] \[(\d+)', line)
                        t_name = ext_match.group(1) if ext_match else "Unknown Ext"
                    else:
                        comp_match = re.search(r'\[Trunk\] \[[^\]]+\] \[([^\]]+)', line)
                        t_name = comp_match.group(1) if comp_match else "Unknown"
                    short_call_map[t_name] += 1
        short_pct = (short_count * 100) // total_recent if total_recent > 0 else 0
        short_color = Colors.YELLOW if short_pct > 20 else Colors.GREEN
        
        outbound_counts: Dict[str, int] = defaultdict(int)
        inbound_counts: Dict[str, int] = defaultdict(int)
        for line in recent_calls:
            if "[Outbound]" in line:
                match = re.search(r'\[Trunk\] \[Outbound\] \[([^\]]+)', line)
                if match: outbound_counts[match.group(1)] += 1
            elif "[Inbound]" in line:
                match = re.search(r'\[Trunk\] \[Inbound\] \[([^\]]+)', line)
                if match: inbound_counts[match.group(1)] += 1
        top_out = max(outbound_counts.items(), key=lambda x: x[1], default=("None", 0))
        top_in = max(inbound_counts.items(), key=lambda x: x[1], default=("None", 0))
        out_fmt = f"{top_out[0]} ({top_out[1]})" if top_out[1] > 0 else "None"
        in_fmt = f"{top_in[0]} ({top_in[1]})" if top_in[1] > 0 else "None"
        
        codec_counts: Dict[str, int] = defaultdict(int)
        for line in recent_calls:
            codec_match = re.search(r'Codec: (\w+)', line)
            if codec_match:
                codec = codec_match.group(1).lower()
                if codec in ('ulaw', 'alaw'): codec_counts['G711'] += 1
                elif codec == 'g722': codec_counts['G722'] += 1
                elif codec == 'g729': codec_counts['G729'] += 1
                elif codec == 'opus': codec_counts['Opus'] += 1
                elif codec not in ('-', ''): codec_counts[codec.upper()] += 1
        codec_parts = [f"{c}:{n}" for c, n in sorted(codec_counts.items(), key=lambda x: x[1], reverse=True) if n > 0]
        codec_str = " ".join(codec_parts) if codec_parts else "None"
        
        output.append(f"{Colors.BOLD}Insights (Last {len(recent_calls)}):{Colors.RESET} Short Calls: {short_color}{short_pct}%{Colors.RESET} | Top Outbound: {Colors.CYAN}{out_fmt}{Colors.RESET} | Top Inbound: {Colors.CYAN}{in_fmt}{Colors.RESET} | Codecs: {codec_str}")
        if short_count > 0:
            leaders = sorted(short_call_map.items(), key=lambda x: x[1], reverse=True)[:2]
            output.append(f"{Colors.DIM}Short Call Leaders:{Colors.RESET} {', '.join(f'{k} ({v})' for k, v in leaders)}")
        
        # Quality leaders - based on average quality percentage
        loss_list = []
        for line in recent_calls:
            qual_match = re.search(r'Quality: (\d+)', line)
            quality = safe_int(qual_match.group(1) if qual_match else "100")
            if quality < 90:
                if "[Extension]" in line:
                    ext_match = re.search(r'\[Extension\] \[(\d+)\] \[([^\]]+)', line)
                    if ext_match: loss_list.append((quality, f"{ext_match.group(2)} (Ext {ext_match.group(1)}) [{quality}%]"))
                elif "[Trunk]" in line:
                    comp_match = re.search(r'\[Trunk\] \[[^\]]+\] \[([^\]]+)', line)
                    if comp_match: loss_list.append((quality, f"{comp_match.group(1)} [{quality}%]"))
        loss_list.sort(key=lambda x: x[0])
        if loss_list: output.append(f"{Colors.DIM}Quality Leaders (Lowest):{Colors.RESET} {', '.join(x[1] for x in loss_list[:3])}")
        else: output.append(f"{Colors.DIM}Quality Leaders:{Colors.RESET} None (all calls ≥90%)")
        return output
    
    def get_log_statistics(self) -> List[str]:
        output = []
        if not CALL_LOG_FILE.exists() or CALL_LOG_FILE.stat().st_size == 0: return output
        try:
            with open(CALL_LOG_FILE, 'r') as f: lines = f.readlines()
        except Exception: return output
        log_chunk = lines[-self.config.log_trim_lines:] if len(lines) > self.config.log_trim_lines else lines
        self.build_relations_map(''.join(log_chunk))
        recent_chunk = log_chunk[-self.config.stats_fetch_lines:] if len(log_chunk) > self.config.stats_fetch_lines else log_chunk
        recent_lines = [l.strip() for l in recent_chunk if l.strip()]
        ext_calls = [l for l in recent_lines if "[Extension]" in l]
        inbound_calls = [l for l in recent_lines if "[Trunk]" in l and "[Inbound]" in l]
        outbound_calls = [l for l in recent_lines if "[Trunk]" in l and "[Outbound]" in l]
        output.append(f"{Colors.BOLD}   CALL STATISTICS (Worst 3 Calls from Last {self.config.stats_fetch_lines} Calls){Colors.RESET}")
        output.extend(self.get_advanced_stats(recent_lines))
        width = get_terminal_width()
        output.append(f"{Colors.CYAN}{'-' * width}{Colors.RESET}")
        if ext_calls: output.extend(self.calculate_stats(ext_calls, "Extension Calls", recent_lines))
        output.append("")
        if inbound_calls: output.extend(self.calculate_stats(inbound_calls, "Inbound Trunks", recent_lines))
        output.append("")
        if outbound_calls: output.extend(self.calculate_stats(outbound_calls, "Outbound Trunks", recent_lines))
        output.extend(self.get_recent_calls_section(recent_lines, recent_lines))
        return output

class VitalPBXMonitor:
    def __init__(self):
        self.config = load_config()
        self.tenant_mgr = TenantManager()
        self.tracker = CallTracker(self.config)
        self.parser = ChannelParser(self.config, self.tenant_mgr, self.tracker)
        self.stats_display = StatisticsDisplay(self.config, self.tenant_mgr)
    
    def run_single_update(self):
        self.tracker.cleanup_old_files()
        self.tracker.trim_log_file()
        self.parser.load_persisted_state()
        channels_concise = run_asterisk_cmd("core show channels concise")
        current_ts = int(time.time())
        # Reuse channelstats output cached within the last 2 seconds to avoid
        # hammering the Asterisk CLI on every refresh
        if CLI_CACHE_FILE.exists() and current_ts - int(CLI_CACHE_FILE.stat().st_mtime) < 2:
            stats_raw = CLI_CACHE_FILE.read_text()
        else:
            stats_raw = run_asterisk_cmd("pjsip show channelstats")
            CLI_CACHE_FILE.write_text(stats_raw)
        self.parser.parse_channels_concise(channels_concise)
        self.parser.parse_channel_stats(stats_raw)
        active_channels = set()
        for line in channels_concise.split('\n'):
            if line.startswith("PJSIP/"):
                active_channels.add(line.split('!')[0].replace("PJSIP/", "").lower())
        self.tracker.check_ended_calls(active_channels, self.tenant_mgr)
        self.display_output(channels_concise)
    
    def display_output(self, channels_concise: str):
        output_buffer = []
        width = get_terminal_width()
        sep_line = '-' * width
        uptime = get_uptime()
        cpu = get_cpu_usage()
        _, load_pct = get_load_average()
        _, _, mem_pct = get_memory_usage()
        active_count = sum(1 for l in channels_concise.split('\n') if l.startswith("PJSIP/"))
        logging_status = f"{Colors.GREEN}Logging Enabled{Colors.RESET}" if self.config.enable_logging else f"{Colors.YELLOW}Logging Disabled{Colors.RESET}"
        
        output_buffer.append(f"{Colors.CYAN}{sep_line}{Colors.RESET}")
        output_buffer.append(f"{Colors.BOLD}    LIVE CALL QUALITY MONITOR (v1.14 Python){Colors.RESET}")
        output_buffer.append(f"   {Colors.WHITE}Uptime:{Colors.RESET} {uptime}    |    {Colors.WHITE}Active Calls:{Colors.RESET} {active_count}    |    {logging_status}")
        output_buffer.append(f"   {Colors.WHITE}CPU:{Colors.RESET} {cpu}% [1min: {load_pct:.1f}%]    |    {Colors.WHITE}Memory:{Colors.RESET} {mem_pct:.1f}%")
        output_buffer.append(f"{Colors.CYAN}{sep_line}{Colors.RESET}")
        
        current_time = int(time.time())
        last_update = 0
        if STATS_TIMESTAMP_FILE.exists():
            try: last_update = int(STATS_TIMESTAMP_FILE.read_text().strip())
            except Exception: pass
        # Regenerate the relatively expensive log statistics at most every 15 seconds
        if current_time - last_update > 15 or not STATS_CACHE_FILE.exists():
            stats_output = self.stats_display.get_log_statistics()
            STATS_CACHE_FILE.write_text('\n'.join(stats_output))
            STATS_TIMESTAMP_FILE.write_text(str(current_time))
        if STATS_CACHE_FILE.exists():
            output_buffer.append(STATS_CACHE_FILE.read_text())
            output_buffer.append(f"{Colors.CYAN}{sep_line}{Colors.RESET}")
        
        live_buffer = []
        live_cnt_in, live_cnt_out, live_cnt_ext = 0, 0, 0
        
        for line in channels_concise.split('\n'):
            if not line.startswith("PJSIP/"): continue
            parts = line.split('!')
            chan = parts[0]
            chan_short = chan.replace("PJSIP/", "").lower()
            info = self.parser.channel_info.get(chan_short, ChannelInfo())
            duration_sec = 0
            # Duration is the field immediately before the bridge-id column;
            # stop at index 1 so parts[i-1] can never wrap around to parts[-1]
            for i in range(len(parts) - 1, 0, -1):
                if re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}', parts[i]):
                    try: duration_sec = int(parts[i-1])
                    except (ValueError, IndexError): pass
                    break
            duration = format_duration(duration_sec)
            metrics = self.parser.get_metrics_for_channel(chan_short, info.bridge_id)
            quality_score = calculate_quality_score(metrics.rtt_ms, metrics.tx_jit_ms, metrics.rx_jit_ms,
                metrics.tx_pct, metrics.rx_pct, metrics.tx_cnt, metrics.tx_lost, metrics.rx_cnt, metrics.rx_lost)
            current_mos = calculate_mos(metrics.rtt_ms, metrics.tx_jit_ms, metrics.rx_jit_ms, metrics.tx_pct, metrics.rx_pct)
            safe_chan = chan_short.replace('/', '_')
            self.tracker.update_tracking(safe_chan, metrics.rtt_ms, metrics.tx_jit_ms, metrics.rx_jit_ms,
                metrics.tx_pct, metrics.rx_pct, quality_score, metrics.tx_cnt, metrics.tx_lost, metrics.rx_cnt, metrics.rx_lost, current_mos)
            overall_quality = self.tracker.get_overall_quality(safe_chan)
            stats = self.tracker.load_stats(safe_chan)
            
            display_name = chan_short
            direction = ""
            call_info = ""
            if info.exten and info.exten != "s":
                call_info = f"{Colors.DIM}-> {Colors.RESET}{Colors.YELLOW}{info.exten}{Colors.RESET}"
            if info.caller_id and info.caller_id != '""':
                call_info += f" {Colors.DIM}from{Colors.RESET} {Colors.CYAN}{info.caller_id.replace(chr(34), '')}{Colors.RESET}"
            
            ext_match = re.match(r'^t(\d+)_(\d+)', chan_short)
            if ext_match:
                live_cnt_ext += 1
                tenant_name = self.tenant_mgr.get_name(ext_match.group(1))
                ext_num = ext_match.group(2)
                display_name = f"{Colors.CYAN}[Extension]{Colors.RESET} {Colors.CYAN}[{tenant_name}]{Colors.RESET} {Colors.YELLOW}[{ext_num}]{Colors.RESET}"
            elif re.match(r'^admin\d+-[0-9a-f]+|^[a-z][a-z0-9_-]+-[0-9a-f]+', chan_short):
                saved_dir = self.parser.channel_direction.get(chan_short, "")
                saved_tenant = self.parser.channel_tenant.get(chan_short, "")
                linked_tenant = self.parser.get_tenant_from_linked_extension(chan_short)
                if linked_tenant:
                    saved_tenant = linked_tenant
                    self.parser.channel_tenant[chan_short] = linked_tenant
                if saved_dir == "inbound":
                    direction = f"{Colors.YELLOW}[Inbound]{Colors.RESET} "
                    call_info = f"{Colors.DIM}from{Colors.RESET} {Colors.CYAN}{info.caller_id.replace(chr(34), '')}{Colors.RESET}"
                    live_cnt_in += 1
                elif saved_dir == "outbound":
                    direction = f"{Colors.ORANGE}[Outbound]{Colors.RESET} "
                    call_info = f"{Colors.DIM}->{Colors.RESET} {Colors.YELLOW}{info.caller_id.replace(chr(34), '')}{Colors.RESET}"
                    live_cnt_out += 1
                else:
                    direction = f"{Colors.RED}[Unknown]{Colors.RESET} "
                tenant_label = f" {Colors.CYAN}[{self.tenant_mgr.get_name(saved_tenant)}]{Colors.RESET}" if saved_tenant else ""
                display_name = f"{direction}{Colors.GREY}[Trunk]{Colors.RESET}{tenant_label} {chan_short}"
            
            clean_dir = "inbound" if "Inbound" in direction else "outbound" if "Outbound" in direction else "unknown"
            effective_bridge = self.parser.get_effective_bridge_id(chan_short)
            self.tracker.save_call_info(safe_chan, display_name, duration, metrics.codec, info.context,
                effective_bridge, info.caller_id, clean_dir, info.app, self.parser.channel_tenant.get(chan_short, ""))
            
            quality_display = get_quality_display(quality_score, self.config)
            avg_mos = stats.sum_mos / stats.samples if stats.samples > 0 else current_mos
            js_pct = (stats.high_jitter_count * 100) // stats.samples if stats.samples > 0 else 0
            
            live_buffer.append("")
            live_buffer.append(f"{Colors.BOLD}{Colors.WHITE}{display_name}{Colors.RESET}  {Colors.GREEN}*{Colors.RESET} Active  {call_info}")
            live_buffer.append(f"{Colors.DIM}+-{Colors.RESET} Time: {Colors.CYAN}{duration}{Colors.RESET} | {Colors.CYAN}{metrics.codec}{Colors.RESET} | RTT: {metrics.rtt_ms:.1f}ms {Colors.DIM}({stats.peak_rtt}ms){Colors.RESET} | Q: {quality_display} {Colors.DIM}({overall_quality}%){Colors.RESET} | MOS: {Colors.GREEN}{current_mos:.1f}{Colors.RESET} {Colors.DIM}(Avg: {avg_mos:.1f}){Colors.RESET} | JS: {stats.high_jitter_count}/{stats.samples} ({js_pct}%) | J: TX {metrics.tx_jit_ms:.0f}ms RX {metrics.rx_jit_ms:.0f}ms | Loss: TX {metrics.tx_pct}% ({metrics.tx_lost}/{metrics.tx_cnt}) / RX {metrics.rx_pct}% ({metrics.rx_lost}/{metrics.rx_cnt})")
        
        total_live_calls = live_cnt_in + live_cnt_out + live_cnt_ext
        max_display = self.config.max_live_calls_display
        output_buffer.append("")
        output_buffer.append(f"{Colors.BOLD}   LIVE CALLS{Colors.RESET}")
        output_buffer.append(f"Inbound: {Colors.YELLOW}{live_cnt_in}{Colors.RESET} | Outbound: {Colors.ORANGE}{live_cnt_out}{Colors.RESET} | Extensions: {Colors.CYAN}{live_cnt_ext}{Colors.RESET}")
        output_buffer.append(f"{Colors.CYAN}{sep_line}{Colors.RESET}")
        
        max_lines = max_display * 3
        output_buffer.extend(live_buffer[:max_lines])
        hidden_calls = total_live_calls - max_display
        if len(live_buffer) > max_lines and hidden_calls > 0:
            output_buffer.append("")
            output_buffer.append(f"{Colors.DIM}   ... plus {hidden_calls} more active call(s) not shown{Colors.RESET}")
        
        sys.stdout.write('\033[H\033[J')
        sys.stdout.write('\n'.join(output_buffer))
        sys.stdout.write('\n')
        sys.stdout.flush()
    
    def run(self):
        if os.geteuid() != 0:
            print("Error: This script must be run as root to access Asterisk CLI.")
            sys.exit(1)
        sys.stdout.write('\033[?1049h\033[?25l')
        sys.stdout.flush()
        try:
            while True:
                self.run_single_update()
                time.sleep(self.config.refresh_interval)
        except KeyboardInterrupt: pass
        finally:
            sys.stdout.write('\033[?25h\033[?1049l')
            sys.stdout.flush()
            print("\nMonitor stopped.")

if __name__ == "__main__":
    VitalPBXMonitor().run()
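The `calculate_mos` helper called in the display loop is defined earlier in the full script. As a standalone illustration of the idea, a simplified E-model estimate maps RTT, jitter, and packet loss onto the 1.0-4.5 MOS scale roughly like this (a sketch of the general technique, not necessarily the script's exact formula):

```python
def estimate_mos(rtt_ms: float, jitter_ms: float, loss_pct: float) -> float:
    """Simplified E-model: derive an R-factor, then map it to MOS."""
    # Effective one-way latency: half the RTT plus a jitter penalty
    # and a small fixed allowance for codec/processing delay.
    eff_latency = rtt_ms / 2 + jitter_ms * 2 + 10
    r = 93.2  # typical default R-factor for a narrowband call
    if eff_latency < 160:
        r -= eff_latency / 40
    else:
        r -= (eff_latency - 120) / 10
    r -= loss_pct * 2.5  # each percent of loss costs 2.5 R points
    # Standard R-to-MOS conversion, clamped to the usual 1.0-4.5 range
    mos = 1 + 0.035 * r + 7e-6 * r * (r - 60) * (100 - r)
    return max(1.0, min(4.5, mos))
```

A clean LAN call (20 ms RTT, 2 ms jitter, no loss) scores near the 4.4 ceiling, while a congested path with heavy loss drops below 2.0, matching the color bands the monitor uses.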
Make the script executable:

chmod +x /usr/local/bin/vitalpbx_monitor.py

Now run it:

python3 /usr/local/bin/vitalpbx_monitor.py

# To exit, press Ctrl+C

Setup is automatic on the first run. Afterwards, you can edit the auto-generated configuration file:

nano /usr/local/bin/vitalpbx_monitor.conf
# VitalPBX Monitor Configuration
# Python version

# --- General Settings ---
ENABLE_LOGGING=true
REFRESH_INTERVAL=1

# --- Quality Thresholds (0-100) ---
WARNING_SCORE=90
CRITICAL_SCORE=70

# --- Advanced Log Processing ---
LOG_TRIM_LINES=800
STATS_FETCH_LINES=50

# --- Performance & Cleanup ---
RETENTION_MINUTES=120
MAX_TRACKING_FILES=100

# --- Display Settings ---
# Maximum number of live calls to display (default: 10)
MAX_LIVE_CALLS_DISPLAY=10

# --- Log File Management ---
# Maximum lines to keep in log file (default: 2000)
MAX_LOG_LINES=2000
# Remove log entries older than this many hours (default: 12)
LOG_RETENTION_HOURS=12
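These settings are plain KEY=VALUE lines. A minimal sketch of how such a file can be parsed in Python is shown below; the monitor's actual loader may differ, and the `load_config` name and defaults dict here are illustrative:

```python
from pathlib import Path

def load_config(path, defaults):
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    settings = dict(defaults)
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        key, value = key.strip(), value.strip()
        if key not in settings:
            continue  # ignore unknown keys
        default = settings[key]
        # Coerce to the type of the default (check bool before int,
        # since bool is a subclass of int in Python)
        if isinstance(default, bool):
            settings[key] = value.lower() == 'true'
        elif isinstance(default, int):
            settings[key] = int(value)
        else:
            settings[key] = value
    return settings
```

Unknown keys are ignored rather than rejected, so the config file can carry comments and future options without breaking older versions of the script.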

File Locations

| File       | Location                            | Purpose                   |
|------------|-------------------------------------|---------------------------|
| Config     | {script_dir}/vitalpbx_monitor.conf  | User settings             |
| Call Log   | /var/log/vitalpbx_call_quality.log  | Completed call records    |
| Tenant Map | /usr/local/etc/tenant-lookup.txt    | Tenant ID to company name |
| Tracking   | /tmp/vitalpbx_call_tracking/        | Runtime call data         |
| Caches     | /tmp/vitalpbx_*                     | Temporary cache files     |
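The exact format of the tenant map file is not shown in this article. Assuming one `tenant_id=Company Name` pair per line (a hypothetical format), a lookup helper along the lines of the script's `tenant_mgr.get_name` could be sketched as:

```python
from pathlib import Path

TENANT_MAP = Path("/usr/local/etc/tenant-lookup.txt")

def load_tenant_map(path=TENANT_MAP):
    """Build {tenant_id: company_name}; return {} if the file is missing."""
    mapping = {}
    if not path.exists():
        return mapping
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        tenant_id, _, name = line.partition("=")
        mapping[tenant_id.strip()] = name.strip()
    return mapping

def get_name(tenant_id, mapping):
    """Return the company name, or the raw ID when unmapped."""
    return mapping.get(tenant_id, tenant_id)
```

Falling back to the raw tenant ID keeps the live display usable even when a tenant has not been added to the lookup file yet.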
