Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 133 additions & 30 deletions scapy/layers/dhcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
- rfc951 - BOOTSTRAP PROTOCOL (BOOTP)
- rfc1542 - Clarifications and Extensions for the Bootstrap Protocol
- rfc1533 - DHCP Options and BOOTP Vendor Extensions
- rfc3396 - Encoding Long Options in DHCPv4

You can disable concatenation on DHCP options holding the same code
while decoding DHCP packets with::

>>> conf.contribs["dhcp"]["rfc3396"] = False

(Defaults to True)
"""

try:
Expand Down Expand Up @@ -69,6 +77,9 @@
)

dhcpmagic = b"c\x82Sc"
if "dhcp" not in conf.contribs:
conf.contribs["dhcp"] = {}
conf.contribs["dhcp"]["rfc3396"] = True


class _BOOTP_chaddr(StrFixedLenField):
Expand Down Expand Up @@ -436,60 +447,145 @@ def i2repr(self, pkt, x):
s.append(sane(v))
return "[%s]" % (" ".join(s))

def getfield(self, pkt, s):
return b"", self.m2i(pkt, s)

def m2i(self, pkt, x):
opt = []
def _extract_raw_entries(self, x):
"""
Extract raw TLV entries from the options buffer without interpreting them.
Returns a list where each entry is either:
- A string ('pad'/'end').
- A tuple (code, raw_bytes).
- Raw bytes for malformed trailing data.
"""
entries = []
while x:
o = orb(x[0])
if o == 255:
opt.append("end")
entries.append("end")
x = x[1:]
continue
if o == 0:
opt.append("pad")
entries.append("pad")
x = x[1:]
continue
if len(x) < 2 or len(x) < orb(x[1]) + 2:
opt.append(x)
entries.append(x)
break
elif o in DHCPOptions:
f = DHCPOptions[o]
olen = orb(x[1])
entries.append((o, x[2:olen + 2]))
x = x[olen + 2:]
return entries

if isinstance(f, str):
olen = orb(x[1])
opt.append((f, x[2:olen + 2]))
x = x[olen + 2:]
def _merge_entries(self, entries):
"""
RFC 3396: merge all entries sharing the same option code.
Preserves order of first appearance.
Pads, ends and malformed entries are kept in place.
"""
merged = {}
order = []
for entry in entries:
if isinstance(entry, tuple) and len(entry) == 2:
code, value = entry
if code in merged:
merged[code] += value
else:
olen = orb(x[1])
lval = [f.name]
merged[code] = bytearray(value)
order.append(('option', code))
else:
order.append(('special', entry))
result = []
for kind, data in order:
if kind == 'special':
result.append(data)
else:
result.append((data, bytes(merged[data])))
return result

if olen == 0:
def _entries_to_raw(self, entries):
"""
Reconstruct raw bytes from a list of extracted entries.
"""
s = b""
for entry in entries:
if isinstance(entry, tuple) and len(entry) == 2:
code, value = entry
s += struct.pack("!BB", code, len(value)) + value
elif entry == "end":
s += b'\xff'
elif entry == "pad":
s += b'\x00'
elif isinstance(entry, bytes):
s += entry
return s

def getfield(self, pkt, s):
return b"", self.m2i(pkt, s)

def m2i(self, pkt, x):
rfc3396 = conf.contribs.get("dhcp", {}).get("rfc3396", True)

entries = self._extract_raw_entries(x)

if rfc3396:
# RFC 3396 section 5: check for option overload
overload = 0
for entry in entries:
if (isinstance(entry, tuple) and len(entry) == 2
and entry[0] == 52 and len(entry[1]) == 1):
overload = orb(entry[1][0])
break

# Build aggregate entries from file/sname if needed
if (overload
and pkt.underlayer is not None
and isinstance(pkt.underlayer, BOOTP)):
if overload in (1, 3):
entries += self._extract_raw_entries(
pkt.underlayer.file
)
if overload in (2, 3):
entries += self._extract_raw_entries(
pkt.underlayer.sname
)

# Merge all entries with same code
entries = self._merge_entries(entries)

# Interpret entries
opt = []
for i, entry in enumerate(entries):
if isinstance(entry, tuple) and len(entry) == 2:
code, raw_value = entry
if code in DHCPOptions:
f = DHCPOptions[code]
if isinstance(f, str):
opt.append((f, raw_value))
continue
lval = [f.name]
if len(raw_value) == 0:
try:
_, val = f.getfield(pkt, b'')
except Exception:
opt.append(x)
opt.append(
self._entries_to_raw(entries[i:])
)
break
else:
lval.append(val)

try:
left = x[2:olen + 2]
left = raw_value
while left:
left, val = f.getfield(pkt, left)
lval.append(val)
except Exception:
opt.append(x)
opt.append(
self._entries_to_raw(entries[i:])
)
break
else:
otuple = tuple(lval)
opt.append(otuple)
x = x[olen + 2:]
opt.append(tuple(lval))
else:
opt.append((code, raw_value))
else:
olen = orb(x[1])
opt.append((o, x[2:olen + 2]))
x = x[olen + 2:]
opt.append(entry)
return opt

def i2m(self, pkt, x):
Expand All @@ -514,8 +610,15 @@ def i2m(self, pkt, x):
warning("Unknown field option %s", name)
continue

s += struct.pack("!BB", onum, len(oval))
s += oval
# RFC 3396: split options longer than 255 bytes
if not oval:
s += struct.pack("!BB", onum, 0)
else:
while oval:
chunk = oval[:255]
oval = oval[255:]
s += struct.pack("!BB", onum, len(chunk))
s += chunk

elif (isinstance(o, str) and o in DHCPRevOptions and
DHCPRevOptions[o][1] is None):
Expand Down
52 changes: 52 additions & 0 deletions test/scapy/layers/dhcp.uts
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,55 @@ assert result in [
'<function scapy.ansmachine.dhcpd(self, pool: Union[scapy.base_classes.Net, List[str]] = Net("192.168.1.128/25"), network: str = \'192.168.1.0/24\', gw: str = \'192.168.1.1\', nameserver: Union[str, List[str]] = None, domain: Union[str, NoneType] = None, renewal_time: int = 60, lease_time: int = 1800, **kwargs)>',
'<function scapy.ansmachine.dhcpd(self, pool=Net("192.168.1.128/25"), network=\'192.168.1.0/24\', gw=\'192.168.1.1\', nameserver=None, domain=None, renewal_time=60, lease_time=1800, **kwargs)>',
]

= RFC 3396 - Encoding long DHCPv4 options (fixes #4642, #4343)
# i2m: option > 255 bytes is split into fragments
# i2m: zero-length option is preserved
# m2i (rfc3396=True): all options with same code are concatenated globally
# m2i (rfc3396=True): concatenation works for unknown option codes
# m2i (rfc3396=False): options with same code are NOT concatenated (legacy)
# roundtrip: long option survives encode/decode with rfc3396=True
# getfield (rfc3396=True): sname/file not aggregated without overload
# getfield (rfc3396=True): overload=1 aggregates file field

import struct

r = raw(DHCP(options=[('captive-portal', 'a'*256), 'end']))
assert r[:2] == b'\x72\xff' and r[2:257] == b'a'*255
assert r[257:260] == b'\x72\x01a' and r[260:261] == b'\xff'

assert raw(DHCP(options=[('rapid_commit', b''), 'end'])) == b'\x50\x00\xff'

old_rfc3396 = conf.contribs.get("dhcp", {}).get("rfc3396", False)
conf.contribs["dhcp"]["rfc3396"] = True

assert DHCP(b'\x06\x02\x01\x02\x06\x02\x03\x04').options == DHCP(b'\x06\x04\x01\x02\x03\x04').options

p = DHCP(b'\x0c\x02sc\x06\x04\x01\x02\x03\x04\x0c\x02py')
assert p.options[0] == ('hostname', b'scpy')
assert p.options[1] == ('name_server', '1.2.3.4')

assert DHCP(b'\xfe\x02AB\xfe\x02CD').options[0] == (254, b'ABCD')

conf.contribs["dhcp"]["rfc3396"] = False

p = DHCP(b'\x0c\x02sc\x06\x04\x01\x02\x03\x04\x0c\x02py')
assert p.options == [('hostname', b'sc'), ('name_server', '1.2.3.4'), ('hostname', b'py')]

conf.contribs["dhcp"]["rfc3396"] = True

pkt2 = DHCP(raw(DHCP(options=[('captive-portal', 'a'*400), 'end'])))
assert pkt2.options[0] == ('captive-portal', b'a'*400) and pkt2.options[-1] == 'end'

bootp_pkt = BOOTP(chaddr="00:01:02:03:04:05", sname=b'myserver'+b'\x00'*56, file=b'bootfile'+b'\x00'*120, options=b'c\x82Sc') / DHCP(options=[('message-type', 'discover'), 'end'])
p = BOOTP(raw(bootp_pkt))
assert p[DHCP].options[0] == ('message-type', 1) and p[BOOTP].sname[:8] == b'myserver'

magic = b'\x63\x82\x53\x63'
opts = b'\x34\x01\x01' + b'\x35\x01\x01' + b'\xff'
file_field = (b'\x0c\x05scapy' + b'\xff' + b'\x00'*120)[:128]
bootp_raw = struct.pack("!4B", 1, 1, 6, 0) + b'\x00'*4 + b'\x00'*4 + b'\x00'*16 + b'\x00'*16 + b'\x00'*64 + file_field + magic + opts
p = BOOTP(bootp_raw)
assert DHCP in p and ('hostname', b'scapy') in p[DHCP].options

conf.contribs["dhcp"]["rfc3396"] = old_rfc3396