Commit 1cda2d9e authored by Thomas Haller's avatar Thomas Haller
Browse files

nmci/ip: rework nmci.ip.link_show_all() to not use JSON and fix binary=True

Interface names might not be valid UTF-8. But JSON strings must be
UTF-8. iproute2 will just return invalid JSON in the case where the
interface name is not a unicode name.

We need nmci.ip to work properly also in case where we have unexpected
interface names. For that reason, there was already a fallback path that
tired to parse the text output, and not use JSON.

If we already have a non-JSON variant that MUST work, there is no point
in keeping the JSON code. Drop it and always do our manual parsing.

This also fixes the "binary=True" argument, which is supposed to return
the interface name as bytes.
parent e5060a53
Pipeline #402867 running with stages
......@@ -7,23 +7,21 @@ from . import util
class _IP:
def _link_show_all_manual_parsing(self, binary):
def link_show_all(self, binary=None):
# binary is:
# False: expect all stings to be UTF-8, the result only contains decoded strings
# True: expect at least some of the names to be binary, all the ifnames are bytes
# None: expect a mix. The ifnames that can be decoded as UTF-8 are returned
# as strings, otherwise as bytes.
as_utf8 = binary is False
try_decode_as_utf8 = binary is not False
jstr = util.process_run(
["ip", "-d", "link", "show"], as_utf8=as_utf8, timeout=2
)
assert binary is None or binary is True or binary is False
out = util.process_run(["ip", "-d", "link", "show"], as_utf8=False, timeout=2)
result = []
lines = jstr.split("\n" if as_utf8 else b"\n")
lines = out.split(b"\n")
i = 0
if lines and not lines[-1]:
......@@ -40,10 +38,7 @@ class _IP:
ip_data = {}
if as_utf8:
r = r"^([0-9]+): *([^:@]+)(@[^:]*)?: <([^>]*)>"
else:
r = rb"^([0-9]+): *([^:@]+)(@[^:]*)?: <([^>]*)>"
r = rb"^([0-9]+): *([^:@]+)(@[^:]*)?: <([^>]*)>"
m = re.match(r, line)
if not m:
......@@ -52,25 +47,23 @@ class _IP:
ip_data["ifindex"] = int(m.group(1))
g = m.group(2)
if not as_utf8 and try_decode_as_utf8:
if binary is not True:
# If requested, we try to parse the binary output as utf-8.
# In this mode, some of the names will be UTF-8, and some binary.
try:
g = g.decode("utf-8")
g = g.decode()
except:
pass
if binary is False:
raise
ip_data["ifname"] = g
g = m.group(4)
if as_utf8:
g = g.split(",")
else:
g = [s.decode("utf-8") for s in g.split(b",")]
g = [s.decode() for s in g.split(b",")]
ip_data["flags"] = g
while i < len(lines):
line = lines[i]
if not re.match(r"^ +" if as_utf8 else rb"^ +", line):
if not re.match(rb"^ +", line):
break
i += 1
......@@ -78,65 +71,7 @@ class _IP:
return result
def link_show_all(self, binary=None):
assert binary is None or binary is True or binary is False
# We require iproute2 to give valid UTF-8. That means, you cannot use this
# function if you have any interfaces with a non-UTF-8 name (like after
# `ip link add $'d\xccf\\c' type dummy`).
#
# And of course, in those cases `iproute2` wouldn't even output valid
# JSON to begin with, because JSON can only be UTF-8 (although `jq` wouldn't
# complain about that).
#
# If you need to support non-UTF-8 names, this function is not for you.
if getattr(self, "_ip_link_no_json", False):
return self._link_show_all_manual_parsing(binary=binary)
argv = ["ip", "-json", "-details", "link", "show"]
proc = subprocess.run(
argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=2
)
if proc.stderr:
if proc.returncode == 255 and re.match(
r'Option "-json" is unknown, try "ip -help"\.',
proc.stderr.decode("utf-8", "replace"),
):
self._ip_link_no_json = True
return self._link_show_all_manual_parsing(binary=binary)
# if anything was printed to stderr, we consider that
# a fail.
raise Exception(
"`%s` printed something on stderr: %s"
% (" ".join(argv), proc.stderr.decode("utf-8", "replace"))
)
if proc.returncode != 0:
raise Exception(
"`%s` returned exit code %s" % (" ".join(argv), proc.returncode)
)
try:
jstr = proc.stdout.decode("utf-8", errors="strict")
except UnicodeDecodeError:
if binary is False:
raise
jstr = None
if jstr is None:
return self._link_show_all_manual_parsing(binary=binary)
import json
return json.loads(jstr)
def _link_show(self, ifindex=None, ifname=None, flags=None):
def _link_show(self, ifindex=None, ifname=None, flags=None, binary=None):
if ifindex is None and ifname is None:
raise ValueError("Missing argument, either ifindex or ifname must be given")
......@@ -144,20 +79,18 @@ class _IP:
if ifname is None:
ifname_b = None
elif isinstance(ifname, str):
ifname_b = ifname.encode("utf-8")
ifname_b = ifname.encode()
else:
ifname_b = ifname
result = []
for data in self.link_show_all():
for data in self.link_show_all(binary=True):
ii = data["ifindex"]
if ifindex is not None and int(ifindex) != ii:
continue
if ifname_b is not None:
ii = data["ifname"]
if isinstance(ii, str):
ii = ii.encode("utf-8")
if ifname_b != ii:
continue
if flags is not None:
......@@ -183,7 +116,19 @@ class _IP:
raise KeyError("Could not find interface with " + s)
raise KeyError("Could not find unique interface with " + s)
return result[0]
data = result[0]
if binary is not True:
name = data["ifname"]
try:
name = name.decode()
except:
if binary is False:
raise
else:
data["ifname"] = name
return data
def link_show(self, timeout=None, **kwargs):
......
......@@ -562,10 +562,27 @@ def test_black_code_fromatting():
def test_ip_link_show_all():
l0 = ip._link_show_all_manual_parsing(binary=None)
l1 = ip.link_show_all()
l0 = ip.link_show_all(binary=None)
def _normalize(i):
return (i["ifindex"], i["ifname"])
return (i["ifindex"], util.str_to_bytes(i["ifname"]), i["flags"])
assert [_normalize(i) for i in l0] == [_normalize(i) for i in l1]
assert [_normalize(i) for i in l0] == [_normalize(i) for i in ip.link_show_all()]
assert [_normalize(i) for i in l0] == [
_normalize(i) for i in ip.link_show_all(binary=None)
]
assert [_normalize(i) for i in l0] == [
_normalize(i) for i in ip.link_show_all(binary=True)
]
assert [_normalize(i) for i in l0] == [
_normalize(i) for i in ip.link_show_all(binary=False)
]
l = ip.link_show(ifname="lo", binary=None)
assert l["ifname"] == "lo"
l = ip.link_show(ifname="lo", binary=False)
assert l["ifname"] == "lo"
l = ip.link_show(ifname="lo", binary=True)
assert l["ifname"] == b"lo"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment