Anonymize pcap files #
Functionality #
This program is designed for the console, but also has a reduced UI. To use the console, the
--ignore-gooey
option must be included in the call.
The program can make adjustments in a pcap file:
- replace part of IP addresses
- replace parts of MAC addresses
- remove packets in a date range
- replace part of an email address with another one
In the console application it is also possible to do these replacements multiple times (for example replace 2 email addresses).
The UI looks something like this:
Installation #
The following packages are needed:
- gooey
- scapy
- python_dateutil
The dependencies can be installed quickly with the requirements.txt via pip. First the
code should be downloaded. Afterwards a pip install -r
requirements.txt
in the directory is sufficient.
Code #
main.py #
import argparse
from dateutil.parser import parse
from gooey import Gooey
import lib as anon
@Gooey(program_name="PCAP anonymizer", default_size=(800, 600))
def main():
parser = argparse.ArgumentParser()
parser.add_argument("input_path",
type=str,
help="Path to input pcap file")
parser.add_argument("output_path",
type=str,
help="Output pcap file path")
parser.add_argument("--replace-ip-part",
nargs=2,
default=["192.168", "10.0.0"],
type=str,
help="Replace ip prefix with a new one",
action='append')
parser.add_argument("--replace-email-part",
nargs=2,
default=["old@example.com", "new@example.com"],
help="Replace in SMTP messages found email address with a new one",
action='append')
parser.add_argument("--remove-date-range",
nargs=2,
default=["2021-06-30", "2021-07-30"],
type=lambda s: parse(s),
help="Remove packets with start-date <= local_time <= end-date")
parser.add_argument("--replace-mac-part",
nargs=2,
default=["11:22:33", "11:11:11"],
type=str,
help="Replace mac prefix with a new one")
args = parser.parse_args()
anon.processPcap(args)
if __name__ == '__main__':
main()
tests.py #
import datetime
import unittest
from dateutil.parser import parse
from scapy.layers.dns import DNS, DNSQR
from scapy.layers.inet import IP, TCP, UDP, Ether
import lib
class TestActions(unittest.TestCase):
def test_inRemovalRange(self):
pkt1 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=53)
pkt1.time = datetime.datetime(2021, 6, 1, 1, 0, 0).timestamp()
pkt2 = Ether(dst="12:23:34:45:56:67", src="11:22:33:44:55:66") / IP(dst="8.8.8.8", src="1.1.1.1") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com"))
pkt2.time = datetime.datetime(2021, 6, 1, 1, 1, 0).timestamp()
pkt3 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=25) / "This is not a valid SMTP Packet but contains an email private@example.com"
pkt3.time = datetime.datetime(2021, 6, 1, 1, 2, 0).timestamp()
pkt4 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=25) / "This is not a valid SMTP Packet but contains an email public@example.com"
pkt4.time = datetime.datetime(2021, 6, 1, 1, 3, 0).timestamp()
daterange = [parse('2021-06-01 01:01:00.0'), parse('2021-06-01 01:02:00.0')]
self.assertFalse(lib.inRemovalRange(pkt1, daterange))
self.assertTrue(lib.inRemovalRange(pkt2, daterange))
self.assertTrue(lib.inRemovalRange(pkt3, daterange))
self.assertFalse(lib.inRemovalRange(pkt4, daterange))
def test_replaceIp(self):
pkt1 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=53)
pkt2 = Ether(dst="12:23:34:45:56:67", src="11:22:33:44:55:66") / IP(dst="8.8.8.8", src="1.1.1.1") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com"))
ips = [["1.1.1.1", "8.8.4.4"]]
lib.replaceIp(pkt1, ips)
self.assertEqual(pkt1[IP].src, "8.8.8.8")
self.assertEqual(pkt1[IP].dst, "8.8.4.4")
lib.replaceIp(pkt2, ips)
self.assertEqual(pkt2[IP].src, "8.8.4.4")
self.assertEqual(pkt2[IP].dst, "8.8.8.8")
def test_replaceMac(self):
pkt1 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=53)
pkt2 = Ether(dst="12:23:34:45:56:67", src="11:22:33:44:55:66") / IP(dst="8.8.8.8", src="1.1.1.1") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com"))
macs = [["11:22:33:44:55:66", "11:11:11:11:11:11"]]
lib.replaceMac(pkt1, macs)
self.assertEqual(pkt1[Ether].dst, "11:11:11:11:11:11")
self.assertEqual(pkt1[Ether].src, "12:23:34:45:56:67")
lib.replaceMac(pkt2, macs)
self.assertEqual(pkt2[Ether].dst, "12:23:34:45:56:67")
self.assertEqual(pkt2[Ether].src, "11:11:11:11:11:11")
def test_replaceEmailInSMTPPacket(self):
pkt1 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=25) / "This is not a valid SMTP Packet but contains an email private@example.com"
pkt2 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=25) / "This is not a valid SMTP Packet but contains an email public@example.com"
emails = [["private@example.com", "ano@example.com"]]
lib.replaceEmailInSMTPPacket(pkt1, emails)
self.assertEqual(pkt1[TCP].load.decode('utf-8'),
"This is not a valid SMTP Packet but contains an email ano@example.com")
lib.replaceEmailInSMTPPacket(pkt2, emails)
self.assertEqual(pkt2[TCP].load.decode('utf-8'),
"This is not a valid SMTP Packet but contains an email public@example.com")
if __name__ == "__main__":
unittest.main()
lib.py #
from scapy.layers.inet import IP, TCP, Ether
from scapy.packet import Raw
from scapy.utils import wrpcap
from scapy.utils import rdpcap
from datetime import datetime
def inRemovalRange(packet, daterange):
if daterange is None:
return False
local_time = datetime.fromtimestamp(int(packet.time))
result = daterange[0] <= local_time <= daterange[1]
return result
def replaceIp(packet, ips):
if ips is None or ips[0] is None:
return
if not packet.haslayer(IP):
return
ip_src = packet[IP].src
ip_dst = packet[IP].dst
for ip_part in ips:
if ip_src.startswith(ip_part[0]):
new_src_ip = ip_part[1] + ip_src.removeprefix(ip_part[0])
packet[IP].src = new_src_ip
if ip_dst.startswith(ip_part[0]):
new_dst_ip = ip_part[1] + ip_dst.removeprefix(ip_part[0])
packet[IP].dst = new_dst_ip
def replaceMac(packet, macs):
if macs is None or macs[0] is None:
return
if not packet.haslayer(Ether):
return
mac_src = packet[Ether].src
mac_dst = packet[Ether].dst
for mac in macs:
if mac_src.startswith(mac[0]):
new_src_ip = mac[1] + mac_src.removeprefix(mac[0])
packet[Ether].src = new_src_ip
if mac_dst.startswith(mac[0]):
new_dst_ip = mac[1] + mac_dst.removeprefix(mac[0])
packet[Ether].dst = new_dst_ip
def replaceEmailInSMTPPacket(packet, emailadresses):
if emailadresses is None:
return
if not packet.haslayer(TCP):
return
if not packet.haslayer(Raw):
return
if packet[TCP].dport == 25 or packet[TCP].dport == 465 or packet[TCP].dport == 587 \
or packet[TCP].dport == 2525:
tcp_load = packet[TCP].load.decode('utf-8')
for email_part in emailadresses:
new_tcp_load = tcp_load.replace(email_part[0], email_part[1])
packet[Raw].load = new_tcp_load
def processPcap(args):
packets = list(filter(lambda packet: (not inRemovalRange(packet, args.remove_date_range)), rdpcap(args.input_path)))
for packet in packets:
replaceIp(packet, args.replace_ip_part)
replaceEmailInSMTPPacket(packet, args.replace_email_part)
replaceMac(packet, args.replace_mac_part)
wrpcap(args.output_path, packets)