|
3 | 3 | import json
|
4 | 4 | import re
|
5 | 5 | import copy
|
| 6 | +import datetime |
6 | 7 |
|
7 | 8 | from natsort import natsorted
|
8 | 9 | import jmespath
|
@@ -432,6 +433,13 @@ def get_mac_table(self, network_instance: Optional[str] = "*") -> Dict[str, Any]
|
432 | 433 | Dest:destination, Type:type}}',
|
433 | 434 | "datatype": "state",
|
434 | 435 | }
|
| 436 | + if ( |
| 437 | + not "bridged" |
| 438 | + in self.get(paths=["/system/features"], datatype="state")[0][ |
| 439 | + "system/features" |
| 440 | + ] |
| 441 | + ): |
| 442 | + return {"mac_table": []} |
435 | 443 | resp = self.get(
|
436 | 444 | paths=[path_spec.get("path", "")], datatype=path_spec["datatype"]
|
437 | 445 | )
|
@@ -653,6 +661,64 @@ def set_es_peers(resp):
|
653 | 661 | res = jmespath.search(path_spec["jmespath"], resp[0])
|
654 | 662 | return {"es": res}
|
655 | 663 |
|
| 664 | + def get_arp(self) -> Dict[str, Any]: |
| 665 | + path_spec = { |
| 666 | + "path": f"/interface[name=*]/subinterface[index=*]/ipv4/arp/neighbor", |
| 667 | + "jmespath": '"interface"[*].subinterface[].{interface:"_subitf", entries:ipv4.arp.neighbor[].{IPv4:"ipv4-address",MAC:"link-layer-address",Type:origin,expiry:"_rel_expiry" }}', |
| 668 | + "datatype": "state", |
| 669 | + } |
| 670 | + resp = self.get( |
| 671 | + paths=[path_spec.get("path", "")], datatype=path_spec["datatype"] |
| 672 | + ) |
| 673 | + for itf in resp[0].get("interface", []): |
| 674 | + for subitf in itf.get("subinterface", []): |
| 675 | + subitf["_subitf"] = f"{itf['name']}.{subitf['index']}" |
| 676 | + for arp_entry in ( |
| 677 | + subitf.get("ipv4", {}).get("arp", {}).get("neighbor", []) |
| 678 | + ): |
| 679 | + try: |
| 680 | + ts = datetime.datetime.strptime( |
| 681 | + arp_entry["expiration-time"], "%Y-%m-%dT%H:%M:%S.%fZ" |
| 682 | + ) |
| 683 | + arp_entry["_rel_expiry"] = ( |
| 684 | + str(ts - datetime.datetime.now()).split(".")[0] + "s" |
| 685 | + ) |
| 686 | + except: |
| 687 | + arp_entry["_rel_expiry"] = "-" |
| 688 | + |
| 689 | + res = jmespath.search(path_spec["jmespath"], resp[0]) |
| 690 | + return {"arp": res} |
| 691 | + |
| 692 | + def get_nd(self) -> Dict[str, Any]: |
| 693 | + path_spec = { |
| 694 | + "path": f"/interface[name=*]/subinterface[index=*]/ipv6/neighbor-discovery/neighbor", |
| 695 | + "jmespath": '"interface"[*].subinterface[].{interface:"_subitf", entries:ipv6."neighbor-discovery".neighbor[].{IPv6:"ipv6-address",MAC:"link-layer-address",Type:origin,next_state:"_rel_expiry" }}', |
| 696 | + "datatype": "state", |
| 697 | + } |
| 698 | + resp = self.get( |
| 699 | + paths=[path_spec.get("path", "")], datatype=path_spec["datatype"] |
| 700 | + ) |
| 701 | + for itf in resp[0].get("interface", []): |
| 702 | + for subitf in itf.get("subinterface", []): |
| 703 | + subitf["_subitf"] = f"{itf['name']}.{subitf['index']}" |
| 704 | + for nd_entry in ( |
| 705 | + subitf.get("ipv6", {}) |
| 706 | + .get("neighbor-discovery", {}) |
| 707 | + .get("neighbor", []) |
| 708 | + ): |
| 709 | + try: |
| 710 | + ts = datetime.datetime.strptime( |
| 711 | + nd_entry["next-state-time"], "%Y-%m-%dT%H:%M:%S.%fZ" |
| 712 | + ) |
| 713 | + nd_entry["_rel_expiry"] = ( |
| 714 | + str(ts - datetime.datetime.now()).split(".")[0] + "s" |
| 715 | + ) |
| 716 | + except: |
| 717 | + nd_entry["_rel_expiry"] = "-" |
| 718 | + |
| 719 | + res = jmespath.search(path_spec["jmespath"], resp[0]) |
| 720 | + return {"nd": res} |
| 721 | + |
656 | 722 | def get(
|
657 | 723 | self,
|
658 | 724 | paths: List[str],
|
|
0 commit comments