pcap Dateien anonymisieren

pcap Dateien anonymisieren #

Funktionalität #

Dieses Programm ist für die Konsole ausgelegt, besitzt jedoch auch ein reduziertes UI. Um die Konsole zu verwenden, muss beim Aufruf die Option --ignore-gooey mitgegeben werden.

Das Programm kann in einer pcap Datei Anpassungen durführen:

  • Teil von IP Adressen ersetzen
  • Teile von MAC Adressen ersetzen
  • Packete in einer Date Range entfernen
  • Teile einer Email Adresse durch eine andere ersetzen

In der Konsolen Applikation ist es zudem möglich, diese Ersetzungen mehrfach durchzuführen (beispielsweise 2 Email Adressen zu ersetzen).

Das UI sieht in etwa so aus:

Installation #

Es werden die folgenden Packages benötigt:

  • Gooey
  • scapy
  • python_dateutil

Die Dependencies lassen sich mit der requirements.txt via pip schnell installieren. Dazu soll zuerst der Code heruntergeladen werden. Im Anschluss genügt dann ein pip install -r requirements.txt im Verzeichnis.

Code #

main.py #

import argparse
from dateutil.parser import parse
from gooey import Gooey
import lib as anon


@Gooey(program_name="PCAP anonymizer", default_size=(800, 600))
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("input_path",
                        type=str,
                        help="Path to input pcap file")
    parser.add_argument("output_path",
                        type=str,
                        help="Output pcap file path")
    parser.add_argument("--replace-ip-part",
                        nargs=2,
                        default=["192.168", "10.0.0"],
                        type=str,
                        help="Replace ip prefix with a new one",
                        action='append')
    parser.add_argument("--replace-email-part",
                        nargs=2,
                        default=["old@example.com", "new@example.com"],
                        help="Replace in SMTP messages found email address with a new one",
                        action='append')
    parser.add_argument("--remove-date-range",
                        nargs=2,
                        default=["2021-06-30", "2021-07-30"],
                        type=lambda s: parse(s),
                        help="Remove packets with start-date <= local_time <= end-date")
    parser.add_argument("--replace-mac-part",
                        nargs=2,
                        default=["11:22:33", "11:11:11"],
                        type=str,
                        help="Replace mac prefix with a new one")

    args = parser.parse_args()
    anon.processPcap(args)


if __name__ == '__main__':
    main()

tests.py #

import datetime
import unittest

from dateutil.parser import parse
from scapy.layers.dns import DNS, DNSQR
from scapy.layers.inet import IP, TCP, UDP, Ether

import lib

class TestActions(unittest.TestCase):

    def test_inRemovalRange(self):
        pkt1 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=53)
        pkt1.time = datetime.datetime(2021, 6, 1, 1, 0, 0).timestamp()
        pkt2 = Ether(dst="12:23:34:45:56:67", src="11:22:33:44:55:66") / IP(dst="8.8.8.8", src="1.1.1.1") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com"))
        pkt2.time = datetime.datetime(2021, 6, 1, 1, 1, 0).timestamp()
        pkt3 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=25) / "This is not a valid SMTP Packet but contains an email private@example.com"
        pkt3.time = datetime.datetime(2021, 6, 1, 1, 2, 0).timestamp()
        pkt4 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=25) / "This is not a valid SMTP Packet but contains an email public@example.com"
        pkt4.time = datetime.datetime(2021, 6, 1, 1, 3, 0).timestamp()

        daterange = [parse('2021-06-01 01:01:00.0'), parse('2021-06-01 01:02:00.0')]

        self.assertFalse(lib.inRemovalRange(pkt1, daterange))
        self.assertTrue(lib.inRemovalRange(pkt2, daterange))
        self.assertTrue(lib.inRemovalRange(pkt3, daterange))
        self.assertFalse(lib.inRemovalRange(pkt4, daterange))

    def test_replaceIp(self):
        pkt1 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=53)
        pkt2 = Ether(dst="12:23:34:45:56:67", src="11:22:33:44:55:66") / IP(dst="8.8.8.8", src="1.1.1.1") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com"))

        ips = [["1.1.1.1", "8.8.4.4"]]

        lib.replaceIp(pkt1, ips)
        self.assertEqual(pkt1[IP].src, "8.8.8.8")
        self.assertEqual(pkt1[IP].dst, "8.8.4.4")

        lib.replaceIp(pkt2, ips)
        self.assertEqual(pkt2[IP].src, "8.8.4.4")
        self.assertEqual(pkt2[IP].dst, "8.8.8.8")

    def test_replaceMac(self):
        pkt1 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=53)
        pkt2 = Ether(dst="12:23:34:45:56:67", src="11:22:33:44:55:66") / IP(dst="8.8.8.8", src="1.1.1.1") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com"))

        macs = [["11:22:33:44:55:66", "11:11:11:11:11:11"]]

        lib.replaceMac(pkt1, macs)

        self.assertEqual(pkt1[Ether].dst, "11:11:11:11:11:11")
        self.assertEqual(pkt1[Ether].src, "12:23:34:45:56:67")

        lib.replaceMac(pkt2, macs)
        self.assertEqual(pkt2[Ether].dst, "12:23:34:45:56:67")
        self.assertEqual(pkt2[Ether].src, "11:11:11:11:11:11")

    def test_replaceEmailInSMTPPacket(self):
        pkt1 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=25) / "This is not a valid SMTP Packet but contains an email private@example.com"
        pkt2 = Ether(dst="11:22:33:44:55:66", src="12:23:34:45:56:67") / IP(dst="1.1.1.1", src="8.8.8.8") / TCP(dport=25) / "This is not a valid SMTP Packet but contains an email public@example.com"

        emails = [["private@example.com", "ano@example.com"]]

        lib.replaceEmailInSMTPPacket(pkt1, emails)

        self.assertEqual(pkt1[TCP].load.decode('utf-8'),
                         "This is not a valid SMTP Packet but contains an email ano@example.com")

        lib.replaceEmailInSMTPPacket(pkt2, emails)
        self.assertEqual(pkt2[TCP].load.decode('utf-8'),
                         "This is not a valid SMTP Packet but contains an email public@example.com")


