Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions scapy/layers/ipsec.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,33 @@ def encrypt(self, sa, esp, key, icv_size=None, esn_en=False, esn=0):
aad = struct.pack('!LL', esp.spi, esp.seq)
if self.ciphers_aead_api:
# New API
if self.cipher == aead.AESCCM:
# For ciphers that don't support custom tag_length (e.g., AES-GCM),
# use the lower-level API when icv_size differs from default
use_lower_level_api = (self.cipher == aead.AESGCM and
icv_size != self.icv_size)

if use_lower_level_api:
# Use lower-level API for truncated ICV support
# For AES-GCM, we need to use algorithms.AES with modes.GCM
cipher = Cipher(
algorithms.AES(key),
modes.GCM(mode_iv),
default_backend(),
)
encryptor = cipher.encryptor()
encryptor.authenticate_additional_data(aad)
data = encryptor.update(data) + encryptor.finalize()
data += encryptor.tag[:icv_size]
elif self.cipher == aead.AESCCM:
cipher = self.cipher(key, tag_length=icv_size)
data = cipher.encrypt(mode_iv, data, aad)
else:
cipher = self.cipher(key)
if self.name == 'AES-NULL-GMAC':
# Special case for GMAC (rfc 4543 sect 3)
data = data + cipher.encrypt(mode_iv, b"", aad + esp.iv + data)
else:
data = cipher.encrypt(mode_iv, data, aad)
if self.name == 'AES-NULL-GMAC':
# Special case for GMAC (rfc 4543 sect 3)
data = data + cipher.encrypt(mode_iv, b"", aad + esp.iv + data)
else:
data = cipher.encrypt(mode_iv, data, aad)
else:
cipher = self.new_cipher(key, mode_iv)
encryptor = cipher.encryptor()
Expand Down Expand Up @@ -475,18 +493,41 @@ def decrypt(self, sa, esp, key, icv_size=None, esn_en=False, esn=0):
aad = struct.pack('!LL', esp.spi, esp.seq)
if self.ciphers_aead_api:
# New API
if self.cipher == aead.AESCCM:
# For ciphers that don't support custom tag_length (e.g., AES-GCM),
# use the lower-level API when icv_size differs from default
use_lower_level_api = (self.cipher == aead.AESGCM and
icv_size != self.icv_size)

if use_lower_level_api:
# Use lower-level API for truncated ICV support
# For AES-GCM, we need to use algorithms.AES with modes.GCM
cipher = Cipher(
algorithms.AES(key),
modes.GCM(mode_iv, icv, len(icv)),
default_backend(),
)
decryptor = cipher.decryptor()
decryptor.authenticate_additional_data(aad)
try:
data = decryptor.update(data) + decryptor.finalize()
except InvalidTag as err:
raise IPSecIntegrityError(err)
elif self.cipher == aead.AESCCM:
cipher = self.cipher(key, tag_length=icv_size)
try:
data = cipher.decrypt(mode_iv, data + icv, aad)
except InvalidTag as err:
raise IPSecIntegrityError(err)
else:
cipher = self.cipher(key)
try:
if self.name == 'AES-NULL-GMAC':
# Special case for GMAC (rfc 4543 sect 3)
data = data + cipher.decrypt(mode_iv, icv, aad + iv + data)
else:
data = cipher.decrypt(mode_iv, data + icv, aad)
except InvalidTag as err:
raise IPSecIntegrityError(err)
try:
if self.name == 'AES-NULL-GMAC':
# Special case for GMAC (rfc 4543 sect 3)
data = data + cipher.decrypt(mode_iv, icv, aad + iv + data)
else:
data = cipher.decrypt(mode_iv, data + icv, aad)
except InvalidTag as err:
raise IPSecIntegrityError(err)
else:
cipher = self.new_cipher(key, mode_iv, icv)
decryptor = cipher.decryptor()
Expand Down
43 changes: 43 additions & 0 deletions test/scapy/layers/ipsec.uts
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,49 @@ try:
except IPSecIntegrityError as err:
err

#######################################
= IPv4 / ESP - Transport - AES-GCM - Truncated ICV (12 bytes)
~ crypto_advanced

p = IP(src='1.1.1.1', dst='2.2.2.2')
p /= TCP(sport=45012, dport=80)
p /= Raw('testdata')
p = IP(raw(p))
p

sa = SecurityAssociation(ESP, spi=0x222,
crypt_algo='AES-GCM', crypt_key=b'16bytekey+4bytenonce',
crypt_icv_size=12,
auth_algo='NULL', auth_key=None)

e = sa.encrypt(p)
e

assert isinstance(e, IP)
assert e.src == '1.1.1.1' and e.dst == '2.2.2.2'
assert e.chksum != p.chksum
assert e.proto == socket.IPPROTO_ESP
assert e.haslayer(ESP)
assert not e.haslayer(TCP)
assert e[ESP].spi == sa.spi
* after encryption the original packet payload should NOT be readable
assert b'testdata' not in e[ESP].data

d = sa.decrypt(e)
d

* after decryption original packet should be preserved
assert d[TCP] == p[TCP]

* test with altered packet - integrity verification should fail
e2 = sa.encrypt(p)
e2[ESP].data = e2[ESP].data[:-1] + b'\xff'
try:
d = sa.decrypt(e2)
assert False, "Integrity check should have failed"
except IPSecIntegrityError as err:
err

#######################################
= IPv4 / ESP - Transport - AES-CCM - NULL
~ crypto_advanced
Expand Down
Loading