This script monitors call quality on extensions and trunks on VitalPBX systems using PJSIP. You run it over SSH.
Update, 10th Dec 2025: new Python version, live and working beta. The Bash edition could not handle large call volumes.
Real-Time Call Quality Monitoring for VitalPBX Multi-Tenant Systems
A powerful, efficient Python-based monitoring solution that provides instant visibility into call quality metrics across your entire VitalPBX infrastructure.

1. Real-Time Dashboard
System Health Display
- Server Uptime: Current system uptime displayed in header
- CPU Usage: Real-time CPU percentage with 1-minute load average
- Memory Usage: Current memory utilization percentage
- Active Call Count: Total active calls with breakdown by type (Inbound/Outbound/Extensions)
- Logging Status: Visual indicator showing if call logging is enabled
Live Call Display
For each active PJSIP call, displays:
- Call Duration: Live counter showing connection time (HH:MM:SS)
- Codec Detection: Active codec in use (G722, G711/ulaw/alaw, G729, Opus, etc.)
- Round Trip Time (RTT): Current and peak values in milliseconds
- Jitter Metrics: TX and RX jitter in milliseconds
- Packet Loss: TX and RX loss percentages with packet counts (lost/total)
- Quality Score: Calculated health percentage (0-100%) – instant and average
- MOS Score: Mean Opinion Score (1.0-4.5) – instant and average
- Jitter Spikes: Count and percentage of samples exceeding threshold (JS: 5/100 (5%))
Visual Quality Indicators
- 🟢 Green: Good quality (≥90% or configurable)
- 🟡 Yellow: Warning level (70-89% or configurable)
- 🔴 Red: Critical issues (<70% or configurable)
- 🟠 Orange: Outbound call direction indicator
- Color-coded metrics highlight problems instantly
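As a rough illustration of how the default thresholds map a score to a colour, here is a minimal sketch. The function name `classify_quality` is made up for this example; the comparison order follows the script's own `get_quality_display`, which shows green at or above the warning score and yellow at or above the critical score.

```python
def classify_quality(score: int, warning: int = 90, critical: int = 70) -> str:
    """Return 'green', 'yellow' or 'red' for a 0-100 quality score."""
    if score >= warning:
        return "green"
    if score >= critical:
        return "yellow"
    return "red"


# Print the colour band for a few scores around the two thresholds.
for sample in (95, 90, 89, 70, 69):
    print(sample, classify_quality(sample))
```

Both thresholds are passed as parameters because WARNING_SCORE and CRITICAL_SCORE are configurable.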
Display Optimization
- Flicker-Free Updates: Uses terminal double-buffering and alternate screen buffer
- Configurable Refresh: Default 1-second updates (adjustable via config)
- Configurable Call Limit: Shows up to N live calls (default 10) with “plus X more” indicator
- Clean Layout: Separator lines, bold headers, dimmed secondary information
2. Intelligent Call Logic
Bridge Linking & Call Correlation
- Automatic Bridge Matching: Correlates Trunks to Extensions by matching Bridge UUIDs
- Local Channel Proxy Support: Handles bridge linking across Local channel proxies (common in Queues and Ring Groups)
- Cross-Call Correlation: Extension and Trunk log entries share the same Bridge ID for easy matching
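The idea behind bridge matching can be sketched in a few lines: channels that report the same Bridge ID belong to the same call. The helper name `group_by_bridge` and the sample channel names below are assumptions for illustration only. The real script also follows Local channel pairs, which this sketch leaves out.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def group_by_bridge(channels: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Group (channel_name, bridge_id) pairs by bridge ID.

    Channels without a bridge ID (not yet answered, for example) are skipped.
    """
    groups: Dict[str, List[str]] = defaultdict(list)
    for name, bridge_id in channels:
        if bridge_id:
            groups[bridge_id].append(name)
    return dict(groups)


# Hypothetical sample data: one extension and one trunk sharing a bridge.
sample = [
    ("t2_100-00000012", "1b2c3d4e-0001"),
    ("mytrunk-00000013", "1b2c3d4e-0001"),
    ("t2_101-00000014", ""),
]
print(group_by_bridge(sample))
```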
Direction Detection
- Heuristic Analysis: Determines [Inbound] or [Outbound] based on:
  - Dial Context patterns (pstn, inbound, did, outbound, outrt, etc.)
  - Application type (Dial vs Queue vs AppDial)
  - Channel naming conventions
- Persistent Direction Tracking: Stores detected direction per channel for consistency
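A simplified sketch of the direction heuristic is shown below. It uses only a subset of the patterns listed above, and the function name `guess_direction` is hypothetical. The full script checks more context names and also stores the result per channel.

```python
import re


def guess_direction(context: str, app: str, data: str) -> str:
    """Return 'inbound', 'outbound', or '' when nothing matches."""
    # Application type is checked first: an outgoing Dial leg is outbound,
    # a Queue leg is treated as inbound.
    if re.search(r'AppDial|Dial', app) and "(Outgoing Line)" in data:
        return "outbound"
    if re.search(r'Queue', app):
        return "inbound"
    # Otherwise fall back to dial-context naming patterns.
    if re.search(r'pstn|inbound|did', context):
        return "inbound"
    if re.search(r'outbound|outrt', context):
        return "outbound"
    return ""


print(guess_direction("T2_outrt-1", "Wait", ""))
```

Returning an empty string for "unknown" lets the caller keep a previously stored direction instead of overwriting it.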
Multi-Tenant Support
- Tenant ID Resolution: Extracts tenant from context patterns (e.g., T2_, trk-2-)
- Company Name Mapping: Resolves Tenant IDs to human-readable names via lookup file (/usr/local/etc/tenant-lookup.txt)
- Extension-Based Tenant Detection: For trunk calls, looks up tenant from linked extension
- Outbound Tenant Override: Correctly attributes outbound trunk calls to originating tenant
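To make the tenant lookup concrete, here is a small sketch that extracts the tenant number from a context name and resolves it against lookup-table text. The regular expression is the one the script uses for `T2_` and `trk-2-` style contexts. The helper names and the sample context `T2_cos-all` are assumptions for illustration.

```python
import re
from typing import Dict, Optional

TENANT_RE = re.compile(r'^T(\d+)_|^trk-(\d+)-')


def tenant_from_context(context: str) -> Optional[str]:
    """Return the tenant number found in a dial context, or None."""
    match = TENANT_RE.search(context)
    if not match:
        return None
    return match.group(1) or match.group(2)


def parse_lookup(text: str) -> Dict[str, str]:
    """Parse 'tenant_number=Company Name' lines, ignoring comments and blanks."""
    table: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        number, name = line.split('=', 1)
        table[number.strip()] = name.strip()
    return table


SAMPLE = """# Tenant Lookup Table
1=Default
2=Tesla Inc
"""
names = parse_lookup(SAMPLE)
tenant = tenant_from_context("T2_cos-all")
print(names.get(tenant, f"Tenant {tenant}"))
```

Tenants missing from the table fall back to a generic "Tenant N" label, which matches what the script does when no lookup file is present.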
Call Filtering
- IVR/System Call Exclusion: Automatically excludes non-real calls from logging:
  - Voicemail greetings and playbacks
  - IVR announcements
  - Music on hold
  - Parking lots, Conference bridges
  - Page/Intercom, System recordings
- Validation Requirements: Only logs calls with valid bridge ID, minimum 2-second duration, real codec, and actual packet activity
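The filtering rules above boil down to a single yes/no decision per finished call. The sketch below shows one way to express it. The function name `should_log` is hypothetical and the skip lists are a shortened subset of the ones in the script.

```python
SKIP_CONTEXT_PARTS = ("voicemail", "ivr", "moh", "parking", "page", "intercom", "conf-")
SKIP_APPS = {"playback", "background", "voicemail", "record", "page", "park"}


def should_log(context: str, app: str, duration_s: int, codec: str,
               bridge_id: str, tx_cnt: int, rx_cnt: int) -> bool:
    """Return True only for calls that look like real two-party conversations."""
    if not bridge_id.strip():
        return False                      # never bridged to another party
    if duration_s < 2:
        return False                      # too short to carry audio
    if codec in ("", "-"):
        return False                      # no real codec negotiated
    if tx_cnt == 0 and rx_cnt == 0:
        return False                      # no packet activity at all
    lowered = context.lower()
    if any(part in lowered for part in SKIP_CONTEXT_PARTS):
        return False                      # IVR, voicemail, MOH and similar
    return app.lower() not in SKIP_APPS


print(should_log("T2_cos-all", "Dial", 45, "ulaw", "1b2c3d4e-0001", 2000, 1990))
```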
3. Historical Statistics (On-Screen)
Insights Panel
Displays analytics for recent calls (configurable, default last 50):
- Short Call Detection: Percentage of calls under 10 seconds with warning indicator
- Short Call Leaders: Identifies extensions/tenants with most short calls
- Top Outbound: Most active outbound caller by company
- Top Inbound: Most active inbound destination by company
- Codec Distribution: Breakdown of all codecs in use with counts
- Loss Leaders: Extensions experiencing packet loss
Per-Category Statistics
Separate statistics for each call type:
- Extension Calls: Internal/extension-to-extension calls
- Inbound Trunks: Calls received from external sources
- Outbound Trunks: Calls placed to external destinations
Each category shows:
- Average Quality Score with color coding
- Total call count
- Average and total call duration
- RTT statistics: Min, Max, and Average
- Jitter statistics: TX and RX averages
- Packet loss percentages
Worst 3 Calls Display
- Automatic Identification: Lists the 3 lowest quality calls from recent history
- Caller ID Lookup: For extension calls, displays the actual external caller ID (looked up from trunk via bridge ID)
- Full Metrics Display: Shows time, quality, duration, codec, RTT, jitter spikes, loss percentages, and packet counts
- Quick Troubleshooting: Helps spot recurring problem sources immediately
Fuzzy Matching & Self-Learning
- Relations Table: Builds mapping of Bridge IDs to Extensions and Companies
- CID Mapping: Associates caller IDs with company names for improved display
- Time-Based Correlation: Matches calls by timestamp and CID when bridge linking unavailable
4. Logging & Alerting
Visual Alerting
- ANSI Color Coding: Green/Yellow/Red based on configurable thresholds
- Configurable Thresholds:
  - Warning Score: Default 90% (scores below this show Yellow)
  - Critical Score: Default 70% (scores below this show Red)
- Instant Visibility: Problem calls stand out immediately in the display
Persistent Call Logging
When each call ends, comprehensive statistics are written to /var/log/vitalpbx_call_quality.log:
- Timestamp and call direction ([Inbound]/[Outbound])
- Extension number and Company name
- Channel identifier
- Caller ID and Bridge ID (for correlation)
- Duration and Codec
- Quality Score: Average
- Jitter Spikes: Count/Samples
- Peak RTT value
- Peak Jitter: TX and RX
- Peak Packet Loss: TX and RX percentages
- Packet Counts: TX lost/total, RX lost/total
- Context and Application
- Tenant ID
- MOS Score: Average
Log Sanitization
- Pipe Character Removal: Cleans data to ensure machine-readable log format
- ANSI Code Stripping: Removes color codes from logged names
- Consistent Formatting: Structured format for easy parsing and analysis
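Because fields are separated by " | " and each one has the form "Key: value", a log line can be split back into a dictionary with very little code. The sketch below assumes that layout. The sample line is invented for illustration and the helper name `parse_log_line` is not part of the script.

```python
import re
from typing import Dict

HEAD_RE = re.compile(r'^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (.*)$')


def parse_log_line(line: str) -> Dict[str, str]:
    """Split one log entry into a dict of field name -> raw string value."""
    head, *fields = line.rstrip("\n").split(" | ")
    match = HEAD_RE.match(head)
    record = {
        "timestamp": match.group(1) if match else "",
        "name": match.group(2) if match else head,
    }
    for field in fields:
        key, _, value = field.partition(": ")
        record[key] = value
    return record


# Invented sample entry following the "Key: value | Key: value" layout.
SAMPLE = (
    "[2025-12-10 14:03:22] [Trunk] [Inbound] [Example Corp] trunk1-0000001a | "
    "CID: 5551234 | Bridge: 1b2c3d4e-aaaa | Dur: 00:01:05 | Codec: ulaw | "
    "Quality: 92% | JitterSpikes: 0/65 | RTT: 40.0ms | "
    "Jitter: TX 3.0ms RX 4.0ms | Loss: TX 0% RX 0% | "
    "Pkts: TX 0/3250 RX 0/3248 | Ctx: trk-2-in | App: Dial | Tenant: 2 | MOS: 4.4\n"
)
record = parse_log_line(SAMPLE)
print(record["Codec"], record["Quality"], record["MOS"])
```

This is also why pipe characters are stripped from names and caller IDs before logging: a stray pipe inside a value could otherwise break the split.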
Automatic Log Management
- Line Limit: Trims log when exceeding MAX_LOG_LINES (default: 2000)
- Age Limit: Removes entries older than LOG_RETENTION_HOURS (default: 12)
- Whichever First: Both limits checked, earliest trigger wins
- Periodic Check: Validates every 60 seconds to minimize disk I/O
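The two limits can be sketched as a pure function over the log lines: drop entries older than the retention window first, then keep only the newest lines. The function name `trim_lines` is hypothetical, and it assumes `max_lines` is a positive number. Lines without a parseable timestamp are kept, as the script does.

```python
import re
from datetime import datetime, timedelta
from typing import List

TS_RE = re.compile(r'^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]')
TS_FORMAT = '%Y-%m-%d %H:%M:%S'


def _is_expired(line: str, cutoff: datetime) -> bool:
    """True when the line carries a valid timestamp older than the cutoff."""
    match = TS_RE.match(line)
    if not match:
        return False
    try:
        return datetime.strptime(match.group(1), TS_FORMAT) < cutoff
    except ValueError:
        return False


def trim_lines(lines: List[str], now: datetime,
               retention_hours: int = 12, max_lines: int = 2000) -> List[str]:
    """Apply the age limit, then the line limit (newest lines win)."""
    cutoff = now - timedelta(hours=retention_hours)
    kept = [line for line in lines if not _is_expired(line, cutoff)]
    return kept[-max_lines:]


now = datetime(2025, 12, 10, 12, 0, 0)
log = [
    "[2025-12-09 20:00:00] old entry\n",
    "[2025-12-10 06:00:00] recent entry\n",
    "[2025-12-10 11:00:00] newest entry\n",
]
print(len(trim_lines(log, now)))
```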
5. Performance Optimization
Efficient Design
- Pure Python: No external process spawning for calculations
- Native String Operations: All parsing and math done in-memory
- Minimal Dependencies: Uses only Python standard library
- Low Memory Footprint: File-based tracking instead of memory accumulation
Smart Caching
- Asterisk CLI Cache: pjsip show channelstats output cached for 2 seconds
- Statistics Cache: Historical statistics recalculated only every 15 seconds
- Direction/Tenant Persistence: Stored in files, loaded once per cycle
- CPU Calculation Cache: Previous values stored for delta calculation
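The caching idea is a plain time-to-live cache: reuse the last result until it is older than a fixed number of seconds. The script keeps its caches in files under /tmp. The in-memory class below is only a sketch of the same principle, and `TTLCache` is a name invented for this example.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Cache the result of fetch() for ttl_seconds before calling it again."""

    def __init__(self, fetch: Callable[[], Any], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._stamp: Optional[float] = None
        self._value: Any = None

    def get(self) -> Any:
        now = self._clock()
        # Refresh on first use or once the cached value has expired.
        if self._stamp is None or now - self._stamp >= self._ttl:
            self._value = self._fetch()
            self._stamp = now
        return self._value


# Example: an expensive lookup that is refreshed at most every 2 seconds.
cache = TTLCache(lambda: time.strftime("%H:%M:%S"), ttl_seconds=2.0)
print(cache.get())
print(cache.get())  # served from the cache, no second fetch
```

With a 1-second refresh and a 2-second cache, roughly every other screen update reuses the previous CLI output instead of querying Asterisk again.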
Automatic Cleanup
- Tracking File Management: Old .track, .info, .direction, .tenant files cleaned when:
  - Count exceeds MAX_TRACKING_FILES (default: 100)
  - Age exceeds RETENTION_MINUTES (default: 120)
- Call End Detection: Tracking files removed immediately when calls end
- No Manual Maintenance: System is completely self-managing
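Here is a hedged sketch of the cleanup pass. It follows the order used in the script's `cleanup_old_files`: nothing is touched until the number of .track files exceeds the limit, and then only files older than the retention window are removed, together with their sibling files. The function name `cleanup_tracking` and its parameters are assumptions for this example.

```python
import time
from pathlib import Path
from typing import Optional

SUFFIXES = (".track", ".info", ".direction", ".tenant")


def cleanup_tracking(directory: Path, max_files: int = 100,
                     retention_minutes: int = 120,
                     now: Optional[float] = None) -> int:
    """Remove stale tracking files; return how many files were deleted."""
    now = time.time() if now is None else now
    track_files = list(directory.glob("*.track"))
    if len(track_files) <= max_files:
        return 0                          # under the limit, nothing to do
    removed = 0
    for track in track_files:
        if now - track.stat().st_mtime <= retention_minutes * 60:
            continue                      # still within the retention window
        for suffix in SUFFIXES:
            try:
                track.with_suffix(suffix).unlink()
                removed += 1
            except FileNotFoundError:
                pass                      # sibling file was never created
    return removed
```

Calling `cleanup_tracking(Path("/tmp/vitalpbx_call_tracking"))` would apply the defaults to the directory the script uses.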
Resource-Conscious Operation
- Suitable for Busy Servers: Designed for production PBX environments
- Non-Blocking Updates: Display updates don’t interfere with call processing
- Graceful Degradation: Continues operation despite transient errors
6. Technical Specifications
| Specification | Details |
|---|---|
| Language | Python 3.8+ (the script uses dataclasses and Path.unlink(missing_ok=True)) |
| Platform | Debian/Ubuntu with VitalPBX |
| PBX Integration | Asterisk CLI (PJSIP channels) |
| Refresh Rate | 1 second (configurable) |
| Memory Usage | Minimal (file-based tracking) |
| Disk Usage | Self-managing (auto-cleanup) |
| Dependencies | None (Python standard library only) |
| Terminal | Color support required |
| Access | Root (for Asterisk CLI access) |
7. Use Cases
- NOC Monitoring: Real-time visibility into call quality across all tenants
- Troubleshooting: Quickly identify problematic calls and recurring patterns
- Capacity Planning: Track codec usage, call volumes, and peak times
- SLA Compliance: Document and audit call quality metrics
- Carrier Quality Assessment: Monitor trunk performance by direction
- Endpoint Diagnostics: Identify extensions with consistent quality problems
- Quality Trending: Historical log analysis for pattern detection
⚠️ Important Notes & Disclaimers
Status: This is an Experimental Beta Release. While unlikely to interfere with VitalPBX operations, please proceed with standard caution.
- System Impact: Expect roughly a 1% increase in CPU usage while the script is actively running. This should not impact low-to-medium capacity systems.
- Best Practice: Always create a system backup before deploying new software on your server.
- Support: Please send questions or feedback via the contacts page.
Installation & Usage Guide
- Connect: Log in to your VitalPBX server via SSH.
- Configure: Add the optional tenant-lookup.txt file (if required).
- Install: Add the required vitalpbx_monitor.py Python script to /usr/local/bin.
- Permissions: Make the script executable using the chmod command.
- First Run (Setup): Execute the following command to initialize, which allows you to configure logging and refresh times:
python3 /usr/local/bin/vitalpbx_monitor.py
Logging Details
File Location: /var/log/vitalpbx_call_quality.log
Size Limit: By default the log is capped at 1000 entries (approx. 280 KB). This is user configurable in the config file.
Purpose: This size prevents system slowdowns while providing enough data to copy/paste into AI tools for issue analysis.
Get started by adding the optional tenant lookup table.
nano /usr/local/etc/tenant-lookup.txt
Paste the code below into the optional tenant-lookup.txt and modify with your own tenant table.
# Tenant Lookup Table - OPTIONAL
# Format: tenant_number=Company Name
# Lines starting with # are comments
1=Default
2=Tesla Inc
3=Example Corp
# Add more tenants below:
nano /usr/local/bin/vitalpbx_monitor.py
Paste the code below into the vitalpbx_monitor.py file and continue at the very bottom.
#!/usr/bin/env python3
"""
VitalPBX Live Call Quality Monitor (v2.14 - Python itproexpert.com)
Converted from Bash to Python for improved efficiency
Changes in v1.14:
- Quality score now heavily penalizes packet loss (one-way audio situations)
- Log shows average quality only (removed Min quality)
- MOS displays current and average (not minimum)
- Loss leaders based on average quality percentage
- Added "Last 3 Recent Calls" section after statistics
- Improved packet loss detection for quality scoring
"""
import os
import sys
import re
import time
import subprocess
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, List, Tuple, Any
from collections import defaultdict
SCRIPT_DIR = Path(__file__).parent.resolve()
CONFIG_FILE = SCRIPT_DIR / "vitalpbx_monitor.conf"
TENANT_LOOKUP_FILE = Path("/usr/local/etc/tenant-lookup.txt")
CALL_TRACKING_DIR = Path("/tmp/vitalpbx_call_tracking")
CALL_LOG_FILE = Path("/var/log/vitalpbx_call_quality.log")
CPU_PREV_FILE = Path("/tmp/vitalpbx_cpu_prev")
CLI_CACHE_FILE = Path("/tmp/vitalpbx_cli_cache")
STATS_CACHE_FILE = Path("/tmp/vitalpbx_stats_cache")
STATS_TIMESTAMP_FILE = Path("/tmp/vitalpbx_stats_timestamp")
@dataclass
class Config:
    enable_logging: bool = True
    refresh_interval: int = 1
    warning_score: int = 90
    critical_score: int = 70
    retention_minutes: int = 120
    max_tracking_files: int = 100
    log_trim_lines: int = 800
    stats_fetch_lines: int = 50
    max_live_calls_display: int = 10
    max_log_lines: int = 2000
    log_retention_hours: int = 12

class Colors:
    GREEN = '\033[32m'
    RED = '\033[31m'
    YELLOW = '\033[33m'
    ORANGE = '\033[38;5;208m'
    CYAN = '\033[36m'
    WHITE = '\033[97m'
    GREY = '\033[90m'
    RESET = '\033[0m'
    BOLD = '\033[1m'
    DIM = '\033[2m'
@dataclass
class CallStats:
    samples: int = 0
    total_score: int = 0
    peak_rtt: float = 0
    peak_tx_jit: float = 0
    peak_rx_jit: float = 0
    peak_tx_pct: float = 0
    peak_rx_pct: float = 0
    min_score: int = 0
    sum_rtt: float = 0
    sum_jitter: float = 0
    start_time: int = 0
    high_jitter_count: int = 0
    max_tx_cnt: int = 0
    max_tx_lost: int = 0
    max_rx_cnt: int = 0
    max_rx_lost: int = 0
    sum_mos: float = 0
    min_mos: float = 4.5

    def to_string(self) -> str:
        # Serialize all 18 fields as one pipe-delimited line for the .track file
        return '|'.join(map(str, [
            self.samples, self.total_score, self.peak_rtt, self.peak_tx_jit,
            self.peak_rx_jit, self.peak_tx_pct, self.peak_rx_pct, self.min_score,
            self.sum_rtt, self.sum_jitter, self.start_time, self.high_jitter_count,
            self.max_tx_cnt, self.max_tx_lost, self.max_rx_cnt, self.max_rx_lost,
            self.sum_mos, self.min_mos
        ]))

    @classmethod
    def from_string(cls, data: str) -> 'CallStats':
        parts = data.strip().split('|')
        # Pad short records so files with fewer fields still load
        if len(parts) < 18:
            parts.extend(['0'] * (18 - len(parts)))
        try:
            return cls(
                samples=int(parts[0] or 0), total_score=int(parts[1] or 0),
                peak_rtt=float(parts[2] or 0), peak_tx_jit=float(parts[3] or 0),
                peak_rx_jit=float(parts[4] or 0), peak_tx_pct=float(parts[5] or 0),
                peak_rx_pct=float(parts[6] or 0), min_score=int(parts[7] or 0),
                sum_rtt=float(parts[8] or 0), sum_jitter=float(parts[9] or 0),
                start_time=int(parts[10] or 0), high_jitter_count=int(parts[11] or 0),
                max_tx_cnt=int(parts[12] or 0), max_tx_lost=int(parts[13] or 0),
                max_rx_cnt=int(parts[14] or 0), max_rx_lost=int(parts[15] or 0),
                sum_mos=float(parts[16] or 0),
                min_mos=float(parts[17] or 4.5) if parts[17] and float(parts[17]) > 0 else 4.5,
            )
        except (ValueError, IndexError):
            return cls()
@dataclass
class ChannelInfo:
    context: str = ""
    exten: str = ""
    caller_id: str = ""
    app: str = ""
    data: str = ""
    state: str = ""
    priority: str = ""
    bridge_id: str = ""

@dataclass
class ChannelMetrics:
    codec: str = "-"
    rtt_ms: float = 0.0
    tx_jit_ms: float = 0
    rx_jit_ms: float = 0
    tx_pct: float = 0
    rx_pct: float = 0
    rx_cnt: int = 0
    rx_lost: int = 0
    tx_cnt: int = 0
    tx_lost: int = 0
def load_config() -> Config:
    config = Config()
    if not CONFIG_FILE.exists():
        save_default_config()
    try:
        with open(CONFIG_FILE, 'r') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                if '=' in line:
                    key, value = line.split('=', 1)
                    key = key.strip().lower()
                    value = value.strip()
                    if key == 'enable_logging': config.enable_logging = value.lower() == 'true'
                    elif key == 'refresh_interval': config.refresh_interval = int(value)
                    elif key == 'warning_score': config.warning_score = int(value)
                    elif key == 'critical_score': config.critical_score = int(value)
                    elif key == 'retention_minutes': config.retention_minutes = int(value)
                    elif key == 'max_tracking_files': config.max_tracking_files = int(value)
                    elif key == 'log_trim_lines': config.log_trim_lines = int(value)
                    elif key == 'stats_fetch_lines': config.stats_fetch_lines = int(value)
                    elif key == 'max_live_calls_display': config.max_live_calls_display = int(value)
                    elif key == 'max_log_lines': config.max_log_lines = int(value)
                    elif key == 'log_retention_hours': config.log_retention_hours = int(value)
    except Exception:
        pass
    return config
def save_default_config():
    content = """# VitalPBX Monitor Configuration
ENABLE_LOGGING=true
REFRESH_INTERVAL=1
WARNING_SCORE=90
CRITICAL_SCORE=70
LOG_TRIM_LINES=800
STATS_FETCH_LINES=50
RETENTION_MINUTES=120
MAX_TRACKING_FILES=100
MAX_LIVE_CALLS_DISPLAY=10
MAX_LOG_LINES=2000
LOG_RETENTION_HOURS=12
"""
    CONFIG_FILE.write_text(content)
def run_asterisk_cmd(cmd: str) -> str:
    try:
        result = subprocess.run(['asterisk', '-rx', cmd], capture_output=True, text=True, timeout=10)
        return result.stdout
    except Exception:
        return ""

def get_terminal_width() -> int:
    try: return os.get_terminal_size().columns
    except Exception: return 80

def parse_duration(duration_str: str) -> int:
    # Convert HH:MM:SS to seconds; anything else yields 0
    try:
        parts = duration_str.split(':')
        if len(parts) == 3:
            return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
    except Exception: pass
    return 0

def format_duration(seconds: int) -> str:
    return f"{seconds // 3600:02d}:{(seconds % 3600) // 60:02d}:{seconds % 60:02d}"

def safe_int(value: Any, default: int = 0) -> int:
    try:
        # Strings such as "12.7" are truncated at the decimal point
        if isinstance(value, str): value = value.split('.')[0]
        return int(value)
    except (ValueError, TypeError): return default

def safe_float(value: Any, default: float = 0.0) -> float:
    try: return float(value)
    except (ValueError, TypeError): return default
class TenantManager:
    def __init__(self):
        self.tenant_map: Dict[str, str] = {}
        self._load_tenants()

    def _load_tenants(self):
        if TENANT_LOOKUP_FILE.exists():
            try:
                with open(TENANT_LOOKUP_FILE, 'r') as f:
                    for line in f:
                        line = line.strip()
                        if not line or line.startswith('#'): continue
                        if '=' in line:
                            tenant_num, company_name = line.split('=', 1)
                            self.tenant_map[tenant_num.strip()] = company_name.strip()
            except Exception: pass

    def get_name(self, tenant_num: str) -> str:
        return self.tenant_map.get(tenant_num, f"Tenant {tenant_num}")
class CallTracker:
    def __init__(self, config: Config):
        self.config = config
        self._last_log_trim_check = 0
        CALL_TRACKING_DIR.mkdir(parents=True, exist_ok=True)
        if not CALL_LOG_FILE.exists():
            CALL_LOG_FILE.touch()
            CALL_LOG_FILE.chmod(0o666)

    def cleanup_old_files(self):
        try:
            track_files = list(CALL_TRACKING_DIR.glob("*.track"))
            if len(track_files) > self.config.max_tracking_files:
                retention_secs = self.config.retention_minutes * 60
                current_time = time.time()
                for f in track_files:
                    if current_time - f.stat().st_mtime > retention_secs:
                        f.unlink(missing_ok=True)
                        f.with_suffix('.info').unlink(missing_ok=True)
                        # Also remove the sibling state files so none are left orphaned
                        f.with_suffix('.direction').unlink(missing_ok=True)
                        f.with_suffix('.tenant').unlink(missing_ok=True)
        except Exception: pass
    def trim_log_file(self):
        # Only check once every 60 seconds to limit disk I/O
        current_time = time.time()
        if current_time - self._last_log_trim_check < 60: return
        self._last_log_trim_check = current_time
        if not CALL_LOG_FILE.exists(): return
        try:
            with open(CALL_LOG_FILE, 'r') as f: lines = f.readlines()
            if not lines: return
            original_count = len(lines)
            cutoff_time = datetime.now() - timedelta(hours=self.config.log_retention_hours)
            filtered_lines = []
            for line in lines:
                timestamp_match = re.match(r'^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]', line)
                if timestamp_match:
                    try:
                        line_time = datetime.strptime(timestamp_match.group(1), '%Y-%m-%d %H:%M:%S')
                        if line_time >= cutoff_time: filtered_lines.append(line)
                    except ValueError: filtered_lines.append(line)
                else: filtered_lines.append(line)
            if len(filtered_lines) > self.config.max_log_lines:
                filtered_lines = filtered_lines[-self.config.max_log_lines:]
            if len(filtered_lines) < original_count:
                with open(CALL_LOG_FILE, 'w') as f: f.writelines(filtered_lines)
        except Exception: pass
    def get_track_file(self, channel: str) -> Path:
        return CALL_TRACKING_DIR / f"{channel.replace('/', '_')}.track"

    def get_info_file(self, channel: str) -> Path:
        return CALL_TRACKING_DIR / f"{channel.replace('/', '_')}.info"

    def get_direction_file(self, channel: str) -> Path:
        return CALL_TRACKING_DIR / f"{channel.replace('/', '_')}.direction"

    def get_tenant_file(self, channel: str) -> Path:
        return CALL_TRACKING_DIR / f"{channel.replace('/', '_')}.tenant"

    def load_stats(self, channel: str) -> CallStats:
        track_file = self.get_track_file(channel)
        if track_file.exists():
            try: return CallStats.from_string(track_file.read_text())
            except Exception: pass
        return CallStats()

    def save_stats(self, channel: str, stats: CallStats):
        self.get_track_file(channel).write_text(stats.to_string())
    def update_tracking(self, channel: str, rtt: float, tx_jit: float, rx_jit: float,
                        tx_pct: float, rx_pct: float, current_score: int,
                        tx_cnt: int, tx_lost: int, rx_cnt: int, rx_lost: int,
                        current_mos: float = 4.5):
        current_time = int(time.time())
        stats = self.load_stats(channel)
        if stats.samples == 0: stats.start_time = current_time
        call_duration = current_time - stats.start_time
        stats.samples += 1
        stats.total_score += current_score
        stats.sum_mos += current_mos
        if current_mos < stats.min_mos: stats.min_mos = current_mos
        stats.max_tx_cnt = max(stats.max_tx_cnt, tx_cnt)
        stats.max_tx_lost = max(stats.max_tx_lost, tx_lost)
        stats.max_rx_cnt = max(stats.max_rx_cnt, rx_cnt)
        stats.max_rx_lost = max(stats.max_rx_lost, rx_lost)
        if tx_jit > 70 or rx_jit > 70: stats.high_jitter_count += 1
        if stats.min_score == 0 or current_score < stats.min_score: stats.min_score = current_score
        if rtt > 0 and rtt > stats.peak_rtt:
            stats.peak_rtt = rtt
        # Nesting inferred from the flattened source: the RTT sum accumulates on every sample
        stats.sum_rtt += rtt
        # Nesting inferred from the flattened source: peaks are ignored for the first 3 seconds
        if call_duration > 3:
            if 0 < tx_jit <= 3000 and tx_jit > stats.peak_tx_jit: stats.peak_tx_jit = tx_jit
            if 0 < rx_jit <= 3000 and rx_jit not in (999, 1000) and rx_jit > stats.peak_rx_jit: stats.peak_rx_jit = rx_jit
            if tx_pct > stats.peak_tx_pct: stats.peak_tx_pct = tx_pct
            if rx_pct > stats.peak_rx_pct: stats.peak_rx_pct = rx_pct
        self.save_stats(channel, stats)

    def get_overall_quality(self, channel: str) -> int:
        stats = self.load_stats(channel)
        if stats.samples == 0: return 100
        avg_score = stats.total_score // stats.samples
        # Extra penalty based on the share of samples with jitter above 70 ms
        if stats.high_jitter_count > 0:
            jitter_pct = (stats.high_jitter_count * 100) // stats.samples
            if jitter_pct > 20: avg_score -= 25
            elif jitter_pct > 10: avg_score -= 15
            elif jitter_pct > 5: avg_score -= 10
            elif jitter_pct > 2: avg_score -= 5
        return max(0, avg_score)
    def save_call_info(self, channel: str, display_name: str, duration: str,
                       codec: str, context: str, bridge_id: str, caller_id: str,
                       direction: str, app: str, tenant: str):
        info_file = self.get_info_file(channel)
        # Strip ANSI colour codes and pipes so the record stays machine-readable
        clean_name = re.sub(r'\x1b\[[0-9;]*m', '', display_name).replace('|', '-')
        info = '|'.join([clean_name, duration.replace('|', '-'), codec.replace('|', '-'),
                         context.replace('|', '-'), bridge_id.replace('|', '-'), caller_id.replace('|', '-'),
                         direction, app.replace('|', '-'), tenant.replace('|', '-')])
        info_file.write_text(info)

    def log_call_completion(self, channel: str, display_name: str, duration: str,
                            codec: str, context: str, bridge_id: str, caller_id: str,
                            direction: str, app: str, tenant: str, tenant_mgr: TenantManager):
        if codec in ('-', '') or not self.config.enable_logging: return
        stats = self.load_stats(channel)
        if stats.samples == 0: return
        if not bridge_id or bridge_id.strip() == '': return
        dur_seconds = parse_duration(duration)
        if dur_seconds < 2: return
        skip_contexts = ('sub-extvm', 'vm-', 'voicemail', 'playback', 'ivr', 'announcement', 'moh', 'parking', 'page', 'intercom', 'conf-', 'meetme', 'spy', 'chanspy', 'pickup')
        context_lower = context.lower()
        if any(skip in context_lower for skip in skip_contexts): return
        skip_apps = ('playback', 'background', 'read', 'saydigits', 'saynumber', 'voicemail', 'voicemailmain', 'record', 'mixmonitor', 'chanspy', 'page', 'park', 'parkedcall')
        app_lower = app.lower()
        if any(skip_app == app_lower for skip_app in skip_apps): return
        if stats.max_tx_cnt == 0 and stats.max_rx_cnt == 0: return
        avg_quality = stats.total_score // stats.samples
        clean_name = re.sub(r'\x1b\[[0-9;]*m', '', display_name).replace('|', ' ')
        if "Extension" in display_name:
            ext_match = re.search(r'\[(\d+)\]', clean_name)
            ext_num = ext_match.group(1) if ext_match else "Unknown"
            comp_matches = re.findall(r'\[([a-zA-Z0-9 ]+)\]', clean_name)
            comp_name = next((m for m in comp_matches if m != "Extension" and m != ext_num), "Unknown")
            final_name = f"[Extension] [{ext_num}] [{comp_name}] {channel}"
        else:
            dir_fmt = {"inbound": "[Inbound]", "outbound": "[Outbound]"}.get(direction, "[Unknown]")
            comp_name = tenant_mgr.get_name(tenant)
            trunk_match = re.search(r'\[Trunk\] \[([^\]]+)', clean_name)
            if trunk_match: comp_name = trunk_match.group(1)
            final_name = f"[Trunk] {dir_fmt} [{comp_name}] {channel}"
        caller_id = caller_id.replace('|', '-')
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        avg_mos = stats.sum_mos / stats.samples if stats.samples > 0 else 4.5
        log_entry = (
            f"[{timestamp}] {final_name} | CID: {caller_id} | Bridge: {bridge_id} | "
            f"Dur: {duration} | Codec: {codec} | Quality: {avg_quality}% | "
            f"JitterSpikes: {stats.high_jitter_count}/{stats.samples} | RTT: {stats.peak_rtt}ms | "
            f"Jitter: TX {stats.peak_tx_jit}ms RX {stats.peak_rx_jit}ms | "
            f"Loss: TX {stats.peak_tx_pct}% RX {stats.peak_rx_pct}% | "
            f"Pkts: TX {stats.max_tx_lost}/{stats.max_tx_cnt} RX {stats.max_rx_lost}/{stats.max_rx_cnt} | "
            f"Ctx: {context} | App: {app} | Tenant: {tenant} | MOS: {avg_mos:.1f}\n"
        )
        with open(CALL_LOG_FILE, 'a') as f: f.write(log_entry)
    def check_ended_calls(self, active_channels: set, tenant_mgr: TenantManager):
        if not self.config.enable_logging: return
        active_normalized = set()
        for c in active_channels:
            normalized = c.replace("PJSIP/", "").replace("pjsip/", "").lower().replace('/', '_')
            active_normalized.add(normalized)
        # Any tracked channel that is no longer active has ended: log it, then clean up
        for track_file in CALL_TRACKING_DIR.glob("*.track"):
            tracked_channel = track_file.stem.lower()
            if tracked_channel not in active_normalized:
                info_file = track_file.with_suffix('.info')
                if info_file.exists():
                    try:
                        parts = info_file.read_text().split('|')
                        if len(parts) >= 9:
                            self.log_call_completion(tracked_channel, parts[0], parts[1], parts[2],
                                                     parts[3], parts[4], parts[5], parts[6], parts[7], parts[8], tenant_mgr)
                    except Exception: pass
                    info_file.unlink(missing_ok=True)
                track_file.unlink(missing_ok=True)
                self.get_direction_file(tracked_channel).unlink(missing_ok=True)
                self.get_tenant_file(tracked_channel).unlink(missing_ok=True)
def calculate_quality_score(rtt: float, tx_jit: float, rx_jit: float, tx_pct: float, rx_pct: float,
                            tx_cnt: int = 0, tx_lost: int = 0, rx_cnt: int = 0, rx_lost: int = 0) -> int:
    """
    Calculate quality score (0-100) based on metrics.
    QUALITY FACTORS SUMMARY:
    ========================
    1. RTT (Round Trip Time) - Network latency
       - >300ms: -30 points, >200ms: -20, >150ms: -15, >100ms: -8
    2. Jitter (TX and RX averaged) - Packet timing variation
       - >200ms: -35 points, >100ms: -25, >70ms: -15, >50ms: -10, >30ms: -5
    3. Packet Loss Percentage (TX and RX averaged) - Lost audio data
       - >15%: -70 points, >10%: -60, >5%: -45, >3%: -30, >1%: -15, >0%: -8
    4. One-Way Audio Detection - Asymmetric packet loss
       - TX loss 100% with RX OK: -50 points (caller can't hear you)
       - RX loss 100% with TX OK: -50 points (you can't hear caller)
       - Severe asymmetric loss: -30 to -40 points
    5. High Jitter Spike Penalty (applied in get_overall_quality)
       - >20% of samples with jitter >70ms: -25, >10%: -15, >5%: -10, >2%: -5
    """
    score = 100
    rtt_int = safe_int(rtt)
    if rtt_int > 300: score -= 30
    elif rtt_int > 200: score -= 20
    elif rtt_int > 150: score -= 15
    elif rtt_int > 100: score -= 8
    avg_jitter = (safe_int(tx_jit) + safe_int(rx_jit)) // 2
    if avg_jitter > 200: score -= 35
    elif avg_jitter > 100: score -= 25
    elif avg_jitter > 70: score -= 15
    elif avg_jitter > 50: score -= 10
    elif avg_jitter > 30: score -= 5
    avg_loss = (safe_float(tx_pct) + safe_float(rx_pct)) / 2
    if avg_loss > 15: score -= 70
    elif avg_loss > 10: score -= 60
    elif avg_loss > 5: score -= 45
    elif avg_loss > 3: score -= 30
    elif avg_loss > 1: score -= 15
    elif avg_loss > 0: score -= 8
    # ONE-WAY AUDIO DETECTION
    if tx_pct >= 99 and rx_pct < 5: score -= 50
    elif rx_pct >= 99 and tx_pct < 5: score -= 50
    elif tx_pct >= 50 and rx_pct < 5: score -= 30
    elif rx_pct >= 50 and tx_pct < 5: score -= 30
    if tx_cnt > 100 and rx_cnt > 100:
        actual_tx_loss = (tx_lost / tx_cnt * 100) if tx_cnt > 0 else 0
        actual_rx_loss = (rx_lost / rx_cnt * 100) if rx_cnt > 0 else 0
        if actual_tx_loss > 80 and actual_rx_loss < 10: score -= 40
        elif actual_rx_loss > 80 and actual_tx_loss < 10: score -= 40
        elif actual_tx_loss > 50 and actual_rx_loss < 10: score -= 25
        elif actual_rx_loss > 50 and actual_tx_loss < 10: score -= 25
    return max(0, score)
def calculate_mos(rtt: float, tx_jitter: float, rx_jitter: float, tx_loss: float, rx_loss: float) -> float:
    # Use the worse direction for jitter and loss, then subtract penalties from 4.5
    jitter = max(tx_jitter, rx_jitter)
    loss = max(tx_loss, rx_loss)
    r_pen = 0
    if rtt > 200: r_pen = 0.8 + ((rtt - 200) / 100) * 0.5
    elif rtt > 150: r_pen = 0.5 + ((rtt - 150) / 50) * 0.3
    elif rtt > 100: r_pen = 0.2 + ((rtt - 100) / 50) * 0.3
    elif rtt > 50: r_pen = ((rtt - 50) / 50) * 0.2
    j_pen = 0
    if jitter > 100: j_pen = 1.0 + ((jitter - 100) / 100) * 0.5
    elif jitter > 50: j_pen = 0.5 + ((jitter - 50) / 50) * 0.5
    elif jitter > 30: j_pen = 0.2 + ((jitter - 30) / 20) * 0.3
    elif jitter > 10: j_pen = ((jitter - 10) / 20) * 0.2
    l_pen = 0
    if loss > 5: l_pen = 2.0 + ((loss - 5) / 5) * 1.0
    elif loss > 2: l_pen = 1.0 + ((loss - 2) / 3) * 1.0
    elif loss > 1: l_pen = 0.5 + ((loss - 1)) * 0.5
    elif loss > 0.5: l_pen = 0.2 + ((loss - 0.5) / 0.5) * 0.3
    elif loss > 0: l_pen = loss * 0.4
    return max(1.0, min(4.5, round(4.5 - r_pen - j_pen - l_pen, 1)))

def get_quality_display(score: int, config: Config) -> str:
    if score >= config.warning_score: return f"{Colors.GREEN}{score}%{Colors.RESET}"
    elif score >= config.critical_score: return f"{Colors.YELLOW}{score}%{Colors.RESET}"
    else: return f"{Colors.RED}{score}%{Colors.RESET}"
def get_cpu_usage() -> int:
    # CPU usage is the busy share of the delta between two /proc/stat readings
    try:
        with open('/proc/stat', 'r') as f: line = f.readline()
        parts = line.split()
        user, nice, system, idle, iowait, irq, softirq, steal = map(int, parts[1:9])
        total = user + nice + system + idle + iowait + irq + softirq + steal
        idle_sum = idle + iowait
        if CPU_PREV_FILE.exists():
            prev_data = CPU_PREV_FILE.read_text().split()
            prev_total, prev_idle = int(prev_data[0]), int(prev_data[1])
            diff_total = total - prev_total
            diff_idle = idle_sum - prev_idle
            usage = 100 * (diff_total - diff_idle) // diff_total if diff_total > 0 else 0
        else: usage = 0
        CPU_PREV_FILE.write_text(f"{total} {idle_sum}")
        return usage
    except Exception: return 0

def get_memory_usage() -> Tuple[int, int, float]:
    try:
        result = subprocess.run(['free', '-m'], capture_output=True, text=True)
        lines = result.stdout.strip().split('\n')
        if len(lines) >= 2:
            parts = lines[1].split()
            total, used = int(parts[1]), int(parts[2])
            return used, total, (used / total) * 100 if total > 0 else 0
    except Exception: pass
    return 0, 0, 0.0

def get_load_average() -> Tuple[float, float]:
    try:
        with open('/proc/loadavg', 'r') as f: load_1min = float(f.read().split()[0])
        cores = os.cpu_count() or 1
        return load_1min, (load_1min / cores) * 100
    except Exception: return 0.0, 0.0

def get_uptime() -> str:
    try:
        result = subprocess.run(['uptime', '-p'], capture_output=True, text=True)
        return result.stdout.strip().replace('up ', '')
    except Exception: return "unknown"
class ChannelParser:
    def __init__(self, config: Config, tenant_mgr: TenantManager, tracker: CallTracker):
        self.config = config
        self.tenant_mgr = tenant_mgr
        self.tracker = tracker
        self.channel_info: Dict[str, ChannelInfo] = {}
        self.channel_stats: Dict[str, str] = {}
        self.channel_direction: Dict[str, str] = {}
        self.channel_tenant: Dict[str, str] = {}
        self.bridge_groups: Dict[str, List[str]] = defaultdict(list)
        self.local_link_map: Dict[str, str] = {}

    def load_persisted_state(self):
        for f in CALL_TRACKING_DIR.glob("*.direction"):
            self.channel_direction[f.stem] = f.read_text().strip()
        for f in CALL_TRACKING_DIR.glob("*.tenant"):
            self.channel_tenant[f.stem] = f.read_text().strip()

    def parse_channels_concise(self, raw_output: str) -> Dict[str, ChannelInfo]:
        # Reset per-snapshot state: this method runs on every refresh, so without
        # clearing, bridge_groups would collect duplicate entries and ended
        # channels would never be dropped.
        self.channel_info.clear()
        self.bridge_groups.clear()
        self.local_link_map.clear()
        local_chans_1, local_chans_2 = {}, {}
        for line in raw_output.strip().split('\n'):
            if not line: continue
            parts = line.split('!')
            if len(parts) < 8: continue
            chan, context, exten = parts[0], parts[1] if len(parts) > 1 else "", parts[2] if len(parts) > 2 else ""
            state, app, data = parts[4] if len(parts) > 4 else "", parts[5] if len(parts) > 5 else "", parts[6] if len(parts) > 6 else ""
            caller_id = parts[7] if len(parts) > 7 else ""
            # The bridge ID is the last field that looks like a UUID
            bridge_id = ""
            for i in range(len(parts) - 1, -1, -1):
                if re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}', parts[i]):
                    bridge_id = parts[i]
                    break
            if chan.startswith("Local/"):
                base_name = chan.split(';')[0]
                suffix = chan.split(';')[1] if ';' in chan else ""
                if suffix == "1": local_chans_1[base_name] = bridge_id
                elif suffix == "2": local_chans_2[base_name] = bridge_id
            if chan.startswith("PJSIP/"):
                chan_short = chan.replace("PJSIP/", "").lower()
                self.channel_info[chan_short] = ChannelInfo(context=context, exten=exten, caller_id=caller_id, app=app, data=data, state=state, bridge_id=bridge_id)
                if bridge_id: self.bridge_groups[bridge_id].append(chan_short)
                # Trunk channels: detect direction and tenant once, then persist them
                if re.match(r'^admin\d+-[0-9a-f]+|^[a-z][a-z0-9_-]+-[0-9a-f]+', chan_short):
                    detected_dir, detected_tenant = "", ""
                    tenant_match = re.search(r'^T(\d+)_|^trk-(\d+)-', context)
                    if tenant_match: detected_tenant = tenant_match.group(1) or tenant_match.group(2)
                    if re.search(r'(AppDial|Dial)', app) and "(Outgoing Line)" in data: detected_dir = "outbound"
                    elif re.search(r'(Queue|AppQueue)', app): detected_dir = "inbound"
                    if not detected_dir:
                        if re.search(r'(pstn|inbound|did|ivr|ext-group|ringgroups|ext-queues|sub-local-dialing)', context): detected_dir = "inbound"
                        elif re.search(r'(internal|outbound|outrt|macro-dialout)', context): detected_dir = "outbound"
                        elif re.search(r'trk-\d+-in', context): detected_dir = "outbound"
                    if detected_dir and chan_short not in self.channel_direction:
                        self.channel_direction[chan_short] = detected_dir
                        self.tracker.get_direction_file(chan_short).write_text(detected_dir)
                    if detected_tenant and chan_short not in self.channel_tenant:
                        self.channel_tenant[chan_short] = detected_tenant
                        self.tracker.get_tenant_file(chan_short).write_text(detected_tenant)
        # Link the two bridges joined by each Local channel pair (;1 and ;2)
        for base in local_chans_1:
            b1, b2 = local_chans_1.get(base, ""), local_chans_2.get(base, "")
            if b1 and b2:
                self.local_link_map[b1] = b2
                self.local_link_map[b2] = b1
        return self.channel_info

    def get_all_linked_channels(self, bridge_id: str) -> List[str]:
        channels = list(self.bridge_groups.get(bridge_id, []))
        linked_bridge = self.local_link_map.get(bridge_id, "")
        if linked_bridge: channels.extend(self.bridge_groups.get(linked_bridge, []))
        return channels

    def get_effective_bridge_id(self, chan_short: str) -> str:
        info = self.channel_info.get(chan_short)
        if not info or not info.bridge_id: return ""
        bridge_id = info.bridge_id
        linked_bridge = self.local_link_map.get(bridge_id, "")
        if linked_bridge and re.match(r'^t\d+_\d+', chan_short):
            for lc in self.bridge_groups.get(linked_bridge, []):
                if re.match(r'^admin\d+-[0-9a-f]+|^[a-z][a-z0-9_-]+-[0-9a-f]+', lc): return linked_bridge
        return bridge_id

    def get_tenant_from_linked_extension(self, chan_short: str) -> str:
        info = self.channel_info.get(chan_short)
        if not info or not info.bridge_id: return ""
        for linked_chan in self.get_all_linked_channels(info.bridge_id):
            if linked_chan == chan_short: continue
            ext_match = re.match(r'^t(\d+)_(\d+)', linked_chan)
            if ext_match: return ext_match.group(1)
        return ""

    def parse_channel_stats(self, raw_output: str):
        # Drop stats from the previous refresh so ended channels do not pile up
        self.channel_stats.clear()
        for line in raw_output.strip().split('\n'):
            if not line: continue
            parts = line.split()
            if len(parts) < 4: continue
            col1, col2 = parts[0], parts[1] if len(parts) > 1 else ""
            # If column 2 is a duration (HH:MM:SS), column 1 is the channel ID;
            # otherwise column 1 is the bridge ID and column 2 is the channel ID
            chan_id = col1 if re.match(r'^\d{2}:\d{2}:\d{2}$', col2) else col2
            if chan_id == "ChannelId" or not chan_id: continue
            self.channel_stats[chan_id.lower()] = line

    def get_metrics_for_channel(self, chan_short: str, bridge_id: str) -> ChannelMetrics:
        # Channel IDs are looked up by their first 18 characters
        trunc_key = chan_short[:18].lower()
        stat_line = self.channel_stats.get(f"{bridge_id}|{trunc_key}") or self.channel_stats.get(trunc_key)
        metrics = ChannelMetrics()
        if not stat_line: return metrics
        parts = stat_line.split()
        col2 = parts[1] if len(parts) > 1 else ""
        try:
            if re.match(r'^\d{2}:\d{2}:\d{2}$', col2):
                # Layout without a leading bridge ID column
                metrics.rx_jit_ms = safe_float(parts[6]) if len(parts) > 6 else 0
                metrics.tx_jit_ms = safe_float(parts[10]) * 1000 if len(parts) > 10 else 0
                metrics.rtt_ms = safe_float(parts[11]) * 1000 if len(parts) > 11 else 0
                metrics.tx_pct = safe_float(parts[9]) if len(parts) > 9 else 0
                metrics.rx_pct = safe_float(parts[5]) if len(parts) > 5 else 0
                metrics.codec = parts[2] if len(parts) > 2 else "-"
                metrics.rx_cnt = safe_int(parts[3]) if len(parts) > 3 else 0
                metrics.rx_lost = safe_int(parts[4]) if len(parts) > 4 else 0
                metrics.tx_cnt = safe_int(parts[7]) if len(parts) > 7 else 0
                metrics.tx_lost = safe_int(parts[8]) if len(parts) > 8 else 0
            else:
                # Layout with a leading bridge ID column: every field shifts right by one
                metrics.rx_jit_ms = safe_float(parts[7]) if len(parts) > 7 else 0
                metrics.tx_jit_ms = safe_float(parts[11]) * 1000 if len(parts) > 11 else 0
                metrics.rtt_ms = safe_float(parts[12]) * 1000 if len(parts) > 12 else 0
                metrics.tx_pct = safe_float(parts[10]) if len(parts) > 10 else 0
                metrics.rx_pct = safe_float(parts[6]) if len(parts) > 6 else 0
                metrics.codec = parts[3] if len(parts) > 3 else "-"
                metrics.rx_cnt = safe_int(parts[4]) if len(parts) > 4 else 0
                metrics.rx_lost = safe_int(parts[5]) if len(parts) > 5 else 0
                metrics.tx_cnt = safe_int(parts[8]) if len(parts) > 8 else 0
                metrics.tx_lost = safe_int(parts[9]) if len(parts) > 9 else 0
        except (IndexError, ValueError): pass
        return metrics

class StatisticsDisplay:
    def __init__(self, config: Config, tenant_mgr: TenantManager):
        self.config = config
        self.tenant_mgr = tenant_mgr
        self.relations_table: Dict[str, str] = {}
        self.global_cid_map: Dict[str, str] = {}

    def build_relations_map(self, log_chunk: str):
        for line in log_chunk.split('\n'):
            if "[Extension]" not in line: continue
            bridge_match = re.search(r'Bridge: ([^ |]+)', line)
            b_id = bridge_match.group(1) if bridge_match else ""
            ext_match = re.search(r'\[Extension\] .*?\[(\d+)\]', line)
            e_num = ext_match.group(1) if ext_match else ""
            comp_match = re.search(r'\[Extension\] \[\d+\] \[([^\]]+)\]', line)
            comp = comp_match.group(1) if comp_match else ""
            cid_match = re.search(r'CID: ([^|]+)', line)
            cid = cid_match.group(1).strip().replace('|', '-') if cid_match else ""
            match_cid = cid.replace('+', '')
            if match_cid.startswith('44'): match_cid = '0' + match_cid[2:]
            if b_id and e_num: self.relations_table[b_id] = f"{comp}|{e_num}"
            if match_cid and comp: self.global_cid_map[match_cid] = comp

    def _lookup_cid_by_bridge(self, bridge_id: str, all_calls: List[str], exclude_line: str) -> str:
        if not bridge_id: return ""
        for line in all_calls:
            if line == exclude_line or "[Trunk]" not in line: continue
            bridge_match = re.search(r'Bridge: ([^ |]+)', line)
            if bridge_match and bridge_match.group(1) == bridge_id:
                cid_match = re.search(r'CID: ([^|]+)', line)
                if cid_match: return cid_match.group(1).strip()
        return ""

    def calculate_stats(self, call_data: List[str], section_title: str, all_calls: List[str] = None) -> List[str]:
        output = []
        count, sum_quality, min_rtt, max_rtt, sum_rtt = 0, 0, 999999, 0, 0
        sum_tx_jit, sum_rx_jit, sum_tx_loss, sum_rx_loss, total_duration_sec = 0, 0, 0, 0, 0
        worst_calls = []
        if all_calls is None: all_calls = call_data
        for line in call_data:
            if not line or "Quality:" not in line: continue
            codec_match = re.search(r'Codec: ([^|]+)', line)
            codec = codec_match.group(1).strip() if codec_match else "-"
            if codec == "-": continue
            count += 1
            rtt = safe_int(re.search(r'RTT: ([\d.]+)', line).group(1) if re.search(r'RTT: ([\d.]+)', line) else "0")
            tx_jit = safe_int(re.search(r'Jitter: TX ([\d.]+)', line).group(1) if re.search(r'Jitter: TX ([\d.]+)', line) else "0")
            rx_jit = safe_int(re.search(r'Jitter:.*RX ([\d.]+)', line).group(1) if re.search(r'Jitter:.*RX ([\d.]+)', line) else "0")
            tx_loss = safe_int(re.search(r'Loss: TX ([\d.]+)', line).group(1) if re.search(r'Loss: TX ([\d.]+)', line) else "0")
            rx_loss = safe_int(re.search(r'Loss:.*RX ([\d.]+)', line).group(1) if re.search(r'Loss:.*RX ([\d.]+)', line) else "0")
            dur_match = re.search(r'Dur: ([\d:]+)', line)
            duration = dur_match.group(1) if dur_match else "00:00:00"
            qual_match = re.search(r'Quality: (\d+)', line)
            quality = safe_int(qual_match.group(1) if qual_match else "100")
            sum_quality += quality
            dur_sec = parse_duration(duration)
            total_duration_sec += dur_sec
            sum_rtt += rtt
            if 0 < rtt < min_rtt: min_rtt = rtt
            if rtt > max_rtt: max_rtt = rtt
            sum_tx_jit += tx_jit
            sum_rx_jit += rx_jit
            sum_tx_loss += tx_loss
            sum_rx_loss += rx_loss
            worst_calls.append({'quality': quality, 'line': line, 'duration': duration, 'codec': codec, 'rtt': rtt, 'tx_loss': tx_loss, 'rx_loss': rx_loss})
        if count == 0: return output
        avg_quality = sum_quality // count
        avg_rtt = sum_rtt // count
        avg_tx_jit = sum_tx_jit // count
        avg_rx_jit = sum_rx_jit // count
        avg_tx_loss = sum_tx_loss // count
        avg_rx_loss = sum_rx_loss // count
        total_duration = format_duration(total_duration_sec)
        avg_dur_sec = total_duration_sec // count
        avg_dur_fmt = format_duration(avg_dur_sec)
        if min_rtt == 999999: min_rtt = 0
        q_col = Colors.GREEN if avg_quality >= self.config.warning_score else Colors.RED
        avg_dur_color = Colors.RED if avg_dur_sec < 10 else Colors.RESET
        output.append(f"{Colors.BOLD}{section_title:<20}{Colors.RESET} Avg Qual: {q_col}{avg_quality}%{Colors.RESET} | Calls: {count} | Avg Dur: {avg_dur_color}{avg_dur_fmt}{Colors.RESET} | Tot Dur: {total_duration}")
        output.append(f"{Colors.DIM}Stats:{Colors.RESET} RTT: {min_rtt}-{max_rtt}ms (Avg {avg_rtt}) | Jitter: TX {avg_tx_jit} RX {avg_rx_jit} (Avg {(avg_tx_jit + avg_rx_jit) // 2}) | Loss: TX {avg_tx_loss}% RX {avg_rx_loss}%")
        worst_calls.sort(key=lambda x: x['quality'])
        for i, call in enumerate(worst_calls[:3]):
            line = call['line']
            raw_name = re.sub(r'^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\] ', '', line.split('|')[0])
            time_match = re.search(r'^\[[\d-]+ (\d{2}:\d{2}:\d{2})', line)
            time_val = time_match.group(1) if time_match else ""
            bridge_match = re.search(r'Bridge: ([^ |]+)', line)
            bridge_id = bridge_match.group(1) if bridge_match else ""
            cid_match = re.search(r'CID: ([^|]+)', line)
            raw_cid = cid_match.group(1).strip() if cid_match else ""
            if "[Extension]" in line and raw_cid and re.match(r'^\d{3,5}$', raw_cid):
                trunk_cid = self._lookup_cid_by_bridge(bridge_id, all_calls, line)
                if trunk_cid: raw_cid = trunk_cid
            js_match = re.search(r'JitterSpikes: ([^| ]+)', line)
            js_val = js_match.group(1) if js_match else "0/0"
            tx_pkts_match = re.search(r'Pkts: TX ([^ ]+)', line)
            tx_pkts = tx_pkts_match.group(1) if tx_pkts_match else "0/0"
            rx_pkts_match = re.search(r'Pkts:.*RX ([^ |]+)', line)
            rx_pkts = rx_pkts_match.group(1) if rx_pkts_match else "0/0"
            if "[Extension]" in raw_name:
                ext_match = re.search(r'\[(\d+)\]', raw_name)
                ext_n = ext_match.group(1) if ext_match else ""
                comp_matches = re.findall(r'\[([a-zA-Z0-9 ]+)\]', raw_name)
                comp = next((m for m in comp_matches if m != "Extension" and m != ext_n), "")
                name_part = f"{comp} (Ext {ext_n})" if comp else f"Ext {ext_n}"
            elif "[Trunk]" in raw_name:
                comp_match = re.search(r'\[Trunk\] \[[^\]]+\] \[([^\]]+)', raw_name)
                name_part = comp_match.group(1) if comp_match else "Trunk Call"
            else: name_part = raw_name[:20]
            quality_display = get_quality_display(call['quality'], self.config)
            output.append(f" {i+1}. {name_part[:35]:<35} ({raw_cid[:13]:<13}) T:{time_val} Q:{quality_display} D:{call['duration']} C:{call['codec']} RTT:{call['rtt']}ms JS:{js_val} L:{call['tx_loss']}/{call['rx_loss']}% TX {tx_pkts} RX {rx_pkts}")
        return output

    def get_recent_calls_section(self, recent_lines: List[str], all_calls: List[str]) -> List[str]:
        output = []
        recent_3 = recent_lines[-3:][::-1]
        if not recent_3: return output
        output.append("")
        output.append(f"{Colors.BOLD}Last 3 Recent Calls{Colors.RESET}")
        for i, line in enumerate(recent_3):
            if not line or "Quality:" not in line: continue
            raw_name = re.sub(r'^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\] ', '', line.split('|')[0])
            time_match = re.search(r'^\[[\d-]+ (\d{2}:\d{2}:\d{2})', line)
            time_val = time_match.group(1) if time_match else ""
            bridge_match = re.search(r'Bridge: ([^ |]+)', line)
            bridge_id = bridge_match.group(1) if bridge_match else ""
            cid_match = re.search(r'CID: ([^|]+)', line)
            raw_cid = cid_match.group(1).strip() if cid_match else ""
            if "[Extension]" in line and raw_cid and re.match(r'^\d{3,5}$', raw_cid):
                trunk_cid = self._lookup_cid_by_bridge(bridge_id, all_calls, line)
                if trunk_cid: raw_cid = trunk_cid
            dur_match = re.search(r'Dur: ([\d:]+)', line)
            duration = dur_match.group(1) if dur_match else "00:00:00"
            codec_match = re.search(r'Codec: ([^|]+)', line)
            codec = codec_match.group(1).strip() if codec_match else "-"
            qual_match = re.search(r'Quality: (\d+)', line)
            quality = safe_int(qual_match.group(1) if qual_match else "100")
            rtt_match = re.search(r'RTT: ([\d.]+)', line)
            rtt = safe_int(rtt_match.group(1) if rtt_match else "0")
            js_match = re.search(r'JitterSpikes: ([^| ]+)', line)
            js_val = js_match.group(1) if js_match else "0/0"
            tx_loss_match = re.search(r'Loss: TX ([\d.]+)', line)
            tx_loss = safe_int(tx_loss_match.group(1) if tx_loss_match else "0")
            rx_loss_match = re.search(r'Loss:.*RX ([\d.]+)', line)
            rx_loss = safe_int(rx_loss_match.group(1) if rx_loss_match else "0")
            tx_pkts_match = re.search(r'Pkts: TX ([^ ]+)', line)
            tx_pkts = tx_pkts_match.group(1) if tx_pkts_match else "0/0"
            rx_pkts_match = re.search(r'Pkts:.*RX ([^ |]+)', line)
            rx_pkts = rx_pkts_match.group(1) if rx_pkts_match else "0/0"
            if "[Extension]" in raw_name:
                ext_match = re.search(r'\[(\d+)\]', raw_name)
                ext_n = ext_match.group(1) if ext_match else ""
                comp_matches = re.findall(r'\[([a-zA-Z0-9 ]+)\]', raw_name)
                comp = next((m for m in comp_matches if m != "Extension" and m != ext_n), "")
                name_part = f"{comp} (Ext {ext_n})" if comp else f"Ext {ext_n}"
                call_type = "EXT"
            elif "[Trunk]" in raw_name:
                call_type = "IN" if "[Inbound]" in raw_name else "OUT" if "[Outbound]" in raw_name else "TRK"
                comp_match = re.search(r'\[Trunk\] \[[^\]]+\] \[([^\]]+)', raw_name)
                name_part = comp_match.group(1) if comp_match else "Trunk Call"
            else:
                name_part = raw_name[:20]
                call_type = "???"
            quality_display = get_quality_display(quality, self.config)
            output.append(f" {i+1}. [{call_type}] {name_part[:30]:<30} ({raw_cid[:13]:<13}) T:{time_val} Q:{quality_display} D:{duration} C:{codec} RTT:{rtt}ms JS:{js_val} L:{tx_loss}/{rx_loss}% TX {tx_pkts} RX {rx_pkts}")
        return output

    def get_advanced_stats(self, recent_calls: List[str]) -> List[str]:
        output = []
        total_recent = len(recent_calls)
        short_count = 0
        short_call_map: Dict[str, int] = defaultdict(int)
        # Short calls (under 10 seconds), grouped by extension or trunk company
        for line in recent_calls:
            dur_match = re.search(r'Dur: ([\d:]+)', line)
            if dur_match:
                dur_sec = parse_duration(dur_match.group(1))
                if dur_sec < 10:
                    short_count += 1
                    if "[Extension]" in line:
                        ext_match = re.search(r'\[Extension\] \[(\d+)', line)
                        t_name = ext_match.group(1) if ext_match else "Unknown Ext"
                    else:
                        comp_match = re.search(r'\[Trunk\] \[[^\]]+\] \[([^\]]+)', line)
                        t_name = comp_match.group(1) if comp_match else "Unknown"
                    short_call_map[t_name] += 1
        short_pct = (short_count * 100) // total_recent if total_recent > 0 else 0
        short_color = Colors.YELLOW if short_pct > 20 else Colors.GREEN
        # Busiest inbound and outbound trunk users
        outbound_counts: Dict[str, int] = defaultdict(int)
        inbound_counts: Dict[str, int] = defaultdict(int)
        for line in recent_calls:
            if "[Outbound]" in line:
                match = re.search(r'\[Trunk\] \[Outbound\] \[([^\]]+)', line)
                if match: outbound_counts[match.group(1)] += 1
            elif "[Inbound]" in line:
                match = re.search(r'\[Trunk\] \[Inbound\] \[([^\]]+)', line)
                if match: inbound_counts[match.group(1)] += 1
        top_out = max(outbound_counts.items(), key=lambda x: x[1], default=("None", 0))
        top_in = max(inbound_counts.items(), key=lambda x: x[1], default=("None", 0))
        out_fmt = f"{top_out[0]} ({top_out[1]})" if top_out[1] > 0 else "None"
        in_fmt = f"{top_in[0]} ({top_in[1]})" if top_in[1] > 0 else "None"
        # Codec distribution (ulaw and alaw are both counted as G711)
        codec_counts: Dict[str, int] = defaultdict(int)
        for line in recent_calls:
            codec_match = re.search(r'Codec: (\w+)', line)
            if codec_match:
                codec = codec_match.group(1).lower()
                if codec in ('ulaw', 'alaw'): codec_counts['G711'] += 1
                elif codec == 'g722': codec_counts['G722'] += 1
                elif codec == 'g729': codec_counts['G729'] += 1
                elif codec == 'opus': codec_counts['Opus'] += 1
                elif codec not in ('-', ''): codec_counts[codec.upper()] += 1
        codec_parts = [f"{c}:{n}" for c, n in sorted(codec_counts.items(), key=lambda x: x[1], reverse=True) if n > 0]
        codec_str = " ".join(codec_parts) if codec_parts else "None"
        output.append(f"{Colors.BOLD}Insights (Last {len(recent_calls)}):{Colors.RESET} Short Calls: {short_color}{short_pct}%{Colors.RESET} | Top Outbound: {Colors.CYAN}{out_fmt}{Colors.RESET} | Top Inbound: {Colors.CYAN}{in_fmt}{Colors.RESET} | Codecs: {codec_str}")
        if short_count > 0:
            leaders = sorted(short_call_map.items(), key=lambda x: x[1], reverse=True)[:2]
            output.append(f"{Colors.DIM}Short Call Leaders:{Colors.RESET} {', '.join(f'{k} ({v})' for k, v in leaders)}")
        # Quality leaders: the lowest-quality calls (below 90%) in the recent window
        loss_list = []
        for line in recent_calls:
            qual_match = re.search(r'Quality: (\d+)', line)
            quality = safe_int(qual_match.group(1) if qual_match else "100")
            if quality < 90:
                if "[Extension]" in line:
                    ext_match = re.search(r'\[Extension\] \[(\d+)\] \[([^\]]+)', line)
                    if ext_match: loss_list.append((quality, f"{ext_match.group(2)} (Ext {ext_match.group(1)}) [{quality}%]"))
                elif "[Trunk]" in line:
                    comp_match = re.search(r'\[Trunk\] \[[^\]]+\] \[([^\]]+)', line)
                    if comp_match: loss_list.append((quality, f"{comp_match.group(1)} [{quality}%]"))
        loss_list.sort(key=lambda x: x[0])
        if loss_list: output.append(f"{Colors.DIM}Quality Leaders (Lowest):{Colors.RESET} {', '.join(x[1] for x in loss_list[:3])}")
        else: output.append(f"{Colors.DIM}Quality Leaders:{Colors.RESET} None (all calls ≥90%)")
        return output

    def get_log_statistics(self) -> List[str]:
        output = []
        if not CALL_LOG_FILE.exists() or CALL_LOG_FILE.stat().st_size == 0: return output
        try:
            with open(CALL_LOG_FILE, 'r') as f: lines = f.readlines()
        except Exception: return output
        log_chunk = lines[-self.config.log_trim_lines:] if len(lines) > self.config.log_trim_lines else lines
        self.build_relations_map(''.join(log_chunk))
        recent_chunk = log_chunk[-self.config.stats_fetch_lines:] if len(log_chunk) > self.config.stats_fetch_lines else log_chunk
        recent_lines = [l.strip() for l in recent_chunk if l.strip()]
        ext_calls = [l for l in recent_lines if "[Extension]" in l]
        inbound_calls = [l for l in recent_lines if "[Trunk]" in l and "[Inbound]" in l]
        outbound_calls = [l for l in recent_lines if "[Trunk]" in l and "[Outbound]" in l]
        output.append(f"{Colors.BOLD} CALL STATISTICS (Worst 3 Calls from Last {self.config.stats_fetch_lines} Calls){Colors.RESET}")
        output.extend(self.get_advanced_stats(recent_lines))
        width = get_terminal_width()
        output.append(f"{Colors.CYAN}{'-' * width}{Colors.RESET}")
        if ext_calls: output.extend(self.calculate_stats(ext_calls, "Extension Calls", recent_lines))
        output.append("")
        if inbound_calls: output.extend(self.calculate_stats(inbound_calls, "Inbound Trunks", recent_lines))
        output.append("")
        if outbound_calls: output.extend(self.calculate_stats(outbound_calls, "Outbound Trunks", recent_lines))
        output.extend(self.get_recent_calls_section(recent_lines, recent_lines))
        return output

class VitalPBXMonitor:
    def __init__(self):
        self.config = load_config()
        self.tenant_mgr = TenantManager()
        self.tracker = CallTracker(self.config)
        self.parser = ChannelParser(self.config, self.tenant_mgr, self.tracker)
        self.stats_display = StatisticsDisplay(self.config, self.tenant_mgr)

    def run_single_update(self):
        self.tracker.cleanup_old_files()
        self.tracker.trim_log_file()
        self.parser.load_persisted_state()
        channels_concise = run_asterisk_cmd("core show channels concise")
        current_ts = int(time.time())
        # Reuse the cached channelstats output if it is less than 2 seconds old
        if CLI_CACHE_FILE.exists() and current_ts - int(CLI_CACHE_FILE.stat().st_mtime) < 2:
            stats_raw = CLI_CACHE_FILE.read_text()
        else:
            stats_raw = run_asterisk_cmd("pjsip show channelstats")
            CLI_CACHE_FILE.write_text(stats_raw)
        self.parser.parse_channels_concise(channels_concise)
        self.parser.parse_channel_stats(stats_raw)
        active_channels = set()
        for line in channels_concise.split('\n'):
            if line.startswith("PJSIP/"):
                active_channels.add(line.split('!')[0].replace("PJSIP/", "").lower())
        self.tracker.check_ended_calls(active_channels, self.tenant_mgr)
        self.display_output(channels_concise)

    def display_output(self, channels_concise: str):
        output_buffer = []
        width = get_terminal_width()
        sep_line = '-' * width
        uptime = get_uptime()
        cpu = get_cpu_usage()
        _, load_pct = get_load_average()
        _, _, mem_pct = get_memory_usage()
        active_count = sum(1 for l in channels_concise.split('\n') if l.startswith("PJSIP/"))
        logging_status = f"{Colors.GREEN}Logging Enabled{Colors.RESET}" if self.config.enable_logging else f"{Colors.YELLOW}Logging Disabled{Colors.RESET}"
        output_buffer.append(f"{Colors.CYAN}{sep_line}{Colors.RESET}")
        output_buffer.append(f"{Colors.BOLD} LIVE CALL QUALITY MONITOR (v1.14 Python){Colors.RESET}")
        output_buffer.append(f" {Colors.WHITE}Uptime:{Colors.RESET} {uptime} | {Colors.WHITE}Active Calls:{Colors.RESET} {active_count} | {logging_status}")
        output_buffer.append(f" {Colors.WHITE}CPU:{Colors.RESET} {cpu}% [1min: {load_pct:.1f}%] | {Colors.WHITE}Memory:{Colors.RESET} {mem_pct:.1f}%")
        output_buffer.append(f"{Colors.CYAN}{sep_line}{Colors.RESET}")
        # Rebuild the log statistics at most every 15 seconds; otherwise show the cached copy
        current_time = int(time.time())
        last_update = 0
        if STATS_TIMESTAMP_FILE.exists():
            try: last_update = int(STATS_TIMESTAMP_FILE.read_text().strip())
            except Exception: pass
        if current_time - last_update > 15 or not STATS_CACHE_FILE.exists():
            stats_output = self.stats_display.get_log_statistics()
            STATS_CACHE_FILE.write_text('\n'.join(stats_output))
            STATS_TIMESTAMP_FILE.write_text(str(current_time))
        if STATS_CACHE_FILE.exists():
            output_buffer.append(STATS_CACHE_FILE.read_text())
            output_buffer.append(f"{Colors.CYAN}{sep_line}{Colors.RESET}")
        live_buffer = []
        live_cnt_in, live_cnt_out, live_cnt_ext = 0, 0, 0
        for line in channels_concise.split('\n'):
            if not line.startswith("PJSIP/"): continue
            parts = line.split('!')
            chan = parts[0]
            chan_short = chan.replace("PJSIP/", "").lower()
            info = self.parser.channel_info.get(chan_short, ChannelInfo())
            # The call duration is the field immediately before the bridge ID
            duration_sec = 0
            for i in range(len(parts) - 1, -1, -1):
                if re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}', parts[i]):
                    try: duration_sec = int(parts[i-1])
                    except (ValueError, IndexError): pass
                    break
            duration = format_duration(duration_sec)
            metrics = self.parser.get_metrics_for_channel(chan_short, info.bridge_id)
            quality_score = calculate_quality_score(metrics.rtt_ms, metrics.tx_jit_ms, metrics.rx_jit_ms,
                metrics.tx_pct, metrics.rx_pct, metrics.tx_cnt, metrics.tx_lost, metrics.rx_cnt, metrics.rx_lost)
            current_mos = calculate_mos(metrics.rtt_ms, metrics.tx_jit_ms, metrics.rx_jit_ms, metrics.tx_pct, metrics.rx_pct)
            safe_chan = chan_short.replace('/', '_')
            self.tracker.update_tracking(safe_chan, metrics.rtt_ms, metrics.tx_jit_ms, metrics.rx_jit_ms,
                metrics.tx_pct, metrics.rx_pct, quality_score, metrics.tx_cnt, metrics.tx_lost, metrics.rx_cnt, metrics.rx_lost, current_mos)
            overall_quality = self.tracker.get_overall_quality(safe_chan)
            stats = self.tracker.load_stats(safe_chan)
            display_name = chan_short
            direction = ""
            call_info = ""
            if info.exten and info.exten != "s":
                call_info = f"{Colors.DIM}-> {Colors.RESET}{Colors.YELLOW}{info.exten}{Colors.RESET}"
                if info.caller_id and info.caller_id != '""':
                    call_info += f" {Colors.DIM}from{Colors.RESET} {Colors.CYAN}{info.caller_id.replace(chr(34), '')}{Colors.RESET}"
            ext_match = re.match(r'^t(\d+)_(\d+)', chan_short)
            if ext_match:
                live_cnt_ext += 1
                tenant_name = self.tenant_mgr.get_name(ext_match.group(1))
                ext_num = ext_match.group(2)
                display_name = f"{Colors.CYAN}[Extension]{Colors.RESET} {Colors.CYAN}[{tenant_name}]{Colors.RESET} {Colors.YELLOW}[{ext_num}]{Colors.RESET}"
            elif re.match(r'^admin\d+-[0-9a-f]+|^[a-z][a-z0-9_-]+-[0-9a-f]+', chan_short):
                saved_dir = self.parser.channel_direction.get(chan_short, "")
                saved_tenant = self.parser.channel_tenant.get(chan_short, "")
                linked_tenant = self.parser.get_tenant_from_linked_extension(chan_short)
                if linked_tenant:
                    saved_tenant = linked_tenant
                    self.parser.channel_tenant[chan_short] = linked_tenant
                if saved_dir == "inbound":
                    direction = f"{Colors.YELLOW}[Inbound]{Colors.RESET} "
                    call_info = f"{Colors.DIM}from{Colors.RESET} {Colors.CYAN}{info.caller_id.replace(chr(34), '')}{Colors.RESET}"
                    live_cnt_in += 1
                elif saved_dir == "outbound":
                    direction = f"{Colors.ORANGE}[Outbound]{Colors.RESET} "
                    call_info = f"{Colors.DIM}->{Colors.RESET} {Colors.YELLOW}{info.caller_id.replace(chr(34), '')}{Colors.RESET}"
                    live_cnt_out += 1
                else:
                    direction = f"{Colors.RED}[Unknown]{Colors.RESET} "
                tenant_label = f" {Colors.CYAN}[{self.tenant_mgr.get_name(saved_tenant)}]{Colors.RESET}" if saved_tenant else ""
                display_name = f"{direction}{Colors.GREY}[Trunk]{Colors.RESET}{tenant_label} {chan_short}"
            clean_dir = "inbound" if "Inbound" in direction else "outbound" if "Outbound" in direction else "unknown"
            effective_bridge = self.parser.get_effective_bridge_id(chan_short)
            self.tracker.save_call_info(safe_chan, display_name, duration, metrics.codec, info.context,
                effective_bridge, info.caller_id, clean_dir, info.app, self.parser.channel_tenant.get(chan_short, ""))
            quality_display = get_quality_display(quality_score, self.config)
            avg_mos = stats.sum_mos / stats.samples if stats.samples > 0 else current_mos
            js_pct = (stats.high_jitter_count * 100) // stats.samples if stats.samples > 0 else 0
            # Each live call contributes exactly three lines to live_buffer
            live_buffer.append("")
            live_buffer.append(f"{Colors.BOLD}{Colors.WHITE}{display_name}{Colors.RESET} {Colors.GREEN}*{Colors.RESET} Active {call_info}")
            live_buffer.append(f"{Colors.DIM}+-{Colors.RESET} Time: {Colors.CYAN}{duration}{Colors.RESET} | {Colors.CYAN}{metrics.codec}{Colors.RESET} | RTT: {metrics.rtt_ms:.1f}ms {Colors.DIM}({stats.peak_rtt}ms){Colors.RESET} | Q: {quality_display} {Colors.DIM}({overall_quality}%){Colors.RESET} | MOS: {Colors.GREEN}{current_mos:.1f}{Colors.RESET} {Colors.DIM}(Avg: {avg_mos:.1f}){Colors.RESET} | JS: {stats.high_jitter_count}/{stats.samples} ({js_pct}%) | J: TX {metrics.tx_jit_ms:.0f}ms RX {metrics.rx_jit_ms:.0f}ms | Loss: TX {metrics.tx_pct}% ({metrics.tx_lost}/{metrics.tx_cnt}) / RX {metrics.rx_pct}% ({metrics.rx_lost}/{metrics.rx_cnt})")
        total_live_calls = live_cnt_in + live_cnt_out + live_cnt_ext
        max_display = self.config.max_live_calls_display
        output_buffer.append("")
        output_buffer.append(f"{Colors.BOLD} LIVE CALLS{Colors.RESET}")
        output_buffer.append(f"Inbound: {Colors.YELLOW}{live_cnt_in}{Colors.RESET} | Outbound: {Colors.ORANGE}{live_cnt_out}{Colors.RESET} | Extensions: {Colors.CYAN}{live_cnt_ext}{Colors.RESET}")
        output_buffer.append(f"{Colors.CYAN}{sep_line}{Colors.RESET}")
        max_lines = max_display * 3
        for line in live_buffer[:max_lines]: output_buffer.append(line)
        hidden_calls = total_live_calls - max_display
        if len(live_buffer) > max_lines and hidden_calls > 0:
            output_buffer.append("")
            output_buffer.append(f"{Colors.DIM} ... plus {hidden_calls} more active call(s) not shown{Colors.RESET}")
        # Move the cursor home and clear the screen, then write the whole frame in one go
        sys.stdout.write('\033[H\033[J')
        sys.stdout.write('\n'.join(output_buffer))
        sys.stdout.write('\n')
        sys.stdout.flush()

    def run(self):
        if os.geteuid() != 0:
            print("Error: This script must be run as root to access Asterisk CLI.")
            sys.exit(1)
        # Switch to the alternate screen buffer and hide the cursor
        sys.stdout.write('\033[?1049h\033[?25l')
        sys.stdout.flush()
        try:
            while True:
                self.run_single_update()
                time.sleep(self.config.refresh_interval)
        except KeyboardInterrupt: pass
        finally:
            # Restore the cursor and the normal screen buffer on exit
            sys.stdout.write('\033[?25h\033[?1049l')
            sys.stdout.flush()
            print("\nMonitor stopped.")

if __name__ == "__main__":
    VitalPBXMonitor().run()
chmod +x /usr/local/bin/vitalpbx_monitor.py
Now run the script as root (it exits immediately otherwise, because it needs access to the Asterisk CLI):
python3 /usr/local/bin/vitalpbx_monitor.py
To exit, press Ctrl+C.
Setup is automatic on the first run. To change the defaults, edit the auto-generated config file:
nano /usr/local/bin/vitalpbx_monitor.conf
# VitalPBX Monitor Configuration
# Python version
# --- General Settings ---
ENABLE_LOGGING=true
REFRESH_INTERVAL=1
# --- Quality Thresholds (0-100) ---
WARNING_SCORE=90
CRITICAL_SCORE=70
# --- Advanced Log Processing ---
LOG_TRIM_LINES=800
STATS_FETCH_LINES=50
# --- Performance & Cleanup ---
RETENTION_MINUTES=120
MAX_TRACKING_FILES=100
# --- Display Settings ---
# Maximum number of live calls to display (default: 10)
MAX_LIVE_CALLS_DISPLAY=10
# --- Log File Management ---
# Maximum lines to keep in log file (default: 2000)
MAX_LOG_LINES=2000
# Remove log entries older than this many hours (default: 12)
LOG_RETENTION_HOURS=12
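
As a rough illustration of how a KEY=VALUE file like this can be read, here is a minimal sketch. It is not the script's own `load_config`; the `MonitorConfig` class, `parse_config_text` and `quality_level` are hypothetical names used only for this example, and it models just a few of the keys above. It also shows how the two thresholds split scores into good, warning and critical bands.

```python
from dataclasses import dataclass


@dataclass
class MonitorConfig:
    """Hypothetical stand-in for the script's Config; only a few keys are modelled."""
    enable_logging: bool = True
    refresh_interval: float = 1.0
    warning_score: int = 90
    critical_score: int = 70
    max_live_calls_display: int = 10


def parse_config_text(text: str) -> MonitorConfig:
    """Parse KEY=VALUE lines, skipping comments, blanks and unknown keys."""
    cfg = MonitorConfig()
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        attr, value = key.strip().lower(), value.strip()
        if not hasattr(cfg, attr):
            continue  # key not modelled in this sketch
        current = getattr(cfg, attr)
        try:
            # bool must be checked before int, because bool is a subclass of int
            if isinstance(current, bool):
                setattr(cfg, attr, value.lower() in ("true", "1", "yes"))
            elif isinstance(current, int):
                setattr(cfg, attr, int(value))
            else:
                setattr(cfg, attr, float(value))
        except ValueError:
            pass  # malformed value: keep the default
    return cfg


def quality_level(score: int, cfg: MonitorConfig) -> str:
    """Map a 0-100 quality score onto the configured thresholds."""
    if score >= cfg.warning_score:
        return "good"
    if score >= cfg.critical_score:
        return "warning"
    return "critical"
```

With the default thresholds, a score of 95 falls in the good band, 75 in the warning band and 50 in the critical band.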
File Locations
| File | Location | Purpose |
|---|---|---|
| Config | {script_dir}/vitalpbx_monitor.conf | User settings |
| Call Log | /var/log/vitalpbx_call_quality.log | Completed call records |
| Tenant Map | /usr/local/etc/tenant-lookup.txt | Tenant ID to company name |
| Tracking | /tmp/vitalpbx_call_tracking/ | Runtime call data |
| Caches | /tmp/vitalpbx_* | Temporary cache files |
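
If you want to process the call log outside the monitor, the sketch below pulls the main fields out of a single record. The field labels (`Dur:`, `Codec:`, `Quality:`, `RTT:`, `Bridge:`, `CID:`) are taken from the regular expressions the script uses when it reads the log; the overall line layout and the sample record are assumptions made up for illustration, and `parse_log_record` is a hypothetical helper, not part of the script.

```python
import re
from typing import Dict

# Field patterns copied from the script's own log-reading code
FIELD_PATTERNS = {
    "duration": r"Dur: ([\d:]+)",
    "codec": r"Codec: ([^|]+)",
    "quality": r"Quality: (\d+)",
    "rtt": r"RTT: ([\d.]+)",
    "bridge": r"Bridge: ([^ |]+)",
    "cid": r"CID: ([^|]+)",
}


def parse_log_record(line: str) -> Dict[str, str]:
    """Return the fields found in one log line; missing fields are simply omitted."""
    record: Dict[str, str] = {}
    for name, pattern in FIELD_PATTERNS.items():
        match = re.search(pattern, line)
        if match:
            record[name] = match.group(1).strip()
    return record


# Made-up sample record; the real layout may differ
sample = (
    "[2025-12-10 14:03:22] [Extension] [101] [Acme] | Dur: 00:01:23 | Codec: ulaw "
    "| RTT: 12.5 | Quality: 95 | Bridge: 1a2b3c4d-0001 | CID: 07123456789"
)

if __name__ == "__main__":
    for field, value in parse_log_record(sample).items():
        print(f"{field}: {value}")
```

All values are returned as strings, so convert `quality` and `rtt` to numbers before doing any arithmetic on them.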
