[Day #49 PyATS Series] Check for Spanning-Tree Topology Changes (logs + CLI) using pyATS for Cisco [Python for Network Engineer]
Introduction — key points
Spanning Tree Protocol (STP) topology changes (root bridge changes, port state transitions, flaps, and BPDU guard or root guard actions) are among the most common causes of brief outages and traffic flooding in switched networks. Detecting these events quickly and reliably across many switches requires automation and good observability.
In this article you’ll learn how to:
- Collect STP state and historical changes from Cisco devices (IOS-XE / NX-OS / IOS-XR variants) using pyATS (Genie + device.execute fallbacks).
- Parse device CLI (show spanning-tree, show spanning-tree detail) and device syslogs (show logging) to detect STP topology changes.
- Correlate CLI snapshots with syslog events and visualize them in a GUI (Elasticsearch + Kibana / Grafana).
- Build a repeatable workflow that snapshots current STP state, detects deltas, generates alerts, and stores history for trend analysis.
- Validate and demonstrate results via CLI examples and GUI queries.
This is a hands-on masterclass article with lots of code, exact commands, structured parsing tips, and real-world validation advice, so you can replicate it in your lab or production staging environment.
Topology Overview
We will use a small but realistic lab topology for demonstrations. This is enough to exercise root election, port states and occasional induced topology changes.

- VLANs 1, 10, 20 are configured across switches (PVST or MST possible).
- CORE1 and CORE2 are candidates for root bridge; we will check root priority, root MAC per VLAN.
- Syslog server (ELK/Kibana) receives syslogs from all switches.
- AutomationHost (pyATS) connects to devices over a management network to fetch CLI outputs.
Topology & Communications
Management plane:
- Management network: 192.0.2.0/24 (example). AutomationHost and the Syslog server are reachable here.
- SSH is used by pyATS to connect to devices.
Data sources (what we pull & parse):
- CLI:
  - show spanning-tree (per-VLAN summary)
  - show spanning-tree vlan <vlan> (per-VLAN detail including root ID)
  - show spanning-tree detail (port states, last topology change times)
  - show running-config | include spanning-tree (config check)
- Logs:
  - show logging or show logging | include SPANTREE|%SPANTREE|%LINK-3-UPDOWN
  - Syslog server (Elasticsearch index syslog-* or cisco-*) used as the authoritative historical store
Validation flow:
- Snapshot baseline STP state (root per VLAN, root priority, port roles).
- After network changes or periodically, collect fresh state and detect deltas (root change, port state change, new topology change count).
- Cross-reference the time of detected change with syslog messages (BPDU guard, port flapping logs).
- Push findings to a report (JSON/CSV) and optionally to Elasticsearch for visualization/alerts.
Important: STP messages are often frequent on busy networks. Filter for only the events you care about (e.g., root changes, BPDU guard, root guard, TC events), and use time windows.
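The filtering advice above can be sketched as a small helper. This is a minimal sketch, assuming log lines start with an IOS-style timestamp such as `Feb 12 12:34:12.345:` that carries no year (so the year is passed in). The name `filter_stp_events`, the keyword list, and the default window are illustrative and not part of the workflow script.

```python
import re
from datetime import datetime, timedelta

# Assumed timestamp layout: "Feb 12 12:34:12.345:" (no year in the line).
TS_RE = re.compile(r'^\*?(?P<ts>[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})')
# Illustrative keywords; tune them to the events you care about.
KEYWORDS = re.compile(r'root|bpdu|topology change', re.IGNORECASE)

def filter_stp_events(lines, center, window_s=60, year=2025):
    """Return (timestamp, line) pairs that mention an STP keyword and
    fall within +/- window_s seconds of `center`."""
    hits = []
    for line in lines:
        m = TS_RE.match(line)
        if not m or not KEYWORDS.search(line):
            continue  # no timestamp, or not an event we care about
        ts = datetime.strptime(f"{year} {m.group('ts')}", "%Y %b %d %H:%M:%S")
        if abs(ts - center) <= timedelta(seconds=window_s):
            hits.append((ts, line))
    return hits
```

Call it with the `logs_excerpt` lines of one device and the time at which a delta was detected; lines outside the window or without a keyword are dropped.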
Workflow Script
Below is a pyATS script, intended as a starting point to adapt and test before production use, that:
- Loads testbed
- Collects STP CLI output and syslog excerpt
- Parses STP output for root and port states
- Detects changes vs previous snapshot
- Produces structured JSON reports and optional Elasticsearch push
Save as: stp_change_detector.py
#!/usr/bin/env python3
"""
stp_change_detector.py
Collect spanning-tree info and logs from devices via pyATS, detect topology
changes, and generate reports. Parses raw execute() output with regex, so it
also works where no Genie parser is available.
"""
import json
import re
from datetime import datetime, timezone
from pathlib import Path

from genie.testbed import load

# Config
TESTBED_FILE = 'testbed.yml'
OUT_DIR = Path('stp_results')
OUT_DIR.mkdir(exist_ok=True)
HISTORY_FILE = OUT_DIR / 'stp_history.json'
ES_PUSH = False
ES_URL = "http://localhost:9200/stp-health/_doc/"

# Regex patterns. STP output varies by release and mode, so adjust as needed.
VLAN_SPLIT_RE = re.compile(r'\n(?=[ \t]*VLAN\s*\d+)')
VLAN_ID_RE = re.compile(r'^[ \t]*VLAN\s*0*(\d+)', re.MULTILINE)  # 'VLAN0010' or 'VLAN 10'
ROOT_RE = re.compile(r'Root ID\s+Priority\s+(\d+)\s+Address\s+([0-9A-Fa-f.:]+)')
ROOT_ALT_RE = re.compile(r'Root ID.*?Address\s*([0-9A-Fa-f.:]+).*?Priority\s*(\d+)', re.DOTALL)
TC_RE = re.compile(r'topology changes?(?:\s+count)?\s*:?\s*(\d+)', re.IGNORECASE)
PORT_ROLE_PATTERNS = [
    # 'GigabitEthernet1/0/1 is forwarding port ...'
    re.compile(r'^\s*(?P<intf>\S+)\s+is\s+(?P<role>\w+)\s+port', re.IGNORECASE),
    # "Port Gi1/0/1 ('Gi1/0/1'), Role Root, State Forwarding"
    re.compile(r'^\s*Port\s+(?P<intf>\S+).*?\bRole\s+(?P<role>\w+)', re.IGNORECASE),
]

# Syslog patterns to look for (examples, kept loose because mnemonics vary)
SYSLOG_PATTERNS = [
    re.compile(r'%SPANTREE.*root.*chang', re.IGNORECASE),
    re.compile(r'%SPANTREE.*Topology change', re.IGNORECASE),
    re.compile(r'BPDU.?GUARD', re.IGNORECASE),
    re.compile(r'Interface\s+\S+,\s+changed state to (blocking|listening|learning|forwarding|disabled)', re.IGNORECASE),
]


def utc_now():
    """Return the current UTC time as an ISO 8601 string ending in 'Z'."""
    return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def load_history():
    if HISTORY_FILE.exists():
        with open(HISTORY_FILE) as f:
            return json.load(f)
    return {}


def save_history(h):
    with open(HISTORY_FILE, 'w') as f:
        json.dump(h, f, indent=2)


def parse_show_spanning_tree(raw):
    """
    Parse 'show spanning-tree' or 'show spanning-tree vlan X' raw output.
    Return dict: {vlan: {'root_mac':.., 'root_pri':.., 'topology_changes': N}, ...}
    """
    result = {}
    # Split into per-VLAN blocks
    for block in VLAN_SPLIT_RE.split(raw):
        m = VLAN_ID_RE.search(block)
        if not m:
            continue
        vlan = m.group(1)
        mac = pri = None
        root_match = ROOT_RE.search(block)
        if root_match:
            pri, mac = int(root_match.group(1)), root_match.group(2)
        else:
            # alternative format: Address listed before Priority
            alt = ROOT_ALT_RE.search(block)
            if alt:
                mac, pri = alt.group(1), int(alt.group(2))
        # topology changes
        tc_m = TC_RE.search(block)
        tc = int(tc_m.group(1)) if tc_m else 0
        result[vlan] = {'root_mac': mac, 'root_pri': pri, 'topology_changes': tc}
    return result


def parse_port_roles(raw):
    """
    Parse port roles from show spanning-tree detail or interface outputs.
    Returns dict: {interface: role}
    """
    roles = {}
    for line in raw.splitlines():
        for pat in PORT_ROLE_PATTERNS:
            m = pat.search(line)
            if m:
                roles[m.group('intf')] = m.group('role').lower()
                break
    return roles


def collect_device(device):
    print(f"[{utc_now()}] Collecting STP from {device.name}")
    device.connect(log_stdout=False)
    try:
        device.execute('terminal length 0')
        # spanning-tree summary; a failure here is reported by main()
        raw_stp = device.execute('show spanning-tree')
        # detail output, falling back to the summary if the command fails
        try:
            raw_stp_detail = device.execute('show spanning-tree detail')
        except Exception:
            raw_stp_detail = raw_stp
        # STP-related syslog lines from the device buffer (or use the syslog server)
        try:
            raw_logs = device.execute('show logging | include SPANTREE|%SPAN|BPDU|Topology change|changed state')
        except Exception:
            raw_logs = device.execute('show logging')
    finally:
        device.disconnect()

    summary = parse_show_spanning_tree(raw_stp)
    # The topology change counter often appears only in the detail output
    for vlan, info in parse_show_spanning_tree(raw_stp_detail).items():
        if vlan in summary:
            summary[vlan]['topology_changes'] = max(
                summary[vlan]['topology_changes'], info['topology_changes'])

    return {
        'device': device.name,
        'collected_at': utc_now(),
        'stp_summary': summary,
        'port_roles': parse_port_roles(raw_stp_detail),
        'logs_excerpt': raw_logs,
    }


def detect_changes(current, history):
    """
    current: dict by device; history: previously saved structure
    Return list of change records
    """
    changes = []
    for devname, data in current.items():
        hist_dev = history.get(devname, {})
        for vlan, vinfo in data['stp_summary'].items():
            prev = hist_dev.get('stp_summary', {}).get(vlan)
            if not prev:
                continue
            # root changed?
            if prev.get('root_mac') != vinfo.get('root_mac'):
                changes.append({
                    'device': devname, 'vlan': vlan, 'change': 'root_changed',
                    'prev_root': prev.get('root_mac'), 'new_root': vinfo.get('root_mac'),
                    'time': data['collected_at']
                })
            # topology change counter increased?
            if vinfo.get('topology_changes', 0) > prev.get('topology_changes', 0):
                changes.append({
                    'device': devname, 'vlan': vlan, 'change': 'topology_change_count',
                    'prev': prev.get('topology_changes'), 'new': vinfo.get('topology_changes'),
                    'time': data['collected_at']
                })
        # port role changes
        for intf, role in data.get('port_roles', {}).items():
            prev_role = hist_dev.get('port_roles', {}).get(intf)
            if prev_role and prev_role != role:
                changes.append({
                    'device': devname, 'interface': intf, 'change': 'port_role_change',
                    'prev_role': prev_role, 'new_role': role,
                    'time': data['collected_at']
                })
        # scan logs for BPDU guard / root / TC events (one record per matching line)
        for line in data.get('logs_excerpt', '').splitlines():
            if any(pat.search(line) for pat in SYSLOG_PATTERNS):
                changes.append({
                    'device': devname, 'change': 'syslog_event',
                    'event_line': line, 'time': data['collected_at']
                })
    return changes


def push_to_es(doc):
    if not ES_PUSH:
        return False
    import requests
    r = requests.post(ES_URL, json=doc, timeout=10)
    r.raise_for_status()
    return True


def main():
    testbed = load(TESTBED_FILE)
    current = {}
    for name, device in testbed.devices.items():
        try:
            current[name] = collect_device(device)
            # persist raw device result
            with open(OUT_DIR / f"{name}_stp.json", 'w') as f:
                json.dump(current[name], f, indent=2)
        except Exception as e:
            print(f"Failed collect for {name}:", e)

    history = load_history()
    changes = detect_changes(current, history)

    # Save overall report
    report = {
        'generated_at': utc_now(),
        'devices': list(current.keys()),
        'changes': changes
    }
    with open(OUT_DIR / 'stp_changes_report.json', 'w') as f:
        json.dump(report, f, indent=2)

    # push to ES if wanted
    try:
        if ES_PUSH:
            for ch in changes:
                push_to_es(ch)
    except Exception as e:
        print("ES push failed:", e)

    # update history: store current as last-known snapshot
    for dev, d in current.items():
        history[dev] = {
            'stp_summary': d['stp_summary'],
            'port_roles': d['port_roles'],
            'collected_at': d['collected_at']
        }
    save_history(history)
    print("Done. Report saved to", OUT_DIR)


if __name__ == '__main__':
    main()
How to run:
python3 stp_change_detector.py
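The introduction mentions Genie with `device.execute` fallbacks, while the script relies on raw output only. Here is a minimal sketch of that fallback pattern, assuming a connected pyATS device whose `parse()` raises an exception when no parser exists for the platform or the output is empty. The helper name `parse_or_execute` and the `FakeDevice` stand-in are hypothetical; the stand-in only exists so the sketch runs without a lab.

```python
def parse_or_execute(device, command):
    """Try the Genie parser first; fall back to raw CLI text.

    Returns a tuple (structured, raw) in which exactly one item is not None.
    `device` is any object exposing pyATS-style parse() and execute().
    """
    try:
        return device.parse(command), None
    except Exception:
        # Broad catch on purpose: "no parser for this OS" and "parser found
        # nothing" are reported with different exception classes.
        return None, device.execute(command)


class FakeDevice:
    """Stand-in for a pyATS device (hypothetical, for illustration only)."""

    def parse(self, command):
        if command == 'show version':
            return {'version': {'os': 'IOS-XE'}}
        raise LookupError(f'no parser for {command!r}')

    def execute(self, command):
        return f'raw output of {command}'


dev = FakeDevice()
print(parse_or_execute(dev, 'show version'))
print(parse_or_execute(dev, 'show spanning-tree detail'))
```

When the structured result is available you can read fields from the dictionary; when only raw text comes back, hand it to the regex parsers from the script.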
Explanation by Line
I’ll walk you through critical parts so you can adapt to real networks.
Regex and parsing choices
- parse_show_spanning_tree() splits the output into VLAN blocks and attempts flexible patterns, because STP output varies across IOS versions and modes (PVST, RPVST+, MST). The parser intentionally uses multiple regex attempts; adjust as needed.
- We extract root_mac, root_pri, and topology_changes (a valuable metric). On some devices the field is "Topology changes", on others "Topology Change Count"; adapt the regex.
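The two field names mentioned above can be covered by one tolerant pattern. A minimal sketch, assuming those are the only wordings you need to handle; the names `TC_RE` and `topology_change_count` are illustrative.

```python
import re

# Tolerates "Topology changes 3", "Topology changes: 3" and
# "Topology change count 3"; extend it if your platform words it differently.
TC_RE = re.compile(r'topology changes?(?:\s+count)?\s*:?\s*(\d+)', re.IGNORECASE)

def topology_change_count(block):
    """Return the topology change counter found in a VLAN block, or None."""
    m = TC_RE.search(block)
    return int(m.group(1)) if m else None

for sample in ("Topology changes 3",
               "Topology changes: 7",
               "Topology change count 12 last change occurred 00:12:34 ago",
               "Topology change flag not set"):
    print(repr(sample), '->', topology_change_count(sample))
```

Returning None instead of 0 when nothing matches lets the caller tell "counter missing from this output" apart from "counter is zero".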
log excerpt
- device.execute('show logging | include SPANTREE|%SPAN|BPDU|Topology change|changed state') tries to fetch only STP-relevant syslog messages, minimizing parsing overhead. Some OS variants require different include terms, so experiment and refine.
change detection
- We compare history (previous snapshot) with current to detect:
  - Root changes per VLAN (critical)
  - Topology change counter increases
  - Port role changes (blocked → forwarding or vice versa)
  - Syslog events (BPDU guard, TC, root guard messages)
persistence
- HISTORY_FILE keeps the last snapshot across script runs. For long-term trends you’d store all events in Elasticsearch or a time-series DB.
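If you do not have Elasticsearch yet, an append-only file is a simple way to keep every event rather than just the last snapshot. A minimal sketch, assuming one JSON object per line (JSON Lines); the function names and the file layout are illustrative and not part of the script.

```python
import json
from pathlib import Path

def append_events(path, events):
    """Append each event as one JSON line so earlier runs are never overwritten."""
    with open(path, 'a', encoding='utf-8') as f:
        for ev in events:
            f.write(json.dumps(ev, sort_keys=True) + '\n')

def load_events(path):
    """Read all stored events back as a list of dicts (empty if no file yet)."""
    p = Path(path)
    if not p.exists():
        return []
    with open(p, encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]
```

Calling `append_events('stp_results/stp_events.jsonl', changes)` at the end of each run would build up a history that is easy to bulk-load into a database later.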
ES push
- Optional: push each change to ES for dashboarding/alerts. You may enrich with device metadata (site, role) before push.
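The enrichment step can be as small as a dictionary merge. A minimal sketch, assuming a hand-maintained inventory; `DEVICE_META`, its site and role values, and the `enrich` helper are hypothetical and would normally come from your CMDB or testbed file.

```python
# Hypothetical inventory; in practice this might come from a CMDB or the testbed file.
DEVICE_META = {
    'CORE1': {'site': 'dc1', 'role': 'core'},
    'SW1': {'site': 'dc1', 'role': 'access'},
}

def enrich(change, inventory=DEVICE_META):
    """Return a copy of `change` with site/role fields added when known.

    Fields already present in the change record are never overwritten.
    """
    meta = inventory.get(change.get('device'), {})
    extra = {k: v for k, v in meta.items() if k not in change}
    return {**change, **extra}

print(enrich({'device': 'SW1', 'vlan': '10', 'change': 'root_changed'}))
```

Returning a copy keeps the original change record untouched, so the JSON report and the Elasticsearch document can differ without side effects.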
testbed.yml Example
testbed:
  name: stp_masterclass
  credentials:
    default:
      username: admin
      password: Cisco123
devices:
  CORE1:
    os: iosxe
    type: router
    connections:
      cli:
        protocol: ssh
        ip: 192.0.2.11
  CORE2:
    os: iosxe
    type: router
    connections:
      cli:
        protocol: ssh
        ip: 192.0.2.12
  SW1:
    os: iosxe
    type: switch
    connections:
      cli:
        protocol: ssh
        ip: 192.0.2.21
  SW2:
    os: iosxe
    type: switch
    connections:
      cli:
        protocol: ssh
        ip: 192.0.2.22
Secrets note: In production store credentials in HashiCorp Vault, ansible-vault, or environment variables and reference them in pyATS.
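One way to keep the password out of the file is to build the testbed as a dictionary at run time and read the credentials from the environment. A minimal sketch, assuming environment variables named `PYATS_USERNAME` and `PYATS_PASSWORD` (hypothetical names) and assuming your pyATS/Genie version accepts a dictionary in `load()`; verify that against your installed release.

```python
import os

def build_testbed(hosts, env=os.environ):
    """Build a testbed dictionary with credentials taken from the environment.

    hosts: mapping of device name -> management IP.
    """
    try:
        user, password = env['PYATS_USERNAME'], env['PYATS_PASSWORD']
    except KeyError as missing:
        raise RuntimeError(f'missing environment variable {missing}') from None
    return {
        'testbed': {
            'name': 'stp_masterclass',
            'credentials': {'default': {'username': user, 'password': password}},
        },
        'devices': {
            name: {
                'os': 'iosxe',
                'type': 'switch',
                'connections': {'cli': {'protocol': 'ssh', 'ip': ip}},
            }
            for name, ip in hosts.items()
        },
    }
```

The resulting dictionary mirrors the YAML structure above, so `load(build_testbed({'SW1': '192.0.2.21'}))` would take the place of `load('testbed.yml')` under that assumption.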
Post-validation CLI (Real expected output)
Below are illustrative fixed-width outputs that you can paste into your blog. Exact layout and wording vary by platform and software release.
A. show spanning-tree output (IOS-XE)
SW1# show spanning-tree
VLAN0001
  Spanning tree enabled protocol ieee
  Root ID    Priority    24577
             Address     001a.2b3c.4d5e
             Cost        4
             Port        20 (GigabitEthernet1/0/20)
             Hello Time  2 sec  Max Age 20 sec  Forward Delay 15 sec
VLAN0010
  Root ID    Priority    24578
             Address     001a.2b3c.4d5f
  Topology changes 3
B. show spanning-tree vlan 10 detail
SW1# show spanning-tree vlan 10 detail
VLAN 10
  Spanning tree enabled protocol ieee
  Root ID    Priority 24578  Address 001a.2b3c.4d5f
  Bridge ID  Priority 32769  Address 0022.3344.5566
  Port Gi1/0/1 ('Gi1/0/1'), Role Root, State Forwarding
  Port Gi1/0/2 ('Gi1/0/2'), Role Alternate, State Blocking
  Topology change count 3 last change occurred 00:12:34 ago
C. show logging | include SPANTREE|Topology change
Feb 12 12:34:12.345: %SPANTREE-2-THROUGH_PROBLEM: VLAN0010 Topology change detected on interface GigabitEthernet1/0/2
Feb 12 12:34:13.111: %SPANTREE-2-ROOT_CHANGED: VLAN0010 Root changed from 001a.2b3c.4d5e to 001a.2b3c.4d5f
Feb 12 12:34:15.000: %SPAN-2-BPDU_GUARD: Blocking interface GigabitEthernet1/0/5 due to BPDU guard
D. Sample stp_changes_report.json excerpt (script output)
{
  "generated_at": "2025-08-28T12:00:00Z",
  "devices": ["SW1", "SW2", "CORE1"],
  "changes": [
    {
      "device": "SW1",
      "vlan": "10",
      "change": "root_changed",
      "prev_root": "001a.2b3c.4d5e",
      "new_root": "001a.2b3c.4d5f",
      "time": "2025-08-28T12:00:00Z"
    },
    {
      "device": "SW1",
      "change": "syslog_event",
      "event_line": "Feb 12 12:34:15.000: %SPAN-2-BPDU_GUARD: Blocking interface GigabitEthernet1/0/5 due to BPDU guard",
      "time": "2025-08-28T12:00:00Z"
    }
  ]
}
FAQs
1. How do you reliably detect a root change across the entire network, not just per-device?
Answer: Snapshot the root (MAC and priority) for every VLAN on every switch. A true network-wide root change shows the same new root MAC across devices for that VLAN. The workflow script compares one device at a time, so add an aggregation step on top of it: if more than 50% of switches report the new root, treat it as a network-level root change. Use syslog ROOT_CHANGED messages to corroborate.
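The majority rule can be sketched in a few lines. This is a minimal sketch, assuming you have already collected the root MAC per device for one VLAN from two snapshots; the function name `network_root_change` and the 0.5 threshold parameter are illustrative.

```python
from collections import Counter

def network_root_change(prev_roots, curr_roots, threshold=0.5):
    """Decide whether a VLAN's root changed network-wide.

    prev_roots / curr_roots map device name -> root MAC for one VLAN.
    Returns the new root MAC if more than `threshold` of the devices
    present in both snapshots agree on the same new root, else None.
    """
    common = [d for d in curr_roots if d in prev_roots]
    if not common:
        return None
    moved = Counter(curr_roots[d] for d in common if curr_roots[d] != prev_roots[d])
    if not moved:
        return None
    new_root, votes = moved.most_common(1)[0]
    return new_root if votes / len(common) > threshold else None

prev = {'SW1': '001a.2b3c.4d5e', 'SW2': '001a.2b3c.4d5e', 'CORE1': '001a.2b3c.4d5e'}
curr = {'SW1': '001a.2b3c.4d5f', 'SW2': '001a.2b3c.4d5f', 'CORE1': '001a.2b3c.4d5e'}
print(network_root_change(prev, curr))
```

Devices that failed collection in either snapshot are left out of the vote, so one unreachable switch does not mask or fake a change.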
2. Which syslog messages indicate topology changes or protection events?
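Answer: The STP-related syslog mnemonics used in this article's examples are:
- %SPANTREE-2-THROUGH_PROBLEM / Topology change detected
- %SPANTREE-2-ROOT_CHANGED
- %SPAN-2-BPDU_GUARD (BPDU Guard)
- %SPAN-2-ROOTGUARD (root guard)
- %LINK-3-UPDOWN (interface changes that often trigger STP TCs)
Exact mnemonics differ by platform and software release. IOS and IOS-XE devices commonly log %SPANTREE-5-ROOTCHANGE, %SPANTREE-5-TOPOTRAP, %SPANTREE-2-BLOCK_BPDUGUARD, and %SPANTREE-2-ROOTGUARD_BLOCK, for example, so confirm the strings against your own device logs, then filter syslog for those mnemonics for efficient detection.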
3. How do you avoid false positives from transient messages?
Answer: Use a noise threshold: require changes to persist across two consecutive runs (e.g., 2× polling interval) before alerting, or aggregate multiple syslog events within a short time window and count unique occurrences. Also check the topology_changes counters; a single transient message may not increment the counter.
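The "two consecutive runs" rule can be expressed as a tiny state machine per tracked value. A minimal sketch, assuming you keep a confirmed baseline and a pending candidate between polls (for example, the root MAC of one VLAN on one device); the name `debounce` and the return layout are illustrative.

```python
def debounce(baseline, candidate, observed):
    """Require a new value on two consecutive polls before alerting.

    baseline:  last confirmed value (e.g. root MAC for one VLAN)
    candidate: differing value seen on the previous poll, or None
    observed:  value seen on this poll
    Returns (baseline, candidate, alert).
    """
    if observed == baseline:
        return baseline, None, False   # back to normal: drop any candidate
    if observed == candidate:
        return observed, None, True    # seen twice in a row: confirm and alert
    return baseline, observed, False   # first sighting: wait one more poll

# A change that persists triggers on the second poll...
state = debounce('001a.2b3c.4d5e', None, '001a.2b3c.4d5f')
print(state)
print(debounce(state[0], state[1], '001a.2b3c.4d5f'))
# ...while a transient value that reverts never alerts.
print(debounce(state[0], state[1], '001a.2b3c.4d5e'))
```

The trade-off is latency: a real change is reported one polling interval later, which is usually acceptable when syslog streaming covers the real-time path.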
4. How frequently should the script run?
Answer: For detection: every 1–5 minutes in production, depending on size. For very large networks consider 5–15 minutes and rely more heavily on syslog streaming into ELK for real-time alerts. Polling too often can increase device load, so keep sessions short and lightweight, and use terminal length 0 so output is returned without paging prompts.
5. Can we detect which port caused the root change?
Answer: Correlate the Root ID change time with show logging lines indicating port state transitions or BPDU guard events on specific interfaces. Where available, show spanning-tree detail often has the port role and last topology change timestamps per port; compare timestamps to isolate culprits.
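Once you have the log lines from around the change time, counting which interfaces they name is a quick way to shortlist suspects. A minimal sketch, assuming Cisco-style interface names in the messages; the pattern `INTF_RE` covers only a few common prefixes and the helper name `suspect_interfaces` is illustrative.

```python
import re
from collections import Counter

# Assumed naming: long or short Cisco prefixes followed by slot/port numbers.
INTF_RE = re.compile(
    r'\b((?:TenGigabitEthernet|GigabitEthernet|FastEthernet|Port-channel|Gi|Te|Fa|Po)'
    r'\d+(?:/\d+)*)\b')

def suspect_interfaces(log_lines):
    """Count how many log lines mention each interface, most frequent first."""
    counts = Counter()
    for line in log_lines:
        counts.update(set(INTF_RE.findall(line)))  # count once per line
    return counts.most_common()

lines = [
    "Feb 12 12:34:12.345: %SPANTREE-2-THROUGH_PROBLEM: VLAN0010 Topology change detected on interface GigabitEthernet1/0/2",
    "Feb 12 12:34:12.900: %LINK-3-UPDOWN: Interface GigabitEthernet1/0/2, changed state to down",
    "Feb 12 12:34:15.000: %SPAN-2-BPDU_GUARD: Blocking interface GigabitEthernet1/0/5 due to BPDU guard",
]
print(suspect_interfaces(lines))
```

An interface that shows up in both link and STP messages within the same window is usually the first place to look.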
6. How to handle different STP flavors (PVST, RPVST+, MST)?
Answer: PVST and RPVST+ run one spanning-tree instance per VLAN, while MST maps groups of VLANs to instances within a region. Your parser must handle per-VLAN outputs (PVST) and the MST region/instance outputs. Design parse functions for each mode and detect the mode via show spanning-tree summary. Genie may have parsers per platform; otherwise use flexible regex.
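Mode detection can be a single regex over the summary output. A minimal sketch, assuming the output contains a line of the form "Switch is in rapid-pvst mode", as IOS and IOS-XE switches typically print; other platforms may word it differently, and the helper name `detect_stp_mode` is illustrative.

```python
import re

# Assumed line format: "Switch is in <mode> mode" (e.g. pvst, rapid-pvst, mst).
MODE_RE = re.compile(r'Switch is in\s+(\S+)\s+mode', re.IGNORECASE)

def detect_stp_mode(summary_output):
    """Return the STP mode string in lower case, or None if no mode line is found."""
    m = MODE_RE.search(summary_output)
    return m.group(1).lower() if m else None

sample = "Switch is in rapid-pvst mode\nRoot bridge for: VLAN0001"
print(detect_stp_mode(sample))
```

The returned string can then select the matching parse function, with None treated as "unknown mode, fall back to the generic regex parser".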
7. How to build meaningful GUI dashboards from the output?
Answer: Push each change as a document into an index (e.g., stp-changes-*) with fields: device, vlan, change, prev_root, new_root, event_line, timestamp, severity. In Kibana create:
- Time-series graph of topology changes per minute
- Table of latest root changes
- Map of devices with most TC events
- Alert rule for root changes or BPDU guard events
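Shaping each change into a dashboard-friendly document might look like the sketch below. It assumes a daily index naming scheme that matches the stp-changes-* pattern and an illustrative severity mapping; the `SEVERITY` levels and the `to_es_action` helper are assumptions, not a standard.

```python
from datetime import datetime, timezone

# Illustrative severity mapping; the levels are an assumption, not a standard.
SEVERITY = {
    'root_changed': 'critical',
    'syslog_event': 'warning',
    'port_role_change': 'warning',
    'topology_change_count': 'info',
}

def to_es_action(change, when=None):
    """Return (index_name, document) for one change record."""
    when = when or datetime.now(timezone.utc)
    doc = dict(change)  # copy so the caller's record is not modified
    doc['timestamp'] = when.strftime('%Y-%m-%dT%H:%M:%SZ')
    doc['severity'] = SEVERITY.get(change.get('change'), 'info')
    return f"stp-changes-{when:%Y.%m.%d}", doc

index, doc = to_es_action({'device': 'SW1', 'vlan': '10', 'change': 'root_changed'})
print(index, doc)
```

Daily indices keep retention simple: old days can be dropped whole, while the stp-changes-* pattern still covers everything in Kibana.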
8. Is it safe to run this in production?
Answer: Yes, as written the script only runs show commands plus terminal length 0, none of which change device configuration. Be careful if you add debug or clear commands; do not use clear spanning-tree commands in production. For heavy environments prefer reading syslog from the collector for historical data instead of repeatedly dumping large buffers from devices.