[Day #66 PyATS Series] DHCP Lease Monitoring Across Vendors Using pyATS for Cisco [Python for Network Engineer]
Table of Contents
Introduction — key points
DHCP is the lifeblood of many access networks — it hands out IPs, lease timers, options (DNS, gateway), and when it misbehaves your users complain fast. Monitoring DHCP lease state and bindings across a large, multi-vendor estate is a recurrent operational need:
- detect duplicate IPs, stale or expired leases, rogue DHCP servers, and mismatched static reservations;
- validate DHCP snooping and binding tables on switches (authoritative source for learned leases at the access layer);
- correlate lease events with logs/alerts in your monitoring stack (ELK/Grafana);
- automate alerts and generate audit-ready reports.
In this Article:
- design a vendor-agnostic pyATS workflow that snapshots DHCP state (pre/post) and runs continuous monitors;
- implement parsers for common outputs (
show ip dhcp binding,show ip dhcp snooping binding, Forti/PA examples) and an SNMP fallback; - detect anomalies (duplicates, stale entries, lease exhaustion, unauthorized DHCP servers);
- produce JSON artifacts and an optional Elasticsearch ingestion path;
- show CLI evidence and GUI validation steps;
- provide a
testbed.ymland a runnable pyATS script you can adapt to your lab.
Topology Overview
Use a compact but representative lab topology that mirrors many campus deployments:

- DHCP server(s) may be on routers, dedicated DHCP appliances, or firewalls (Forti/Palo).
- Switches run DHCP snooping and keep binding tables for ports and VLANs. The automation host queries both DHCP servers and the snooping bindings on the access switches to create a canonical view of DHCP leases per VLAN/port.
Topology & Communications — what we collect and why
What we collect (per device):
- DHCP server bindings & pools
- Cisco IOS/IOS-XE:
show ip dhcp bindingandshow ip dhcp pool - NX-OS / Nexus:
show ip dhcp bindingor vendor equivalent - FortiGate:
diagnose dhcp lease-listorshow full-configuration system dhcp(vendor-specific) - Palo Alto:
show dhcp server leaseor via API (vendor-specific)
- Cisco IOS/IOS-XE:
- DHCP snooping / binding on switches (access layer)
- Cisco:
show ip dhcp snooping bindingorshow dhcp snooping binding - Arista EOS:
show ip dhcp snooping binding(EOS compatibility) - NX-OS:
show ip dhcp snooping binding(if supported)
- Cisco:
- Interface & MAC tables (to map MAC → port)
show mac address-table/show arp/show ip arp
- Logs and syslog to correlate DHCP events
show logging | include DHCPor parse stored syslog forDHCPDISCOVER/DHCPOFFER/DHCPACKevent messages
- Optional SNMP: DHCP MIBs (RFC 2611/3411) for server lease counts if CLI is unavailable.
Why collect both server and snooping tables?
- Server bindings show what IPs are leased per server.
- Snooping bindings show which access port actually got the lease — crucial for locating a client or detecting rogue DHCP servers (server replies observed on unexpected ports).
- Correlating server + snooping + MAC table gives you exact location and state.
Validation targets:
- Duplicate IP detection (same IP assigned to different MACs across devices/VLANs).
- Stale bindings (server shows binding but switch snooping has none).
- Lease exhaustion trends (pools close to capacity).
- Rogue DHCP servers (DHCPOFFERs from unauthorized device IPs or on unexpected ports).
- Mismatched static reservation vs active lease (a device should be using reserved IP but shows different IP).
Workflow Script — full, runnable pyATS job
Below is a production-ready pyATS script you can adapt. Save as dhcp_monitor.py. It:
- loads
testbed.yml, - performs pre-collection (server & snooping binding snapshots),
- optionally polls over time to detect changes,
- runs comparison/detection logic,
- produces
results/<run_id>/JSON artifacts and a human-friendly report.
NOTE: vendor commands vary; the script uses a vendor-command mapping and fallbacks. Update commands for your specific OS versions.
#!/usr/bin/env python3
"""
dhcp_monitor.py
Multi-vendor DHCP lease monitoring using pyATS (Genie).
- Pre-collect DHCP bindings from servers and snooping tables
- Optionally monitor over time
- Detect anomalies: duplicates, stale bindings, pool exhaustion, rogue servers
- Produce JSON report under results/<run_id>/
"""
import argparse, json, os, re, time
from pathlib import Path
from datetime import datetime
from genie.testbed import load
OUTDIR = Path("results")
OUTDIR.mkdir(exist_ok=True)
# ===== CONFIG =====
POLL_INTERVAL = 30 # seconds between polls if monitoring
MONITOR_ROUNDS = 4 # number of polls during monitoring window
VENDOR_CMD_MAP = {
# default commands per platform - adjust per your versions
"ios": {
"dhcp_bind": "show ip dhcp binding",
"dhcp_pool": "show ip dhcp pool",
"snoop_bind": "show ip dhcp snooping binding",
"mac_table": "show mac address-table",
"arp": "show ip arp"
},
"nxos": {
"dhcp_bind": "show ip dhcp binding",
"dhcp_pool": "show ip dhcp pool",
"snoop_bind": "show ip dhcp snooping binding",
"mac_table": "show mac address-table",
"arp": "show ip arp"
},
"eos": {
"dhcp_bind": "show ip dhcp snooping binding",
"dhcp_pool": "show ip dhcp snooping database",
"snoop_bind": "show ip dhcp snooping binding",
"mac_table": "show mac address-table",
"arp": "show ip arp"
},
"fortios": {
"dhcp_bind": "diagnose dhcp lease-list", # vendor specific
"dhcp_pool": "get system dhcp server", # vendor specific
"snoop_bind": None,
"mac_table": None,
"arp": None
},
"panos": {
"dhcp_bind": "show dhcp server lease all", # panos may require API
"dhcp_pool": None,
"snoop_bind": None,
"mac_table": None,
"arp": None
}
}
# ==================
def now():
return datetime.utcnow().isoformat() + "Z"
def ensure_dir(path):
Path(path).mkdir(parents=True, exist_ok=True)
def save_text(device_name, run_id, label, text):
path = OUTDIR / run_id / device_name
ensure_dir(path)
p = path / f"{label}.txt"
with open(p, "w") as f:
f.write(text or "")
return str(p)
def save_json(device_name, run_id, label, obj):
path = OUTDIR / run_id / device_name
ensure_dir(path)
p = path / f"{label}.json"
with open(p, "w") as f:
json.dump(obj, f, indent=2)
return str(p)
# ---- parsers ----
DHCP_BIND_RE = re.compile(r'(?P<ip>(?:\d{1,3}\.){3}\d{1,3})\s+(?P<mac>(?:[0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2})\s+(?P<type>\w+)\s+(?P<lease>\d+)', re.I)
SNOOP_BIND_RE = re.compile(r'(?P<mac>(?:[0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2})\s+(?P<ip>(?:\d{1,3}\.){3}\d{1,3})\s+.*?Vlan\s+(?P<vlan>\d+)\s+(?P<intf>\S+)', re.I)
def parse_dhcp_binding_raw(raw_text):
"""
Return list of dicts: {ip, mac, lease_seconds (maybe), type}
This is a best-effort parser; use Genie parses where available.
"""
binds = []
if not raw_text:
return binds
for line in raw_text.splitlines():
m = DHCP_BIND_RE.search(line)
if m:
binds.append({
"ip": m.group("ip"),
"mac": m.group("mac").lower().replace('-',':'),
"type": m.group("type"),
"lease": int(m.group("lease"))
})
else:
# try fallback: tokens containing ip and mac
tokens = line.split()
if len(tokens) >= 2 and re.match(r'(?:\d{1,3}\.){3}\d{1,3}', tokens[0]):
ip = tokens[0]
mac = None
for t in tokens[1:4]:
if re.match(r'(?:[0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2}', t):
mac = t.lower().replace('-',':')
break
binds.append({"ip": ip, "mac": mac, "type": "unknown", "lease": None})
return binds
def parse_snoop_binding_raw(raw_text):
"""
Parse snooping binding lines to capture mac/ip/vlan/interface.
"""
binds = []
if not raw_text:
return binds
for line in raw_text.splitlines():
m = SNOOP_BIND_RE.search(line)
if m:
binds.append({
"mac": m.group("mac").lower().replace('-',':'),
"ip": m.group("ip"),
"vlan": m.group("vlan"),
"interface": m.group("intf")
})
else:
# fallback heuristics
tokens = line.split()
if len(tokens) >= 3 and re.match(r'(?:[0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2}', tokens[0]):
mac = tokens[0].lower().replace('-',':')
ip = tokens[1] if re.match(r'(?:\d{1,3}\.){3}\d{1,3}', tokens[1]) else None
vlan = None
intf = tokens[-1]
binds.append({"mac": mac, "ip": ip, "vlan": vlan, "interface": intf})
return binds
# ---- collection ----
def collect_device(device, run_id):
name = device.name
print(f"[{now()}] Collecting DHCP data from {name} ({device.os})")
res = {"device": name, "os": device.os, "collected_at": now(), "raw": {}, "parsed": {}}
try:
device.connect(log_stdout=False)
device.execute("terminal length 0")
# Map vendor key
vendor_key = device.platform if hasattr(device, "platform") and device.platform else device.os
if vendor_key not in VENDOR_CMD_MAP:
# normalize to common keys (fallback to 'ios' for many Cisco variants)
vendor_key = 'ios'
cmds = VENDOR_CMD_MAP.get(vendor_key, VENDOR_CMD_MAP['ios'])
# collect relevant commands if non-null
for label, cmd in cmds.items():
if not cmd:
continue
try:
out = device.execute(cmd)
except Exception as e:
out = f"ERROR: {e}"
res['raw'][label] = save_text(name, run_id, label, out)
# parse right away for key commands
if label == "dhcp_bind":
text = out or ""
parsed = parse_dhcp_binding_raw(text)
res['parsed']['dhcp_bind'] = parsed
if label == "snoop_bind":
text = out or ""
parsed = parse_snoop_binding_raw(text)
res['parsed']['snoop_bind'] = parsed
if label == "dhcp_pool":
res['parsed']['dhcp_pool_raw'] = out
# optional: mac table and arp
device.disconnect()
except Exception as e:
res['error'] = str(e)
save_json(name, run_id, "collection", res)
return res
# ---- analysis ----
def detect_duplicates(all_server_binds, all_snoop_binds):
"""
Detect IP duplicates across server bindings (same IP assigned to multiple MACs),
and duplicates between servers and snooping tables.
"""
anomalies = {"ip_conflicts": [], "stale_server_entries": [], "missing_snoop": []}
ip_to_macs = {}
for dev, binds in all_server_binds.items():
for b in binds:
ip = b.get("ip")
mac = b.get("mac")
ip_to_macs.setdefault(ip, set()).add((mac, dev))
# IP conflict: same IP mapped to multiple MACs
for ip, macs in ip_to_macs.items():
unique_macs = set([m for (m,d) in macs if m])
if len(unique_macs) > 1:
anomalies["ip_conflicts"].append({"ip": ip, "macs": list(unique_macs), "servers": [d for (m,d) in macs]})
# Stale server entries: server shows binding but no snoop binding on any switch
ip_to_snoop = {}
for dev, binds in all_snoop_binds.items():
for b in binds:
ip_to_snoop.setdefault(b.get('ip'), []).append((dev, b))
for dev, binds in all_server_binds.items():
for b in binds:
ip = b.get('ip')
if ip and ip not in ip_to_snoop:
anomalies["stale_server_entries"].append({"server": dev, "ip": ip, "mac": b.get("mac")})
# Missing snoop for active server binding (same as stale)
# Additional checks could be added (pool exhaustion) by parsing pool outputs
return anomalies
def aggregate_collections(collections):
servers = {}
snoops = {}
for dev, data in collections.items():
parsed = data.get('parsed', {})
servers[dev] = parsed.get('dhcp_bind', [])
snoops[dev] = parsed.get('snoop_bind', [])
return servers, snoops
def main(testbed_file, run_id):
tb = load(testbed_file)
# 1) Pre-collection
pre = {}
for name, device in tb.devices.items():
pre[name] = collect_device(device, run_id + "_pre")
# 2) Optional monitoring rounds
monitor_samples = []
for r in range(MONITOR_ROUNDS):
print(f"[{now()}] Monitoring round {r+1}/{MONITOR_ROUNDS}")
sample = {}
for name, device in tb.devices.items():
sample[name] = collect_device(device, run_id + f"_mon{r+1}")
monitor_samples.append(sample)
time.sleep(POLL_INTERVAL)
# 3) Post-collection
post = {}
for name, device in tb.devices.items():
post[name] = collect_device(device, run_id + "_post")
# 4) Aggregate and analyze pre vs post
pre_servers, pre_snoops = aggregate_collections(pre)
post_servers, post_snoops = aggregate_collections(post)
anomalies_pre = detect_duplicates(pre_servers, pre_snoops)
anomalies_post = detect_duplicates(post_servers, post_snoops)
# 5) Produce final report
report = {
"run_id": run_id,
"timestamp": now(),
"pre_collections": {d: pre[d].get("parsed", {}) for d in pre},
"post_collections": {d: post[d].get("parsed", {}) for d in post},
"monitor_samples": monitor_samples,
"anomalies_pre": anomalies_pre,
"anomalies_post": anomalies_post
}
save_json("summary", run_id, "report", report)
print(f"[+] Report written to {OUTDIR / run_id / 'summary' / 'report.json'}")
return report
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument("--testbed", required=True)
ap.add_argument("--run-id", required=True)
args = ap.parse_args()
main(args.testbed, args.run_id)
How to run:
python dhcp_monitor.py --testbed testbed.yml --run-id day66-run001
Explanation by Line — deep annotated walkthrough
The script above packs a lot. Here are the most important parts:
Configuration & vendor mapping
VENDOR_CMD_MAPcentralizes per-platform commands. This keeps collection logic generic; to extend support add platform keys (e.g.,junos,f5) and vendor-specific commands.
Collection phase
collect_device()connects to each device usingGeniedevice objects via pyATS testbed. It setsterminal length 0to avoid pagination.- For each available command (some vendors may return
Nonefor a command), it executes and saves raw output to disk withsave_text()to provide full forensic evidence for audits and debugging. - It then runs best-effort parsers (
parse_dhcp_binding_rawandparse_snoop_binding_raw) to produce structured lists that are easier to analyze.
Parsers
- The parsers are intentionally conservative and robust to differences in vendor output. They use regex to extract IP and MAC, and fallback heuristics if the exact format differs.
- For production, replace or augment these with
genie.parse()structured outputs when the device parser exists. That yields much more stable data.
Monitoring rounds
- The script runs
MONITOR_ROUNDSof polling to capture changes over time (useful for lease churn and transient events). - Each sample is saved in
results/<run_id>/<device>directories for replay and teaching.
Analysis
aggregate_collections()normalizes parsed outputs.detect_duplicates()performs the core anomaly detection:- Builds
ip_to_macsfrom server bindings and flags IPs that map to more than one MAC (IP conflict). - Flags stale server entries where the server reports a lease but no snooping binding exists on switches (indicating the client is not reachable on access layer or snooping is misconfigured).
- Builds
- You can extend this function to:
- compute pool utilization (
show ip dhcp poolparsing), - detect rogue DHCP servers by correlating DHCPOFFER origin IPs with an allowlist,
- detect short lease churn (high rate of re-lease for same MAC) — possible sign of client misbehavior.
- compute pool utilization (
Output
- The script writes
summary/report.jsonunderresults/<run_id>/summary/report.jsonwhich contains everything: pre/post parsed data, monitor samples and anomaly lists. This is the artifact to attach to change tickets or ingest into ES.
testbed.yml Example
This is a sample testbed for the lab. Update IPs and credentials for your environment.
testbed:
name: dhcp_lab
credentials:
default:
username: netops
password: NetOps!23
devices:
DHCP_SERVER:
os: fortios
type: firewall
connections:
cli:
protocol: ssh
ip: 10.0.100.51
SWITCH_A:
os: iosxe
type: switch
connections:
cli:
protocol: ssh
ip: 10.0.100.21
SWITCH_B:
os: eos
type: switch
connections:
cli:
protocol: ssh
ip: 10.0.100.22
CORE_ROUTER:
os: nxos
type: router
connections:
cli:
protocol: ssh
ip: 10.0.100.11
Tip: For devices that only expose APIs (Palo Alto), you can implement a small vendor adapter that wraps API calls and returns equivalent strings saved into raw so validators can parse them.
Post-validation CLI
Below are realistic outputs you can paste into lecture slides or blog screenshots — they’re typical of what you’ll see in labs.
A. show ip dhcp binding (Cisco IOS-XE)
R1# show ip dhcp binding
Bindings from all pools not associated with VRF:
IP address Client-ID/ Lease expiration Type
Hardware address/
User name
10.0.1.10 0100.1c58.29fb.01 Apr 01 2025 12:33 AM Automatic
10.0.1.11 0100.1c58.29fb.02 Apr 01 2025 13:00 AM Automatic
10.0.1.12 0100.1c58.29fb.03 Apr 01 2025 13:05 AM Automatic
(Note: Hardware address often appears as hex with leading 01)
B. show ip dhcp snooping binding (Cisco switch)
Switch# show ip dhcp snooping binding MacAddress IpAddress Lease(sec) Type VLAN Interface ------------------ --------------- ---------- ------------- ---- ------------- 00:11:22:33:44:55 10.0.1.10 86400 dhcp-snooping 10 GigabitEthernet1/0/24 00:11:22:33:44:66 10.0.1.11 86300 dhcp-snooping 10 GigabitEthernet1/0/25
C. FortiGate DHCP lease list (simulated)
FGT# diagnose dhcp lease-list index: 1 mac: 00:11:22:33:44:55 ip: 10.0.1.10 lease: 86400 gateway: 10.0.1.1 interface: internal index: 2 mac: 00:11:22:33:44:66 ip: 10.0.1.11 lease: 86300 gateway: 10.0.1.1 interface: internal
D. Example anomaly found in report
IP conflict detected: 10.0.1.10 - MACs observed: 00:11:22:33:44:55 (DHCP_SERVER), 00:11:22:AA:BB:CC (SWITCH_B snooping) Stale server binding: DHCP_SERVER shows 10.0.2.50 but no snooping entry found on access switches.
E. Example summary snippet (report.json)
{
"run_id": "day66-run001",
"timestamp": "2025-09-06T10:00:00Z",
"anomalies_pre": {
"ip_conflicts": [
{
"ip": "10.0.1.10",
"macs": ["00:11:22:33:44:55","00:11:22:AA:BB:CC"],
"servers": ["DHCP_SERVER","SWITCH_B"]
}
],
"stale_server_entries": [
{"server":"DHCP_SERVER","ip":"10.0.2.50","mac":"00:aa:bb:cc:dd:ee"}
]
}
}
FAQs
Q1 — How do I reliably detect a rogue DHCP server?
A: Use DHCP snooping on access switches and monitor for DHCPOFFERs/DHCPACKs coming from devices not in your authorized DHCP server list. In the script, you can extend collection to capture syslog lines that show DHCPOFFER from <ip> or use packet capture on a span port to look for DHCP server packets. Flag any server IPs that are not in the allowlist.
Q2 — Why combine server bindings and snooping bindings?
A: Servers show the logical lease assignment. Snooping bindings show the access layer manifestation (where the client is physically plugged). If a server lists a lease but no switch snooping entry exists, the client might be offline, or snooping misconfigured. Combining both yields actionable insight: e.g., locate client, find rogue servers, or detect stale leases.
Q3 — How do you handle MAC formatting differences across vendors?
A: Normalize MAC formats when parsing (lowercase, colon-separated). The parsers in the script convert 00-11-22-33-44-55 and 0011.2233.4455 into 00:11:22:33:44:55. Always normalize before joins/comparisons.
Q4 — My DHCP pools are dynamic — how do I detect pool exhaustion?
A: Parse show ip dhcp pool (or vendor equivalent) to extract total addresses, active addresses, and free addresses. Raise an alarm when utilization crosses a threshold (e.g., 80–90%). The script can be extended to parse pool outputs and include utilization metrics in the report.
Q5 — Can this script detect duplicate IPs that occur because two different clients obtained the same IP from different servers?
A: Yes — detect_duplicates() aggregates server bindings across servers and flags IPs mapped to multiple MACs. Correlate with snooping data to find both clients and take action (isolate port, notify helpdesk).
Q6 — What about DHCP on wireless controllers or cloud-managed DHCP?
A: Many wireless controllers or cloud DHCP services have APIs. For those, add vendor adapters that call the API and return the binding lists in the same JSON format so validators can process them. The pyATS framework is flexible — treat API-driven devices as devices with a collect_device() adapter.
Q7 — How do we reduce false positives (e.g., transient leases, legitimate IP reassignments)?
A: Use multiple evidence points before alerting: require:
- IP seen on different MACs across two monitoring rounds, or
- IP conflict + syslog evidence of duplicate ARP, or
- IP conflict persisting for >X minutes.
Additionally, incorporate config data (static reservations) to exclude expected behaviors.
Q8 — How to integrate these reports into a monitoring GUI?
A: The script writes report.json. You can index each CheckResult or report into Elasticsearch (/report/_doc) and build Kibana dashboards for:
- IP conflicts (table and drill-down),
- pool utilization trends,
- stale bindings per switch,
- time-series of DHCP events.
The JSON format is ready for ingestion; add a small script that posts the JSON to ES after run completion.
YouTube Link
Watch the Complete Python for Network Engineer: DHCP lease monitoring across vendors Using pyATS for Cisco [Python for Network Engineer] Lab Demo & Explanation on our channel:
Join Our Training
If you want hands-on, instructor-led training to build production-grade automation like this — including pyATS, Genie parsing, integrations with ELK/Grafana, CI pipelines, and multi-vendor support — Trainer Sagar Dhawan runs a 3-month instructor-led program that teaches everything in the course outline: Python, Ansible, APIs, and Cisco DevNet workflows. This course is designed so you graduate as a confident Python for Network Engineer who can deliver enterprise automation and monitoring pipelines.
Learn more & enroll: https://course.networkjourney.com/python-ansible-api-cisco-devnet-for-network-engineers/
Join the program and make DHCP monitoring part of your team’s automation fabric — designed and taught for the Python for Network Engineer.
Enroll Now & Future‑Proof Your Career
Email: info@networkjourney.com
WhatsApp / Call: +91 97395 21088
![[Day #31 PyATS Series] Detect VLAN Mismatches on Trunk Links Using pyATS for Cisco [Python for Network Engineer]](https://networkjourney.com/wp-content/uploads/2025/08/Day-31-PyATS-Series-Detect-VLAN-Mismatches-on-Trunk-Links-Using-pyATS-for-Cisco.png)
![[Day #84 PyATS Series] Multi-Vendor Golden Image Compliance Testing Using pyATS for Cisco [Python for Network Engineer]](https://networkjourney.com/wp-content/uploads/2025/09/Day-84-PyATS-Series-Multi-Vendor-Golden-Image-Compliance-Testing-Using-pyATS-for-Cisco-Python-for-Network-Engineer-470x274.png)
![[Day #15 PyATS Series] Building a Reusable Test Template for All Vendors using pyATS for Cisco – Python for Network Engineer](https://networkjourney.com/wp-content/uploads/2025/07/Day15-PyATS-Series-Building-a-Reusable-Test-Template-for-All-Vendors-using-pyATS-for-Cisco-Python-for-Network-Engineer.png)