if __name__ == "__main__":
    unittest.main()

lib.py #

from scapy.layers.inet import IP, TCP, Ether
from scapy.packet import Raw
from scapy.utils import wrpcap
from scapy.utils import rdpcap
from datetime import datetime


def inRemovalRange(packet, daterange):
    if daterange is None:
        return False

    local_time = datetime.fromtimestamp(int(packet.time))

    result = daterange[0] <= local_time <= daterange[1]
    return result


def replaceIp(packet, ips):
    if ips is None or ips[0] is None:
        return
    if not packet.haslayer(IP):
        return

    ip_src = packet[IP].src
    ip_dst = packet[IP].dst
    for ip_part in ips:

        if ip_src.startswith(ip_part[0]):
            new_src_ip = ip_part[1] + ip_src.removeprefix(ip_part[0])
            packet[IP].src = new_src_ip

        if ip_dst.startswith(ip_part[0]):
            new_dst_ip = ip_part[1] + ip_dst.removeprefix(ip_part[0])
            packet[IP].dst = new_dst_ip


def replaceMac(packet, macs):
    if macs is None or macs[0] is None:
        return
    if not packet.haslayer(Ether):
        return

    mac_src = packet[Ether].src
    mac_dst = packet[Ether].dst
    for mac in macs:

        if mac_src.startswith(mac[0]):
            new_src_ip = mac[1] + mac_src.removeprefix(mac[0])
            packet[Ether].src = new_src_ip

        if mac_dst.startswith(mac[0]):
            new_dst_ip = mac[1] + mac_dst.removeprefix(mac[0])
            packet[Ether].dst = new_dst_ip


def replaceEmailInSMTPPacket(packet, emailadresses):
    if emailadresses is None:
        return
    if not packet.haslayer(TCP):
        return
    if not packet.haslayer(Raw):
        return

    if packet[TCP].dport == 25 or packet[TCP].dport == 465 or packet[TCP].dport == 587 \
            or packet[TCP].dport == 2525:

        tcp_load = packet[TCP].load.decode('utf-8')

        for email_part in emailadresses:
            new_tcp_load = tcp_load.replace(email_part[0], email_part[1])
            packet[Raw].load = new_tcp_load


def processPcap(args):
    packets = list(filter(lambda packet: (not inRemovalRange(packet, args.remove_date_range)), rdpcap(args.input_path)))

    for packet in packets:
        replaceIp(packet, args.replace_ip_part)
        replaceEmailInSMTPPacket(packet, args.replace_email_part)
        replaceMac(packet, args.replace_mac_part)

    wrpcap(args.output_path, packets)
Calendar September 21, 2021