[Day #49 PyATS Series] Check for Spanning-Tree Topology Changes (logs + CLI) using pyATS for Cisco [Python for Network Engineer]


Introduction — key points

Spanning Tree Protocol (STP) topology changes (root bridge changes, port state transitions, flaps, and BPDU-related protections) are among the most common causes of temporary outages and bursts of flooded traffic in switched networks. Detecting these events quickly and reliably across many switches requires automation and good observability.

In this article you’ll learn how to:

  • Collect STP state and historical changes from Cisco devices (IOS-XE / NX-OS / IOS-XR variants) using pyATS (Genie + device.execute fallbacks).
  • Parse device CLI (show spanning-tree, show spanning-tree detail) and device syslogs (show logging) to detect STP topology changes.
  • Correlate CLI snapshots with syslog events and visualize them in a GUI (Elasticsearch + Kibana / Grafana).
  • Build a repeatable workflow that snapshots current STP state, detects deltas, generates alerts, and stores history for trend analysis.
  • Validate and demonstrate results via CLI examples and GUI queries.

This is a hands-on masterclass article with lots of code, exact commands, structured parsing tips, and real-world validation advice, so you can replicate this in your lab or production staging environment.


Topology Overview

We will use a small but realistic lab topology for demonstrations. This is enough to exercise root election, port states and occasional induced topology changes.

  • VLANs 1, 10, 20 are configured across switches (PVST or MST possible).
  • CORE1 and CORE2 are candidates for root bridge; we will check the root priority and root MAC per VLAN.
  • Syslog server (ELK/Kibana) receives syslogs from all switches.
  • AutomationHost (pyATS) connects to devices over a management network to fetch CLI outputs.

Topology & Communications

Management plane:

  • Management network: 192.0.2.0/24 (example) — AutomationHost and Syslog server are reachable here.
  • SSH used by pyATS to connect to devices.

Data sources (what we pull & parse):

  • CLI:
    • show spanning-tree (per-VLAN summary: root ID, bridge ID, port roles and states)
    • show spanning-tree vlan <vlan> (per-VLAN detail including root ID)
    • show spanning-tree detail (port states, last topology change times)
    • show running-config | include spanning-tree (config check)
  • Logs:
    • show logging, or the filtered form show logging | include SPANTREE|LINK-3-UPDOWN
    • Syslog server (Elasticsearch index syslog-* or cisco-*) used as authoritative historical store

Validation flow:

  1. Snapshot baseline STP state (root per VLAN, root priority, port roles).
  2. After network changes or periodically, collect fresh state and detect deltas (root change, port state change, new topology change count).
  3. Cross-reference the time of detected change with syslog messages (BPDU guard, port flapping logs).
  4. Push findings to a report (JSON/CSV) and optionally to Elasticsearch for visualization/alerts.

Important: STP messages are frequent on busy networks. Filter for only the events you care about (e.g., root changes, BPDU guard, root guard, TC events), and restrict each query to a time window.


Workflow Script

Below is a production-ready pyATS script that:

  • Loads testbed
  • Collects STP CLI output and syslog excerpt
  • Parses STP output for root and port states
  • Detects changes vs previous snapshot
  • Produces structured JSON reports and optional Elasticsearch push

Save as: stp_change_detector.py

#!/usr/bin/env python3
"""
stp_change_detector.py
Collect spanning-tree info and logs from devices via pyATS, detect topology changes,
and generate reports. Fallback to raw execute() parsing when Genie parser is missing.
"""

import json, os, re, time
from datetime import datetime, timezone
from genie.testbed import load
from pathlib import Path

# Config
TESTBED_FILE = 'testbed.yml'
OUT_DIR = Path('stp_results')
OUT_DIR.mkdir(exist_ok=True)
HISTORY_FILE = OUT_DIR / 'stp_history.json'
ES_PUSH = False
ES_URL = "http://localhost:9200/stp-health/_doc/"

# Regex patterns
ROOT_RE = re.compile(r'Root ID\s+Priority\s+(\d+)\s+Address\s+([0-9a-f\.]+)', re.IGNORECASE)
VLAN_ROOT_RE = re.compile(r'^VLAN\s+(?P<vlan>\d+).*Root ID.*Address\s+(?P<mac>[0-9A-Fa-f\.:]+).*Priority\s+(?P<pri>\d+)', re.MULTILINE)
PORT_ROLE_RE = re.compile(r'^(?P<intf>\S+)\s+is\s+(?P<role>\w+).*Port\s+Priority', re.IGNORECASE)

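# Syslog patterns to look for (examples; exact mnemonics vary by platform and release)
SYSLOG_PATTERNS = [
    re.compile(r'%SPANTREE.*root.*changed', re.IGNORECASE),
    re.compile(r'%SPANTREE.*Topology change', re.IGNORECASE),
    # BPDU guard / root guard, e.g. %SPANTREE-2-BLOCK_BPDUGUARD, %SPANTREE-2-ROOTGUARD_BLOCK
    re.compile(r'BPDU[_ ]?GUARD|ROOT[_ ]?GUARD', re.IGNORECASE),
    # Link up/down messages and STP port state transitions
    re.compile(r'Interface\s+\S+,\s+changed state to (up|down|blocking|listening|learning|forwarding|disabled)', re.IGNORECASE)
]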

def load_history():
    if HISTORY_FILE.exists():
        with open(HISTORY_FILE) as f:
            return json.load(f)
    return {}

def save_history(h):
    with open(HISTORY_FILE, 'w') as f:
        json.dump(h, f, indent=2)

def parse_show_spanning_tree(raw):
    """
    Parse 'show spanning-tree', 'show spanning-tree vlan X' and/or
    'show spanning-tree detail' raw output (per-VLAN modes).
    Return dict: {vlan: {'root_mac':.., 'root_pri':.., 'topology_changes': N}, ...}
    """
    result = {}
    # Split into per-VLAN blocks; IOS headers look like 'VLAN0010' (no space)
    vlan_blocks = re.split(r'\n(?=[ \t]*VLAN\s*\d)', raw)
    for block in vlan_blocks:
        m = re.match(r'\s*VLAN\s*0*(\d+)', block)
        if not m:
            continue
        # Merge blocks that describe the same VLAN (summary + detail output)
        entry = result.setdefault(
            m.group(1), {'root_mac': None, 'root_pri': None, 'topology_changes': 0})

        # Summary layout: 'Root ID  Priority N' followed by 'Address xxxx.xxxx.xxxx'
        root_match = re.search(
            r'Root ID\s+Priority\s+(\d+)\s+Address\s+([0-9A-Fa-f.:]+)', block)
        if not root_match:
            # Detail layout: 'Current root has priority N, address xxxx.xxxx.xxxx'
            root_match = re.search(
                r'Current root has priority\s+(\d+),\s+address\s+([0-9A-Fa-f.:]+)', block)
        if root_match:
            entry['root_pri'] = int(root_match.group(1))
            entry['root_mac'] = root_match.group(2)

        # Topology change counter; the wording differs between outputs and platforms
        tc_m = re.search(
            r'(?:Number of topology changes|Topology [Cc]hanges?(?: [Cc]ount)?):?\s+(\d+)', block)
        if tc_m:
            entry['topology_changes'] = int(tc_m.group(1))
    return result

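def parse_port_roles(raw):
    """
    Parse port roles/states from 'show spanning-tree detail' output.
    Returns dict: {interface: role}. In per-VLAN modes a port appears once
    per VLAN, so the last VLAN parsed wins for a given interface.
    """
    roles = {}
    patterns = [
        # IOS/IOS-XE detail: 'Port 1 (GigabitEthernet1/0/1) of VLAN0010 is root forwarding'
        re.compile(r'^\s*Port\s+\d+\s+\((?P<intf>[^)]+)\)\s+of\s+\S+\s+is\s+(?P<role>.+?)\s*$', re.IGNORECASE),
        # Simplified form: 'GigabitEthernet1/0/1 is forwarding port ...'
        re.compile(r'^\s*(?P<intf>\S+)\s+is\s+(?P<role>\w+)\s+port', re.IGNORECASE),
    ]
    for line in raw.splitlines():
        for pat in patterns:
            m = pat.search(line)
            if m:
                roles[m.group('intf')] = m.group('role').lower()
                break
    return roles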

def collect_device(device):
    now = datetime.now(timezone.utc)
    print(f"[{now.isoformat()}] Collecting STP from {device.name}")
    device.connect(log_stdout=False)
    try:
        device.execute('terminal length 0')
        # Per-VLAN summary; if this fails the exception propagates to main()
        raw_stp = device.execute('show spanning-tree')
        # Detail output carries port roles and topology change counters
        try:
            raw_stp_detail = device.execute('show spanning-tree detail')
        except Exception:
            raw_stp_detail = raw_stp

        # STP-related lines from the device log buffer (the syslog server is
        # the better source for history)
        try:
            raw_logs = device.execute('show logging | include SPANTREE|%SPAN|BPDU|Topology change|changed state')
        except Exception:
            # Filter rejected: fetch the whole buffer and keep the last 200 lines
            raw_logs = '\n'.join(device.execute('show logging').splitlines()[-200:])
    finally:
        # Always release the session, even when a command fails
        device.disconnect()

    parsed = {
        'device': device.name,
        'collected_at': now.strftime('%Y-%m-%dT%H:%M:%SZ'),
        'stp_summary': parse_show_spanning_tree(raw_stp + '\n' + raw_stp_detail),
        'port_roles': parse_port_roles(raw_stp_detail),
        'logs_excerpt': raw_logs
    }
    return parsed

def detect_changes(current, history):
    """
    current: dict by device; history previously saved structure
    Return changes list
    """
    changes = []
    for devname, data in current.items():
        hist_dev = history.get(devname, {})
        for vlan, vinfo in data['stp_summary'].items():
            prev = hist_dev.get('stp_summary', {}).get(vlan)
            if not prev:
                continue
            # root changed?
            if prev.get('root_mac') != vinfo.get('root_mac'):
                changes.append({
                    'device': devname,
                    'vlan': vlan,
                    'change': 'root_changed',
                    'prev_root': prev.get('root_mac'),
                    'new_root': vinfo.get('root_mac'),
                    'time': data['collected_at']
                })
            # topology_count increased?
            if vinfo.get('topology_changes', 0) > prev.get('topology_changes', 0):
                changes.append({
                    'device': devname,
                    'vlan': vlan,
                    'change': 'topology_change_count',
                    'prev': prev.get('topology_changes'),
                    'new': vinfo.get('topology_changes'),
                    'time': data['collected_at']
                })
        # port role changes
        for intf, role in data.get('port_roles', {}).items():
            prev_role = hist_dev.get('port_roles', {}).get(intf)
            if prev_role and prev_role != role:
                changes.append({
                    'device': devname,
                    'interface': intf,
                    'change': 'port_role_change',
                    'prev_role': prev_role,
                    'new_role': role,
                    'time': data['collected_at']
                })
        # scan logs for STP events (TC, root change, BPDU guard / root guard);
        # each matching line is reported once, even if several patterns match
        logs = data.get('logs_excerpt', '')
        for line in logs.splitlines():
            if any(pat.search(line) for pat in SYSLOG_PATTERNS):
                changes.append({
                    'device': devname,
                    'change': 'syslog_event',
                    'event_line': line.strip(),
                    'time': data['collected_at']
                })
    return changes

def push_to_es(doc):
    if not ES_PUSH:
        return False
    import requests
    r = requests.post(ES_URL, json=doc, timeout=10)
    r.raise_for_status()
    return True

def main():
    testbed = load(TESTBED_FILE)
    current = {}
    for name, device in testbed.devices.items():
        try:
            current[name] = collect_device(device)
            # persist raw device result
            with open(OUT_DIR / f"{name}_stp.json", 'w') as f:
                json.dump(current[name], f, indent=2)
        except Exception as e:
            print(f"Failed collect for {name}:", e)

    history = load_history()
    changes = detect_changes(current, history)
    # Save overall report
    report = {
        'generated_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
        'devices': list(current.keys()),
        'changes': changes
    }
    with open(OUT_DIR / 'stp_changes_report.json', 'w') as f:
        json.dump(report, f, indent=2)

    # push to ES if wanted
    try:
        if ES_PUSH:
            for ch in changes:
                push_to_es(ch)
    except Exception as e:
        print("ES push failed:", e)

    # update history: store current as last-known snapshot
    for dev, d in current.items():
        history[dev] = {
            'stp_summary': d['stp_summary'],
            'port_roles': d['port_roles'],
            'collected_at': d['collected_at']
        }
    save_history(history)
    print("Done. Report saved to", OUT_DIR)

if __name__ == '__main__':
    main()

How to run:

python3 stp_change_detector.py

Explanation by Line

I’ll walk you through critical parts so you can adapt to real networks.

Regex and parsing choices

  • parse_show_spanning_tree() splits into VLAN blocks and attempts flexible patterns; STP output varies across IOS versions and modes (PVST, RPVST+, MST). The parser intentionally uses multiple regex attempts — adjust as needed.
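  • We extract root_mac, root_pri, and topology_changes (a valuable metric). On IOS/IOS-XE the counter is printed by show spanning-tree detail as "Number of topology changes N last change occurred ..."; other platforms word the field differently (for example Topology changes or Topology Change Count), so adapt the regex.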
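
Where a Genie parser exists for the platform and command, structured output avoids most of the regex work. The sketch below tries device.parse() first and falls back to device.execute() plus a regex parser. The helper name parse_with_fallback and the FakeDevice stand-in are hypothetical and exist only so the example runs without a real device.

```python
def parse_with_fallback(device, command, raw_parser):
    """Try the Genie parser for `command`; fall back to execute() + raw_parser.

    Returns (source, data), where source is 'genie' or 'raw'.
    """
    try:
        # device.parse() raises when no parser exists or the output is empty
        return 'genie', device.parse(command)
    except Exception:
        raw = device.execute(command)
        return 'raw', raw_parser(raw)


class FakeDevice:
    """Stand-in for a pyATS device (illustration only)."""

    def parse(self, command):
        raise RuntimeError('no parser for this platform')

    def execute(self, command):
        return 'VLAN0010\n  Root ID    Priority    24586\n'


source, data = parse_with_fallback(
    FakeDevice(),
    'show spanning-tree',
    lambda raw: {'lines': len(raw.splitlines())},
)
print(source, data)
```

With a real device you would pass parse_show_spanning_tree as raw_parser. The Genie and regex results have different shapes, so normalise them before comparing snapshots.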

Log excerpt

  • device.execute('show logging | include SPANTREE|%SPAN|BPDU|Topology change|changed state') tries to fetch only STP relevant syslog messages, minimizing parsing overhead. Some OS variants require different include terms — experiment and refine.
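
If a platform rejects the include filter, the same selection can be done in Python on the full show logging output. This is a minimal sketch; the function name filter_stp_logs is hypothetical, and matching here is case-insensitive, which is slightly broader than the device-side filter.

```python
import re

# Keywords mirror the include filter used in the script
STP_LOG_RE = re.compile(
    r'SPANTREE|%SPAN|BPDU|Topology change|changed state', re.IGNORECASE)


def filter_stp_logs(raw_logs, max_lines=200):
    """Keep only STP-relevant lines, oldest first, capped at max_lines."""
    hits = [ln for ln in raw_logs.splitlines() if STP_LOG_RE.search(ln)]
    return hits[-max_lines:]


sample = """\
Feb 12 12:34:10.000: %SYS-5-CONFIG_I: Configured from console by admin
Feb 12 12:34:12.345: %SPANTREE-2-THROUGH_PROBLEM: VLAN0010 Topology change detected on interface GigabitEthernet1/0/2
Feb 12 12:34:14.000: %LINK-3-UPDOWN: Interface GigabitEthernet1/0/2, changed state to down
"""

for line in filter_stp_logs(sample):
    print(line)
```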

Change detection

  • We compare history (previous snapshot) with current to detect:
    • Root changes per VLAN (critical)
    • Topology change counter increases
    • Port role changes (blocked → forwarding or vice versa)
    • Syslog events (BPDU guard, TC, root guard messages)
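
The per-VLAN comparison can be illustrated on two hand-written snapshots. The helper diff_vlan is a hypothetical, stripped-down version of the comparison in detect_changes(). The counter_reset label is an addition the script does not have: it flags a counter that went backwards, which usually means a reload or a cleared counter.

```python
def diff_vlan(prev, cur):
    """Return a list of change labels between two per-VLAN snapshots."""
    changes = []
    if prev['root_mac'] != cur['root_mac']:
        changes.append('root_changed')
    if cur['topology_changes'] > prev['topology_changes']:
        changes.append('topology_change_count')
    elif cur['topology_changes'] < prev['topology_changes']:
        # Counter went backwards: device reload or counters cleared
        changes.append('counter_reset')
    return changes


previous = {'root_mac': '001a.2b3c.4d5e', 'root_pri': 24586, 'topology_changes': 2}
current = {'root_mac': '001a.2b3c.4d5f', 'root_pri': 24586, 'topology_changes': 3}

print(diff_vlan(previous, current))  # ['root_changed', 'topology_change_count']
print(diff_vlan(current, current))   # []
```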

Persistence

  • HISTORY_FILE keeps the last snapshot across script runs. For long-term trends you’d store all events in Elasticsearch or a timeseries DB.
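
If a database is not available yet, an append-only JSON Lines file is a simple way to keep every event instead of only the last snapshot. A minimal sketch follows; the file name stp_events.jsonl and both helper names are assumptions, not part of the script.

```python
import json
import tempfile
from pathlib import Path


def append_events(events, path):
    """Append each change event as one JSON line; earlier lines are untouched."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open('a', encoding='utf-8') as f:
        for ev in events:
            f.write(json.dumps(ev, sort_keys=True) + '\n')


def read_events(path):
    """Load all stored events back as a list of dicts."""
    path = Path(path)
    if not path.exists():
        return []
    with path.open(encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


# Two "runs" append to the same file; a temporary directory keeps the demo clean
with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / 'stp_events.jsonl'
    append_events([{'device': 'SW1', 'vlan': '10', 'change': 'root_changed'}], log_path)
    append_events([{'device': 'SW2', 'vlan': '10', 'change': 'root_changed'}], log_path)
    stored = read_events(log_path)

print(len(stored), [e['device'] for e in stored])
```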

ES push

  • Optional: push each change to ES for dashboarding/alerts. You may enrich with device metadata (site, role) before push.
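
Enrichment can be a small lookup applied before each push. In this sketch the DEVICE_META inventory and the site/role field names are hypothetical; in practice the metadata might come from your source of truth or from custom fields in the testbed.

```python
# Hypothetical inventory: device name -> metadata added to every document
DEVICE_META = {
    'CORE1': {'site': 'dc1', 'role': 'core'},
    'SW1': {'site': 'dc1', 'role': 'access'},
}


def enrich(change, meta=DEVICE_META):
    """Return a copy of the change event with site/role fields added."""
    doc = dict(change)
    extra = meta.get(change.get('device'), {'site': 'unknown', 'role': 'unknown'})
    doc.update(extra)
    return doc


event = {'device': 'SW1', 'vlan': '10', 'change': 'root_changed'}
print(enrich(event))
print(enrich({'device': 'SW9', 'change': 'syslog_event'}))
```

The original event is left unmodified, so the same list can still be written to the JSON report as-is.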

testbed.yml Example

testbed:
  name: stp_masterclass
  credentials:
    default:
      username: admin
      password: Cisco123
devices:
  CORE1:
    os: iosxe
    type: switch
    connections:
      cli:
        protocol: ssh
        ip: 192.0.2.11
  CORE2:
    os: iosxe
    type: switch
    connections:
      cli:
        protocol: ssh
        ip: 192.0.2.12
  SW1:
    os: iosxe
    type: switch
    connections:
      cli:
        protocol: ssh
        ip: 192.0.2.21
  SW2:
    os: iosxe
    type: switch
    connections:
      cli:
        protocol: ssh
        ip: 192.0.2.22

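Secrets note: In production, store credentials in HashiCorp Vault, ansible-vault, or environment variables rather than in plain text. pyATS testbed files accept %ENV{VAR_NAME} markup, so the password line can read its value from an environment variable.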


Post-validation CLI (Real expected output)

Below are abbreviated, illustrative outputs you can paste into your blog as fixed-width text. Exact layout and syslog mnemonics vary by platform and software release.

A. show spanning-tree output (IOS-XE)

SW1# show spanning-tree
VLAN0001
  Spanning tree enabled protocol ieee
  Root ID    Priority    24577
             Address     001a.2b3c.4d5e
             Cost        4
             Port        20 (GigabitEthernet1/0/20)
             Hello Time  2 sec  Max Age 20 sec  Forward Delay 15 sec

VLAN0010
  Spanning tree enabled protocol ieee
  Root ID    Priority    24586
             Address     001a.2b3c.4d5f

B. show spanning-tree vlan 10 detail

SW1# show spanning-tree vlan 10 detail

 VLAN0010 is executing the ieee compatible Spanning Tree protocol
  Bridge Identifier has priority 32768, sysid 10, address 0022.3344.5566
  Current root has priority 24586, address 001a.2b3c.4d5f
  Root port is 1 (GigabitEthernet1/0/1), cost of root path is 4
  Number of topology changes 3 last change occurred 00:12:34 ago
          from GigabitEthernet1/0/2

 Port 1 (GigabitEthernet1/0/1) of VLAN0010 is root forwarding
 Port 2 (GigabitEthernet1/0/2) of VLAN0010 is alternate blocking

C. show logging | include SPANTREE|Topology change

Feb 12 12:34:12.345: %SPANTREE-2-THROUGH_PROBLEM: VLAN0010 Topology change detected on interface GigabitEthernet1/0/2
Feb 12 12:34:13.111: %SPANTREE-2-ROOT_CHANGED: VLAN0010 Root changed from 001a.2b3c.4d5e to 001a.2b3c.4d5f
Feb 12 12:34:15.000: %SPAN-2-BPDU_GUARD: Blocking interface GigabitEthernet1/0/5 due to BPDU guard

D. Sample stp_changes_report.json excerpt (script output)

{
  "generated_at": "2025-08-28T12:00:00Z",
  "devices": ["SW1", "SW2", "CORE1"],
  "changes": [
    {
      "device": "SW1",
      "vlan": "10",
      "change": "root_changed",
      "prev_root": "001a.2b3c.4d5e",
      "new_root": "001a.2b3c.4d5f",
      "time": "2025-08-28T12:00:00Z"
    },
    {
      "device": "SW1",
      "change": "syslog_event",
      "event_line": "Feb 12 12:34:15.000: %SPAN-2-BPDU_GUARD: Blocking interface GigabitEthernet1/0/5 due to BPDU guard",
      "time": "2025-08-28T12:00:00Z"
    }
  ]
}

FAQs

1. How do you reliably detect a root change across the entire network, not just per-device?

Answer: Snapshot the root (MAC and priority) for every VLAN on every switch. A true network-wide root change shows the same new root MAC across devices for that VLAN. The script above compares snapshots per device only, so add an aggregation step: if more than 50% of switches report the same new root for a VLAN, treat it as a network-level root change. Use syslog root-change messages to corroborate.
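
The majority rule can be sketched as a tally over the root_changed events the script already emits. The function name network_root_changes and the 50% default are assumptions for illustration.

```python
from collections import Counter, defaultdict


def network_root_changes(changes, total_switches, threshold=0.5):
    """Return {vlan: {...}} for VLANs where more than `threshold` of all
    switches report the same new root."""
    votes = defaultdict(Counter)  # vlan -> Counter(new_root -> device count)
    seen = set()                  # count each (device, vlan) only once
    for ch in changes:
        if ch.get('change') != 'root_changed':
            continue
        key = (ch['device'], ch['vlan'])
        if key in seen:
            continue
        seen.add(key)
        votes[ch['vlan']][ch['new_root']] += 1

    result = {}
    for vlan, counter in votes.items():
        new_root, count = counter.most_common(1)[0]
        if count / total_switches > threshold:
            result[vlan] = {'new_root': new_root, 'devices_reporting': count}
    return result


events = [
    {'device': 'SW1', 'vlan': '10', 'change': 'root_changed', 'new_root': '001a.2b3c.4d5f'},
    {'device': 'SW2', 'vlan': '10', 'change': 'root_changed', 'new_root': '001a.2b3c.4d5f'},
    {'device': 'CORE2', 'vlan': '10', 'change': 'root_changed', 'new_root': '001a.2b3c.4d5f'},
    {'device': 'SW1', 'vlan': '20', 'change': 'root_changed', 'new_root': '001a.2b3c.4d5f'},
]
print(network_root_changes(events, total_switches=4))
```

Here three of four switches agree on VLAN 10, so it is reported; VLAN 20 has a single report and stays below the threshold.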

2. Which syslog messages indicate topology changes or protection events?

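Answer: Mnemonics differ between platforms and releases, so confirm them against your own devices' logs. The names used in this article's examples are:

  • %SPANTREE-2-THROUGH_PROBLEM / Topology change detected
  • %SPANTREE-2-ROOT_CHANGED
  • %SPAN-2-BPDU_GUARD (BPDU Guard)
  • %SPAN-2-ROOTGUARD (root guard)
  • %LINK-3-UPDOWN (interface changes that often trigger STP TCs)

On Catalyst IOS/IOS-XE you will more commonly see %SPANTREE-2-BLOCK_BPDUGUARD (BPDU guard), %SPANTREE-2-ROOTGUARD_BLOCK (root guard), %SPANTREE-5-TOPOTRAP (topology change trap) and %SPANTREE-5-ROOTCHANGE (root change). Filter syslog for the mnemonics your platform actually emits.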

3. How do you avoid false positives from transient messages?

Answer: Use a noise threshold: require changes to persist across two consecutive runs (e.g., 2× polling interval) before alerting, or aggregate multiple syslog events within a short time window and count unique occurrences. Also check topology_changes counters — a single transient message may not increment the counter.
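
One way to express "persist across two consecutive runs" is a small per-VLAN state machine that confirms a new root only after it has been observed in consecutive polls. This is a sketch under that assumption; the state layout and the name debounce_root are hypothetical.

```python
def debounce_root(state, observed_root, required=2):
    """Update `state` in place and return True once a new root has been
    observed in `required` consecutive polls.

    state: {'stable': mac, 'candidate': mac or None, 'count': int}
    """
    if observed_root == state['stable']:
        # Back to the known root: discard any pending candidate
        state['candidate'], state['count'] = None, 0
        return False
    if observed_root == state['candidate']:
        state['count'] += 1
    else:
        state['candidate'], state['count'] = observed_root, 1
    if state['count'] >= required:
        state['stable'] = observed_root
        state['candidate'], state['count'] = None, 0
        return True
    return False


state = {'stable': '001a.2b3c.4d5e', 'candidate': None, 'count': 0}
polls = ['001a.2b3c.4d5f', '001a.2b3c.4d5e', '001a.2b3c.4d5f', '001a.2b3c.4d5f']
alerts = [debounce_root(state, root) for root in polls]
print(alerts)  # [False, False, False, True]
```

The first appearance of the new root is treated as transient because the old root returns on the next poll; the alert fires only after two back-to-back observations.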

4. How frequently should the script run?

Answer: For detection, every 1–5 minutes in production, depending on network size. For very large networks consider 5–15 minutes and rely more heavily on syslog streaming into ELK for real-time alerts. Polling too often increases device CPU load, so keep the command set small and the sessions short.

5. Can we detect which port caused the root change?

Answer: Correlate the time of the Root ID change with show logging lines that report port state transitions or BPDU guard events on specific interfaces. On IOS/IOS-XE, show spanning-tree detail also reports, per VLAN, how long ago the last topology change occurred and the interface it came from; compare those with the syslog timestamps to isolate the culprit.
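
The correlation step can be sketched as a time-window filter over syslog lines. Several assumptions apply: timestamps use the "Mon DD HH:MM:SS.mmm" layout shown in this article, they carry no year (so one is supplied), month names are English, and the device clock and the change time share a timezone. The names events_near, TS_RE and INTF_RE are hypothetical.

```python
import re
from datetime import datetime, timedelta

# Matches stamps like 'Feb 12 12:34:12.345' (optionally prefixed with '*')
TS_RE = re.compile(r'^\*?(?P<ts>[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\.\d{1,6})')
INTF_RE = re.compile(r'\b(?:interface|port)\s+(?P<intf>[A-Za-z][\w/.-]*\d)', re.IGNORECASE)


def events_near(log_lines, change_time, window_s=5, year=2025):
    """Return log events within +/- window_s seconds of change_time."""
    window = timedelta(seconds=window_s)
    hits = []
    for line in log_lines:
        m = TS_RE.match(line)
        if not m:
            continue
        ts = datetime.strptime(f"{year} {m.group('ts')}", '%Y %b %d %H:%M:%S.%f')
        if abs(ts - change_time) <= window:
            intf = INTF_RE.search(line)
            hits.append({'time': ts,
                         'interface': intf.group('intf') if intf else None,
                         'line': line})
    return sorted(hits, key=lambda h: h['time'])


logs = [
    "Feb 12 12:34:12.345: %SPANTREE-2-THROUGH_PROBLEM: VLAN0010 Topology change detected on interface GigabitEthernet1/0/2",
    "Feb 12 12:34:13.111: %SPANTREE-2-ROOT_CHANGED: VLAN0010 Root changed from 001a.2b3c.4d5e to 001a.2b3c.4d5f",
    "Feb 12 12:34:15.000: %SPAN-2-BPDU_GUARD: Blocking interface GigabitEthernet1/0/5 due to BPDU guard",
]
root_change_at = datetime(2025, 2, 12, 12, 34, 13, 111000)
nearby = events_near(logs, root_change_at, window_s=1)
suspects = [h['interface'] for h in nearby if h['interface']]
print(suspects)  # ['GigabitEthernet1/0/2']
```

With a one-second window only the topology change on GigabitEthernet1/0/2 falls close enough to the root change; the BPDU guard event almost two seconds later is excluded.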

6. How to handle different STP flavors (PVST, RPVST+, MST)?

Answer: PVST+ and Rapid PVST+ run one spanning-tree instance per VLAN, while MST maps groups of VLANs to instances within a region. Your parser must handle per-VLAN outputs (PVST) and the MST instance outputs. Design parse functions for each mode and detect the mode via show spanning-tree summary. Genie may have parsers per platform; otherwise use flexible regex.
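
Mode detection can be a single regex on the first line of show spanning-tree summary, which on IOS/IOS-XE reads "Switch is in <mode> mode". The sketch below then picks a block-header pattern per mode; the helper names and the HEADER_BY_MODE table are hypothetical, and the MST header form (MST0, MST1, ...) should be checked against your platform.

```python
import re

MODE_RE = re.compile(r'Switch is in\s+(?P<mode>[\w-]+)\s+mode', re.IGNORECASE)

# Block headers per mode: per-VLAN modes print 'VLAN0010', MST prints 'MST1'
HEADER_BY_MODE = {
    'pvst': r'VLAN0*(\d+)',
    'rapid-pvst': r'VLAN0*(\d+)',
    'mst': r'MST0*(\d+)',
}


def detect_stp_mode(summary_output):
    """Return the STP mode string in lowercase, or None if not found."""
    m = MODE_RE.search(summary_output)
    return m.group('mode').lower() if m else None


def block_ids(mode, stp_output):
    """Return the VLAN or MST instance IDs found as block headers."""
    pattern = HEADER_BY_MODE.get(mode)
    if pattern is None:
        raise ValueError(f'unsupported STP mode: {mode!r}')
    return re.findall(r'^[ \t]*' + pattern, stp_output, re.MULTILINE)


summary = "Switch is in rapid-pvst mode\nRoot bridge for: VLAN0001\n"
output = (
    "VLAN0001\n  Spanning tree enabled protocol rstp\n\n"
    "VLAN0010\n  Spanning tree enabled protocol rstp\n"
)
mode = detect_stp_mode(summary)
print(mode, block_ids(mode, output))  # rapid-pvst ['1', '10']
```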

7. How to build meaningful GUI dashboards from the output?

Answer: Push each change as a document into an index (e.g., stp-changes-*) with fields: device, vlan, change, prev_root, new_root, event_line, timestamp, severity. In Kibana create:

  • Time-series graph of topology changes per minute
  • Table of latest root changes
  • Map of devices with most TC events
  • Alert rule for root changes or BPDU guard events
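
For more than a handful of events, the Elasticsearch bulk API takes newline-delimited JSON: one action line followed by one document line per event. The sketch below only builds that payload; the SEVERITY mapping, the daily index naming and the function name are assumptions, and sending it (a POST to the _bulk endpoint with Content-Type application/x-ndjson) is left out.

```python
import json

# Hypothetical severity mapping per change type
SEVERITY = {
    'root_changed': 'critical',
    'syslog_event': 'warning',
    'port_role_change': 'warning',
    'topology_change_count': 'info',
}


def to_bulk_payload(changes, index_prefix='stp-changes'):
    """Build an NDJSON body for the Elasticsearch _bulk endpoint."""
    lines = []
    for ch in changes:
        doc = dict(ch)
        doc['timestamp'] = ch['time']
        doc['severity'] = SEVERITY.get(ch['change'], 'info')
        # Daily index, e.g. stp-changes-2025.08.28, derived from the event time
        index = f"{index_prefix}-{ch['time'][:10].replace('-', '.')}"
        lines.append(json.dumps({'index': {'_index': index}}))
        lines.append(json.dumps(doc))
    # The bulk body must end with a newline
    return '\n'.join(lines) + '\n' if lines else ''


changes = [{
    'device': 'SW1', 'vlan': '10', 'change': 'root_changed',
    'prev_root': '001a.2b3c.4d5e', 'new_root': '001a.2b3c.4d5f',
    'time': '2025-08-28T12:00:00Z',
}]
payload = to_bulk_payload(changes)
print(payload)
```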

8. Is it safe to run this in production?

Answer: Yes, the script uses read-only commands. Be careful if you add debug or clear commands; do not use clear spanning-tree in production. For heavy environments prefer reading syslog from the collector for historical data instead of repeatedly dumping large buffers from devices.
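
To keep the script read-only as it grows, every command can pass through a small guard before it reaches the device. This is a sketch: the allow-list (show and terminal length only), the blocked output modifiers that write to files, and the name safe_execute are assumptions to adapt to your own policy.

```python
import re

# Only 'show ...' and 'terminal length ...' are allowed
ALLOWED = re.compile(r'^\s*(show\s|terminal length\s)', re.IGNORECASE)
# Output modifiers that write to a file are rejected
PIPE_BLOCK = re.compile(r'\|\s*(redirect|tee|append)\b', re.IGNORECASE)


def safe_execute(device, command):
    """Run `command` only if it looks read-only; otherwise raise ValueError."""
    if not ALLOWED.match(command) or PIPE_BLOCK.search(command):
        raise ValueError(f'Refusing non read-only command: {command!r}')
    return device.execute(command)


class FakeDevice:
    """Stand-in for a pyATS device (illustration only)."""

    def execute(self, command):
        return f'output of {command}'


dev = FakeDevice()
print(safe_execute(dev, 'show spanning-tree detail'))
try:
    safe_execute(dev, 'clear spanning-tree detected-protocols')
except ValueError as err:
    print(err)
```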


YouTube Link

Watch the Complete Python for Network Engineer: Check for spanning-tree topology changes (logs + CLI) Using pyATS for Cisco [Python for Network Engineer] Lab Demo & Explanation on our channel:

Master Python Network Automation, Ansible, REST API & Cisco DevNet

Join Our Training

If you want instructor-led, hands-on training to build production-grade automation like this — with deep coverage of pyATS, Genie parsers, ELK/Grafana integrations, Ansible playbooks and operational best practices — Trainer Sagar Dhawan runs a 3-month instructor-led program that teaches Python, Ansible, APIs, and Cisco DevNet for Network Engineers.

This course will turn you from CLI user to automation lead — mastering workflows like STP topology change detection end-to-end: collection, parsing, alerting, and remediation.

Enroll / learn more:
https://course.networkjourney.com/python-ansible-api-cisco-devnet-for-network-engineers/

Join the program to hone your Python for Network Engineer skills and deliver reliable, automated network observability.

Enroll Now & Future‑Proof Your Career
Email: info@networkjourney.com
WhatsApp / Call: +91 97395 21088