[Day #69 PyATS Series] Automate NTP stratum consistency check using pyATS for Cisco


Introduction — key points

Time is everything on a network. Correct NTP (Network Time Protocol) operation is foundational: wrong time breaks logs, certificates, security tokens, RADIUS, syslog correlation, HA elections and more. The goal of this article is to automate an NTP stratum consistency check across your Cisco devices so you can answer these questions automatically:

  • Are all devices synchronized (Clock is synchronized)?
  • What stratum does each device report? Are client stratum values acceptable relative to their configured NTP server(s)?
  • Are there devices using different or unexpected NTP sources?
  • Are offsets and reachability within acceptable thresholds?

Topology Overview

For this article we use a small, realistic lab topology that you can scale to any size. It demonstrates multi-tier devices and a central NTP server (or cluster).

Device roles

  • NTP-PRIMARY (10.0.0.10) — authoritative source (GPS or stratum 1 reference).
  • NTP-SECONDARY (10.0.0.11) — backup NTP server.
  • Core-R1, R1-A, R3-B, R4 — Cisco routers/switches (clients). They should be configured to point to NTP-PRIMARY (and fallback to SECONDARY).

We’ll validate each managed node’s reported stratum, NTP association state, offset, and reachability to the server(s).


Topology & Communications (IP plan, protocols, and GUI touchpoints)

IP addressing (example)

  • NTP-PRIMARY: 10.0.0.10
  • NTP-SECONDARY: 10.0.0.11
  • Core-R1: 10.0.1.1 (management)
  • R1-A: 10.0.1.2
  • R3-B: 10.0.1.3
  • R4: 10.0.1.4
  • pyATS controller (automation workstation): 10.0.1.100 (must reach all management IPs via SSH)

Ports & protocols

  • NTP uses UDP/123. Ensure management network and any firewall ACLs allow UDP/123 from devices to NTP servers (and vice versa if needed for symmetric NTP).
  • NTP servers should respond to queries. If your NTP servers sit behind firewall rules, make sure an entry such as permit udp any host <ntp-ip> eq 123 (or its equivalent) exists on the management/control-plane path.
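
The UDP/123 path can also be probed from the automation workstation without touching a device. The following is a minimal SNTP client sketch using only the standard library; the function names (build_sntp_request, parse_sntp_response, query_ntp) are illustrative and not part of the article's script. It sends a mode 3 (client) request and reads the stratum from the second byte of the reply.

```python
import socket
import struct

NTP_EPOCH_OFFSET = 2208988800  # seconds between 1900-01-01 (NTP) and 1970-01-01 (Unix)


def build_sntp_request():
    """Build a 48-byte client request: LI=0, VN=4, Mode=3 packed into the first byte."""
    return bytes([0x23]) + bytes(47)


def parse_sntp_response(data):
    """Extract leap indicator, version, mode, stratum and transmit time from a reply."""
    if len(data) < 48:
        raise ValueError("short NTP packet")
    first, stratum = data[0], data[1]
    tx_seconds, tx_fraction = struct.unpack("!II", data[40:48])
    return {
        "leap": first >> 6,
        "version": (first >> 3) & 0x7,
        "mode": first & 0x7,          # 4 = server reply
        "stratum": stratum,
        "transmit_unix": tx_seconds - NTP_EPOCH_OFFSET + tx_fraction / 2**32,
    }


def query_ntp(server, timeout=2.0):
    """Send one request to server:123 and return the parsed reply (raises on timeout)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(build_sntp_request(), (server, 123))
        data, _ = sock.recvfrom(512)
    return parse_sntp_response(data)
```

Calling query_ntp("10.0.0.10") from the controller returns a dictionary whose stratum field should be 1 for the lab's primary server; a socket timeout points to a routing or ACL problem on the path.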

GUI / NMS touchpoints

  • If you use a management console (Cisco Prime / DNAC / SolarWinds / LibreNMS), you can visually confirm NTP status on monitored devices. Typical GUI verification: Inventory → Device → Details → Time / NTP → show last sync time, server IP, and drift/offset. We’ll include specific GUI steps later in the Validation Flow.

Validation checkpoints (what the automation verifies)

  1. Device reports synchronized state (Clock is synchronized) in show ntp status.
  2. Device has an NTP association with an expected server (look for */selected association in show ntp associations).
  3. Stratum reported on the device is within acceptable bounds compared to the selected server (default: max_stratum_diff = 1).
  4. Offset (measured difference between local clock and server) is within tolerance (default: max_offset_seconds = 0.5 seconds — tune for your environment).
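  5. Reachability: the selected server is reachable from the device (pingable, and with a non-zero reach value in show ntp associations; 377 means the last eight polls were all answered).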
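
The reach column deserves a short illustration because it is printed in octal. The sketch below decodes it into an eight-poll history; decode_reach is a hypothetical helper name, not something the article's script defines.

```python
def decode_reach(reach_text):
    """Decode the octal 'reach' column into an 8-poll success history."""
    value = int(str(reach_text), 8) & 0xFF   # the column is printed in octal
    history = format(value, "08b")           # leftmost bit = oldest poll, rightmost = newest
    return {
        "history": history,
        "successes": history.count("1"),
        "last_poll_ok": history.endswith("1"),
    }
```

For example, decode_reach("377") reports eight successes, while decode_reach("177") reports seven because the oldest of the last eight polls was missed.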

Workflow Script — Full pyATS script

Save this as ntp_stratum_check.py. It uses pyATS (and Genie parsers when available). It performs the checks above and writes a JSON report.

Important: adapt device names to your testbed, and set EXPECTED_NTP_SERVERS if you want to enforce a specific server list.

#!/usr/bin/env python3
"""
ntp_stratum_check.py
pyATS script to validate NTP stratum consistency across Cisco devices.

Usage:
  python ntp_stratum_check.py testbed.yml
"""

import sys
import re
import json
import time
from collections import defaultdict
from pyats.topology import loader

# === CONFIGURATION ===
TESTBED_FILE = sys.argv[1] if len(sys.argv) > 1 else "testbed.yml"
EXPECTED_NTP_SERVERS = ["10.0.0.10", "10.0.0.11"]   # optional: expected list of servers
MAX_STRATUM_DIFF = 1    # maximum allowed difference between client and its server
MAX_OFFSET_SECONDS = 0.5  # maximum allowed offset (seconds) between client and server
REPORT_FILE = "ntp_stratum_report.json"

# === UTIL HELPERS ===
def extract_stratum_from_status(raw):
    """
    Parse 'show ntp status' output to get stratum and sync status.
    """
    # typical line: "Clock is synchronized, stratum 2, reference is 10.0.0.10"
    m = re.search(r"Clock is (?:synchronized|synchronised),\s*stratum\s*(\d+)", raw, re.I)
    if m:
        return int(m.group(1))
    # some IOS variants: "System clock is synchronized, stratum 2"
    m2 = re.search(r"stratum\s*(\d+)", raw, re.I)
    if m2:
        return int(m2.group(1))
    return None

def is_synchronized(raw):
    return bool(re.search(r"Clock is (?:synchronized|synchronised)", raw, re.I))

def parse_associations(raw):
    """
    Parse 'show ntp associations' (Cisco IOS/IOS XE text output).
    Returns a list of dicts, for example:
      {"addr": "10.0.0.10", "selected": True, "flag": "*~", "st": 1,
       "offset": -0.000003, "reach": 377, "raw": "..."}
    'offset' is converted to seconds (IOS prints delay/offset/disp in milliseconds).
    'reach' is kept as printed (an octal register; 377 = last 8 polls answered).
    For robust parsing prefer the Genie parser; this regex is a fallback.
    """
    # Columns: flags+address, ref clock, st, when, poll, reach, delay, offset, disp
    # Example: *~10.0.0.10   127.127.1.1  1  46  64  377  0.020 -0.003  0.483
    row_re = re.compile(
        r"^(?P<flag>[*#+\-x~\s]*)(?P<addr>\d+\.\d+\.\d+\.\d+)\s+"
        r"(?P<ref>\S+)\s+(?P<st>\d+)\s+(?P<when>\S+)\s+(?P<poll>\d+)\s+"
        r"(?P<reach>\d+)\s+(?P<delay>-?\d+(?:\.\d+)?)\s+"
        r"(?P<offset>-?\d+(?:\.\d+)?)\s+(?P<disp>-?\d+(?:\.\d+)?)\s*$"
    )
    entries = []
    for ln in raw.splitlines():
        ln = ln.strip()
        # skip blank lines and the column header
        if not ln or ln.lower().startswith(("address", "assoc")):
            continue
        m = row_re.match(ln)
        if m:
            # flags can be combined, e.g. '*~' = system peer + configured
            flag = m.group("flag").replace(" ", "")
            entries.append({
                "addr": m.group("addr"),
                "selected": "*" in flag,
                "flag": flag,
                "raw": ln,
                "st": int(m.group("st")),
                "offset": float(m.group("offset")) / 1000.0,  # ms -> s
                "reach": int(m.group("reach")),
            })
            continue
        # fallback: the line carries an address but not the expected columns
        m2 = re.search(r"([*#+\-x~]*)(\d+\.\d+\.\d+\.\d+)", ln)
        if m2:
            entries.append({"addr": m2.group(2), "selected": "*" in m2.group(1),
                            "flag": m2.group(1), "raw": ln})
    return entries

# === CONNECT & GATHER ===
def run():
    testbed = loader.load(TESTBED_FILE)
    devices = testbed.devices

    # Connect to devices
    for name, dev in devices.items():
        try:
            print(f"[CONNECT] {name} ...")
            dev.connect(log_stdout=False, learn_hostname=False)
        except Exception as e:
            print(f"[WARN] Could not connect to {name}: {e}")

    report = {"timestamp": time.time(), "devices": {}}

    # Gather NTP info
    for name, dev in devices.items():
        print(f"[GATHER] {name}")
        dev_entry = {"connected": dev.is_connected(), "ntp_status_raw": "", "ntp_assoc_raw": ""}
        try:
            # Always collect the raw CLI text: the regex helpers below expect
            # device output, not a JSON dump of a parsed structure.
            raw_status = dev.execute("show ntp status")
            raw_assoc = dev.execute("show ntp associations")
            dev_entry["ntp_status_raw"] = raw_status
            dev_entry["ntp_assoc_raw"] = raw_assoc

            # Keep Genie's structured output in the report when a parser exists.
            # Passing output= reuses the text above instead of re-running the command.
            for key, cmd, text in (("ntp_status_parsed", "show ntp status", raw_status),
                                   ("ntp_assoc_parsed", "show ntp associations", raw_assoc)):
                try:
                    dev_entry[key] = dev.parse(cmd, output=text)
                except Exception:
                    pass  # no parser for this platform, or schema mismatch

            # Extract stratum + sync
            client_stratum = None
            synced = False
            if raw_status:
                client_stratum = extract_stratum_from_status(raw_status)
                synced = is_synchronized(raw_status)

            # parse associations
            assoc_list = parse_associations(raw_assoc) if raw_assoc else []
            selected = next((a for a in assoc_list if a.get("selected")), None)
            # If no selected peer/assoc, we may still look for first reachable assoc
            if not selected and assoc_list:
                selected = assoc_list[0]

            dev_entry.update({
                "stratum": client_stratum,
                "synchronized": synced,
                "associations": assoc_list,
                "selected_assoc": selected
            })

            # If the selected server is itself a device in the testbed, query its stratum
            server_stratum = None
            if selected:
                server_addr = selected.get("addr")
                for sname, sdev in devices.items():
                    # Heuristic: match the server address against each connection IP.
                    # pyATS may store the IP as an ipaddress object, so compare as text.
                    conns = getattr(sdev, "connections", {})
                    ips = {str(c.get("ip")) for c in conns.values() if isinstance(c, dict)}
                    if server_addr not in ips:
                        continue
                    try:
                        out_s = sdev.execute("show ntp status")
                        server_stratum = extract_stratum_from_status(out_s)
                    except Exception:
                        pass  # server not connected, or command unsupported (e.g. Linux host)
                    break
            dev_entry["server_stratum"] = server_stratum

            # Evaluate checks
            checks = []
            if not dev_entry["connected"]:
                checks.append(("connected", False, "Device not reachable via pyATS/SSH"))
            else:
                checks.append(("connected", True, "SSH connection OK"))
            if not synced:
                checks.append(("synchronized", False, "Device clock not synchronized"))
            else:
                checks.append(("synchronized", True, "Device clock synchronized"))

            # stratum check
            if client_stratum is None:
                checks.append(("stratum", False, "Could not determine client stratum"))
            elif server_stratum is not None:
                diff = abs(client_stratum - server_stratum)
                ok = diff <= MAX_STRATUM_DIFF
                checks.append(("stratum_vs_server", ok, f"client {client_stratum} server {server_stratum} diff {diff}"))
            else:
                # when server stratum unknown, ensure client stratum reasonable
                ok = client_stratum <= 6  # arbitrary default
                checks.append(("stratum_value", ok, f"client {client_stratum} within acceptable absolute bounds"))

            # offset check if available
            offset_ok = True
            if selected and selected.get("offset") is not None:
                try:
                    off = abs(float(selected["offset"]))
                    offset_ok = off <= MAX_OFFSET_SECONDS
                    checks.append(("offset", offset_ok, f"{off}s (<= {MAX_OFFSET_SECONDS}s)"))
                except Exception:
                    checks.append(("offset", False, "Could not parse offset"))
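            else:
                checks.append(("offset", None, "No offset available in parsed assoc output"))

            # expected-server check: enforced only when EXPECTED_NTP_SERVERS is non-empty
            if EXPECTED_NTP_SERVERS and selected:
                addr = selected.get("addr")
                checks.append(("expected_server", addr in EXPECTED_NTP_SERVERS, f"selected source {addr}"))

            dev_entry["checks"] = checks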

        except Exception as e:
            dev_entry["error"] = str(e)

        report["devices"][name] = dev_entry

    # Summarize results
    overall_ok = True
    summary = {}
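    for name, info in report["devices"].items():
        # a device with a collection error or no checks at all must not count as healthy
        device_ok = "error" not in info and bool(info.get("checks"))
        for chk in info.get("checks", []):
            if chk[1] is False:
                device_ok = False
        if not device_ok:
            overall_ok = False
        summary[name] = {"ok": device_ok, "checks": info.get("checks", [])}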

    report["summary"] = summary
    report["overall_ok"] = overall_ok

    # write report
    with open(REPORT_FILE, "w") as f:
        json.dump(report, f, indent=2)

    print(f"[REPORT] Written: {REPORT_FILE}")
    print("[RESULT] Overall OK:", overall_ok)
    return 0 if overall_ok else 2

if __name__ == "__main__":
    rc = run()
    sys.exit(rc)

What this script does (short):

  • Loads the testbed.yml.
  • Connects to every device.
  • Gathers show ntp status and show ntp associations (using Genie parsers when available).
  • Extracts client stratum, sync state, selected association, offset and reach.
  • If the selected server is in the same testbed, queries the server to get server stratum.
  • Evaluates checks using MAX_STRATUM_DIFF and MAX_OFFSET_SECONDS.
  • Writes a JSON report (ntp_stratum_report.json) for later ingestion (Grafana, ELK, etc.) and returns exit code 0 on success, non-zero on failure.

Explanation by Line

Let’s dissect the important blocks and explain the why and how so you can extend it to production.

Config block

EXPECTED_NTP_SERVERS = ["10.0.0.10", "10.0.0.11"]
MAX_STRATUM_DIFF = 1
MAX_OFFSET_SECONDS = 0.5

These are your policy parameters. MAX_STRATUM_DIFF=1 means the client may be at most one stratum away from its server, which matches normal server chaining (stratum 1 → 2 → 3). MAX_OFFSET_SECONDS=0.5 is a conservative starting point for infrastructure devices; tune it to your environment (some deployments can accept 1–2 s or more for low-precision devices, while well-connected LAN clients usually stay far below it).

Parsers with Genie fallback

We attempt dev.parse("show ntp status") and dev.parse("show ntp associations"). Genie returns structured dictionaries, which are ideal for reliable assertions. However, parser availability varies by platform and version, so the script does not depend on them: it can always fall back to dev.execute(...) and regex heuristics (extract_stratum_from_status, parse_associations), which avoids brittle test failures when a parser is missing.

Why both? Parsed schemas are the most robust option in the long term, while the raw-text fallback keeps the test usable in labs where no parser matches.
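
A helper along the following lines keeps both outputs side by side. It is a sketch, not the article's code: collect is a hypothetical name, FakeDevice is a stand-in used only to exercise it without a lab, and the output= keyword on parse is assumed to behave as in Genie (parse the supplied text rather than re-running the command).

```python
def collect(device, command):
    """Return raw CLI text plus the parsed structure when a parser is available."""
    result = {"command": command, "raw": device.execute(command), "parsed": None}
    try:
        # Reuse the text already collected instead of sending the command again.
        result["parsed"] = device.parse(command, output=result["raw"])
    except Exception as exc:  # missing parser, empty output, schema mismatch
        result["parse_error"] = f"{type(exc).__name__}: {exc}"
    return result


class FakeDevice:
    """Stand-in for a connected device; only execute() and parse() are modelled."""

    def __init__(self, outputs, parsers=None):
        self.outputs = outputs          # command -> canned CLI text
        self.parsers = parsers or {}    # command -> callable(text) -> dict

    def execute(self, command):
        return self.outputs[command]

    def parse(self, command, output=None):
        if command not in self.parsers:
            raise LookupError("no parser for " + command)
        return self.parsers[command](output)
```

With a real pyATS device the same collect(dev, "show ntp status") call would apply; the regex helpers then work on result["raw"], and result["parsed"] is used only when it is not None.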

Association parsing

We look for the * flag in show ntp associations to determine which server the device has selected as its system peer. On Cisco devices the flag is usually followed by ~ (configured), as in *~10.0.0.10, so a parser must accept more than one flag character. Offset and reach indicate data quality (measured offset and recent poll success).
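
As an illustration of handling combined flags, the sketch below splits the flag characters from the address column. The meanings follow the legend printed by recent IOS releases (older releases word the legend differently), and split_flags is a hypothetical helper name.

```python
# Legend as printed under 'show ntp associations' on recent IOS releases.
FLAG_MEANINGS = {
    "*": "sys.peer",
    "#": "selected",
    "+": "candidate",
    "-": "outlyer",
    "x": "falseticker",
    "~": "configured",
}


def split_flags(entry):
    """Split leading flag characters from the address column, e.g. '*~10.0.0.10'."""
    i = 0
    while i < len(entry) and entry[i] in FLAG_MEANINGS:
        i += 1
    flags, addr = entry[:i], entry[i:]
    return {
        "addr": addr,
        "flags": [FLAG_MEANINGS[c] for c in flags],
        "is_system_peer": "*" in flags,
    }
```

A check such as flag == '*' would miss *~10.0.0.10, which is why the membership test "*" in flags is used here.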

Server stratum cross-check

If the selected server is part of the testbed, the script queries the server to get the server’s own stratum. That allows a direct stratum comparison (client_stratum − server_stratum), which is the most meaningful check. If server stratum is unknown, the script falls back to absolute stratum bounds.
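
The comparison can be isolated in a small pure function, which makes it easy to unit test. The sketch below is a stricter, signed variant of what the script does (the script uses an absolute difference): it assumes a client synchronized to a server reports a stratum higher than that server, and it treats 16 as unsynchronized. The name check_stratum and the max_absolute parameter are illustrative.

```python
def check_stratum(client, server=None, max_diff=1, max_absolute=6):
    """Return a (name, ok, message) tuple in the same shape the script's checks use."""
    if client is None:
        return ("stratum", False, "could not determine client stratum")
    if client >= 16:
        return ("stratum", False, f"client stratum {client} means unsynchronized")
    if server is None:
        # no server value to compare against: fall back to an absolute bound
        return ("stratum_value", client <= max_absolute,
                f"client {client}, absolute limit {max_absolute}")
    diff = client - server  # signed: a client reports a higher number than its server
    ok = 1 <= diff <= max_diff
    return ("stratum_vs_server", ok, f"client {client} server {server} diff {diff}")
```

With the defaults, check_stratum(2, 1) passes, while check_stratum(1, 2) fails because a client cannot legitimately sit above the server it is synchronized to.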

Checks list & reporting

The checks list stores tuples: (check_name, boolean_or_none, message) — this makes it easy to construct human readable and machine parsable reports (we later write JSON).
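
To show how the three-state result can be consumed, here is a small sketch that turns a list of check tuples into a per-device verdict. device_verdict is a hypothetical helper; it treats None as "skipped" rather than as a failure, and an empty list as missing data rather than a pass.

```python
def device_verdict(checks):
    """Summarize (name, ok, message) entries; ok may be True, False or None."""
    failed = [name for name, ok, _ in checks if ok is False]
    skipped = [name for name, ok, _ in checks if ok is None]
    if failed:
        status = "FAIL"
    elif not checks:
        status = "NO_DATA"   # nothing was collected, so nothing was proven
    else:
        status = "PASS"
    return {"status": status, "failed": failed, "skipped": skipped}
```

Because JSON has no tuple type, the entries come back as three-item lists after a round trip through the report file; the unpacking above works for both.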

Exit codes & CI

The script returns 0 for pass (all checks OK) and 2 for failure. Use this in Jenkins/GitLab CI pipelines and create alerts on nonzero exit codes.
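
A CI job may also want the names of the failing devices in its log, not only the exit code. The sketch below derives both from the report structure written by the script (the summary and overall_ok keys); the function names are illustrative.

```python
import json


def failing_devices(report):
    """Return the sorted names of devices whose summary entry is not ok."""
    summary = report.get("summary", {})
    return sorted(name for name, info in summary.items() if not info.get("ok"))


def exit_code(report):
    """0 when the report is healthy, 2 otherwise (same convention as the script)."""
    return 0 if report.get("overall_ok") and not failing_devices(report) else 2


def load_report(path="ntp_stratum_report.json"):
    """Read the JSON report produced by a previous run."""
    with open(path) as handle:
        return json.load(handle)
```

In a pipeline step this could be used as report = load_report(), followed by printing failing_devices(report) and calling sys.exit(exit_code(report)).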

Extensibility notes

  • Replace the regex heuristics with Genie schema fields (if you control platform versions).
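  • Add optional SNMP checks if CLI access is limited, for example sysUpTime, the .1.3.6.1.4.1.2021 (UCD-SNMP) subtree on Linux servers, or CISCO-NTP-MIB on Cisco devices where it is supported.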
  • Integrate with Prometheus/Grafana by pushing metrics (stratum, offset) from the JSON report.
  • Add retry/backoff and tolerance for transient conditions (NTP can take a few minutes to settle after restart).
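
For the metrics idea, one option is to render the report in the Prometheus text exposition format and hand the result to whatever ingestion path you already use. The sketch below assumes the report layout written by the script (devices, stratum, synchronized, selected_assoc); the metric names and the report_to_metrics function are made up for illustration.

```python
def report_to_metrics(report):
    """Render stratum, sync state and offset per device as Prometheus text format."""
    metrics = {"ntp_stratum": {}, "ntp_synchronized": {}, "ntp_offset_seconds": {}}
    for name, info in report.get("devices", {}).items():
        if info.get("stratum") is not None:
            metrics["ntp_stratum"][name] = info["stratum"]
        metrics["ntp_synchronized"][name] = 1 if info.get("synchronized") else 0
        selected = info.get("selected_assoc") or {}
        if selected.get("offset") is not None:
            metrics["ntp_offset_seconds"][name] = selected["offset"]

    lines = []
    for metric, samples in metrics.items():
        if not samples:
            continue
        # samples of one metric are grouped under a single TYPE line
        lines.append(f"# TYPE {metric} gauge")
        for name in sorted(samples):
            lines.append(f'{metric}{{device="{name}"}} {samples[name]}')
    return "\n".join(lines) + "\n"
```

Devices without a known stratum or offset are simply omitted from those metrics, so a dashboard can alert on absence as well as on bad values.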

testbed.yml Example

Use this skeleton and substitute actual management IPs and credentials.

testbed:
  name: ntp_lab
  credentials:
    default:
      username: lab
      password: labpass
devices:
  ntp-primary:
    os: linux
    type: server
    connections:
      cli:
        protocol: ssh
        ip: 10.0.0.10
  ntp-secondary:
    os: linux
    type: server
    connections:
      cli:
        protocol: ssh
        ip: 10.0.0.11
  core-r1:
    os: ios
    type: router
    connections:
      cli:
        protocol: ssh
        ip: 10.0.1.1
  r1-a:
    os: ios
    type: router
    connections:
      cli:
        protocol: ssh
        ip: 10.0.1.2
  r3-b:
    os: ios
    type: router
    connections:
      cli:
        protocol: ssh
        ip: 10.0.1.3
  r4:
    os: ios
    type: router
    connections:
      cli:
        protocol: ssh
        ip: 10.0.1.4

Notes

  • For IOS XE/NX-OS set os: iosxe or os: nxos accordingly.
  • If device uses key-based SSH, use credentials with key_file entries or set a default credential with the correct method.
  • Put your pyATS controller on the same management network or ensure routing to device management IPs.

Post-validation CLI — real expected output (examples)

Below are realistic sample outputs you will see when NTP is healthy and when it is not. Use these as your “golden” references.

A. show ntp status (healthy)

R1# show ntp status
Clock is synchronized, stratum 2, reference is 10.0.0.10
nominal freq is 250.0000 Hz, actual freq is 249.9998 Hz, precision is 2**(-20)
reference time is DCD0.2BFA.3E53.00000000 (10:30:12.000 UTC Mon Sep  1 2025)
clock offset is 0.0045 msec, root delay is 0.021 msec

Interpretation: Clock synchronized, stratum 2; reference IP 10.0.0.10 (your NTP primary).


B. show ntp associations (healthy)

R1# show ntp associations
     address         ref clock       st  when  poll reach  delay  offset   disp
*~10.0.0.10       127.127.1.0       1    45    64  377   0.020  -0.004  0.123
 +10.0.0.11       127.127.1.0       2    46    64  177   0.040  -0.006  0.234
  • * indicates the system selected the 10.0.0.10 server.
  • offset is small (milliseconds), reach is 377 (good).

C. show ntp status (unsynchronized / problem)

R3# show ntp status
Clock is unsynchronized, stratum 16, no reference clock
no system peer

Interpretation: R3 is not synchronized — likely it cannot reach any configured NTP server or has no configured servers.


D. show run | include ntp (verify configuration)

R1# show running-config | include ntp
ntp server 10.0.0.10 prefer
ntp server 10.0.0.11
ntp authenticate
ntp trusted-key 1

Interpretation: NTP servers are configured; prefer indicates priority. If ntp authenticate is used, ensure keys match on server and clients.


E. Linux chrony chronyc sources (NTP server side)

$ chronyc sources
210 Number of sources = 2
MS Name/IP address         Stratum Poll Reach LastRx Last sample
^* 10.0.0.10                     1   6    377    15    +0us[+0] +/- 1ms
^+ 10.0.0.11                     2   6    377    10    +20us[+20] +/- 2ms

Interpretation: The host lists its upstream sources; ^* marks the currently selected source and ^+ an acceptable source that is combined with it.
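
If the Linux servers are included in the automation, their chronyc output needs its own parser. The following is a small regex-based sketch for the column layout shown above; parse_chronyc_sources is a hypothetical helper and it deliberately ignores the "Last sample" column.

```python
import re

# mode: ^ server, = peer, # local reference clock
# state: * selected, + combined, - not combined, ? unusable, x falseticker, ~ too variable
SOURCE_RE = re.compile(
    r"^(?P<mode>[\^=#])(?P<state>[*+\-?x~])\s+(?P<name>\S+)\s+"
    r"(?P<stratum>\d+)\s+(?P<poll>\d+)\s+(?P<reach>\d+)\s+(?P<last_rx>\S+)"
)


def parse_chronyc_sources(text):
    """Return one dict per source line of 'chronyc sources' output."""
    sources = []
    for line in text.splitlines():
        m = SOURCE_RE.match(line.strip())
        if not m:
            continue  # banner, header or separator line
        sources.append({
            "name": m.group("name"),
            "selected": m.group("state") == "*",
            "state": m.group("state"),
            "stratum": int(m.group("stratum")),
            "reach": m.group("reach"),   # octal text, as printed
        })
    return sources
```

The selected source's stratum plus one is what the host itself should advertise to its clients, which gives the Cisco-side check a value to compare against.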


Validation flow (GUI + CLI)

Here’s a reproducible lab validation flow you can walk students through. Each step includes CLI commands and GUI steps.

Step 0 — Pre-flight checks (controller)

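  • Ensure pyATS environment: pip install "pyats[full]" (the full extra already includes Genie; the quotes keep shells such as zsh from expanding the brackets).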
  • Confirm SSH connectivity: ssh lab@10.0.1.1 from controller.
  • Confirm NTP servers reachable: ping 10.0.0.10 -c 5.

Step 1 — Confirm config on devices (CLI)

  • show running-config | include ntp — confirm servers defined and auth keys (if used).
  • show ip interface brief — confirm management interfaces are up.

Step 2 — Check sync state (CLI)

  • show ntp status — look for “Clock is synchronized”.
  • show ntp associations — find * (selected) server and offset column.
  • show clock detail or show clock — check local clock vs. expectations.

Step 3 — GUI verification (Cisco Prime / DNAC / SolarWinds)

  • Login to your NMS.
  • Navigate: Inventory → Devices → Select Device → Time / NTP → check last sync timestamp, NTP server(s), and offset.
  • On server dashboard: check server’s NTP sources (chronyc sources or ntpq -p) for upstream status.

Step 4 — Run the pyATS script

  • python ntp_stratum_check.py testbed.yml
  • Review ntp_stratum_report.json.
  • Example artifacts to look for: synchronized: false, stratum_vs_server: False, offset: 2.3s.

Step 5 — Troubleshoot failing devices

  • If synchronized is False:
    1. ping <ntp-server> from the device. Telnet cannot test UDP/123, so use the controller to verify routing and UDP reachability instead.
    2. Verify show running-config | include ntp — ensure correct server IP and auth.
    3. Check firewall/ACLs for UDP/123.
    4. Check server logs (/var/log/chrony/ or ntpd logs).
    5. Avoid running debug ntp on production without a maintenance window.

Step 6 — Re-run and confirm remediation

  • After fixes, re-run the pyATS task and confirm overall_ok in JSON report and exit code 0.
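
Because NTP can need a few minutes to settle after a fix, the re-run is often better expressed as a bounded retry than as a single attempt. The sketch below is a generic retry-with-backoff helper; wait_until and its parameters are illustrative, and the check callable stands for whatever "is the device synchronized now?" test you plug in.

```python
import time


def wait_until(check, attempts=5, initial_delay=1.0, factor=2.0, sleep=time.sleep):
    """Call check() until it returns True; sleep with exponential backoff in between.

    Returns the attempt number that succeeded, or None if every attempt failed.
    """
    delay = initial_delay
    for attempt in range(1, attempts + 1):
        if check():
            return attempt
        if attempt < attempts:   # no point sleeping after the final attempt
            sleep(delay)
            delay *= factor
    return None
```

For NTP convergence, realistic values are closer to initial_delay=30 and attempts=6 than to the defaults shown here; the sleep parameter exists so the helper can be tested without waiting.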

FAQs

Q1. Why is NTP stratum consistency important in network environments?
A1. NTP stratum consistency ensures all network devices are synchronized to a reliable and consistent time source. Accurate timekeeping is critical for log correlation, troubleshooting, security protocols (e.g., certificates), and proper functioning of time-sensitive applications.


Q2. How does pyATS automate NTP stratum consistency validation?
A2. pyATS connects to devices using SSH or API, runs commands like show ntp status or show ntp associations, and parses the output to extract stratum numbers. It compares the stratum values against an expected baseline and reports inconsistencies automatically.


Q3. What is a valid stratum value for a network device?
A3. Typically, in production, devices should show stratum 2 or 3. Values higher than expected, or an unsynchronized state, indicate a configuration or connectivity issue.

  • Stratum 1: Directly connected to a reference clock
  • Stratum 2 or 3: Synced to upstream NTP servers

Q4. Can pyATS detect unsynchronized NTP status?
A4. Yes. The script will detect cases where the device shows “unsynchronized” in show ntp status or if the stratum is set to 16 (meaning unsynchronized), and will clearly mark the device as failed in the report.


Q5. How are validation results presented in pyATS?
A5. The script in this article writes a structured JSON report and prints a console summary. Each device’s name, current stratum, selected association, and sync status are listed along with pass/fail flags and a run timestamp for easy audit and troubleshooting.


Q6. Can pyATS validate NTP across multi-vendor environments?
A6. Yes, with caveats. pyATS works with vendor-specific parsers or generic CLI commands, so the same approach can be applied to Cisco, Arista, Juniper, Fortinet, and other vendors in heterogeneous environments. Parser coverage differs per vendor, so expect to rely on raw CLI output and your own regex where no parser exists.


Q7. How does automating NTP stratum validation improve network reliability?
A7. Automation provides continuous and consistent checks without manual intervention, reducing human error and enabling early detection of time synchronization problems. This leads to better log correlation, improved troubleshooting, and adherence to security standards.


YouTube Link

Watch the Complete Python for Network Engineer: Automate NTP stratum consistency check using pyATS for Cisco [Python for Network Engineer] Lab Demo & Explanation on our channel:

Master Python Network Automation, Ansible, REST API & Cisco DevNet

Join Our Training

If this article helped you see how much manual NTP checks slow down operations, imagine converting every repetitive verification into a reproducible automation pipeline.

Trainer Sagar Dhawan runs a comprehensive 3-month instructor-led program that teaches network engineers everything from Python basics to production-grade automation:

  • Python for Network Engineer fundamentals (pyATS, Genie, Netmiko)
  • Building validation test-suites (like the NTP stratum check you just saw)
  • Integrating Ansible and APIs for device configuration and telemetry
  • Test-driven networking: CI pipelines, JUnit reporting, and dashboards
  • Real labs: BGP, NTP, multicast, EVPN, security automation
  • Hands-on mentorship + lab files, templates, and playbooks

This course is designed to make your team self-sufficient: you’ll walk away with the exact scripts, templates, and a library of automations you can run in your infra. See the full outline and enroll here: https://course.networkjourney.com/python-ansible-api-cisco-devnet-for-network-engineers/

Enroll Now & Future‑Proof Your Career
Email: info@networkjourney.com
WhatsApp / Call: +91 97395 21088