[Python黑帽子_黑客与渗透测试编程之道.pdf] 第三章代码

《Python黑帽子_黑客与渗透测试编程之道》github有好兄弟把书放出来了

https://github.com/TrojanAZhen/Self_Back/blob/master/书籍/Python黑帽子_黑客与渗透测试编程之道.pdf

由于原文是Python2,并且部分API有改动,这里的代码也略有改动,不影响使用

第三章总的来说还算简单,重点理解一下那个 IP 包结构和ICMP部分结构就行,我这样没怎么学过网络的童鞋来说,还行

windows和linux上的包嗅探
import socket
import os

# 监听的主机,不是本机的IP也可以
host = "localhost"

# 创建原始套接字,然后绑定在公开接口上
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP
# windows和linux 的区别,windows运行嗅探所有的数据包,linux只能嗅探ICMP协议

sniffer = socket.socket(socket.AF_INET,
                        socket.SOCK_RAW, 
                        socket_protocol)
sniffer.bind((host, 0))

# 
sniffer.setsockopt(socket.IPPROTO_IP,
                   socket.IP_HDRINCL,
                   1)

# 在 windows 平台上,我们需要设置 IOCTL 以启用混杂模式
if os.name == "nt":
    # 发送 IOCTL 信号到网卡驱动
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# 读取单个数据包
while True:
    recv = sniffer.recvfrom(65535)
    # print(len(recv[0]), recv)
    print(recv[1])

if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
解码IP层
import socket

import os
import struct
from ctypes import *


# 监听的主机
host = "10.202.60.3"

# IP头定义
class IP(Structure):
    
    # 定义了 ctype 结构体
    _fields_ = [
            ("ihl",           c_ubyte, 4),
            ("version",       c_ubyte, 4),
            ("tos",           c_ubyte),
            ("len",           c_ushort),
            ("id",            c_ushort),
            ("offset",        c_ushort),
            ("ttl",           c_ubyte),
            ("protocol_num",  c_ubyte),
            ("sum",           c_ushort),
            ("src",           c_ulong),
            ("dst",           c_ulong),
        ]
    
    def  __new__(self, socket_buffer=None):
        # __new__ 方法将原始缓冲区中的数据填充到结构中
        return self.from_buffer_copy(socket_buffer)
        # 当调用 __init__ 方法时,__new__ 方法已经完成了对缓冲区数据的处理
        
    def __init__(self, socket_buffer=None):
        # 协议字段与协议名称
        self.protocol_map = { 1:"ICMP",
                              6:"TCP",
                             17:"UDP"}
        # 可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
        
        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)
            

if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)


    try:
        while True:
            
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65535)[0]
            
            # 将缓冲区的前20个字节按照IP头进行解析
            ip_header = IP(raw_buffer[0:20])
            
            # 输出协议与通信双方的IP地址
            print("Protocal: %s %s -> %s" % 
                  (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
    except KeyboardInterrupt:
        
        # 如果运行 windows 上,关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

我这里ping一下百度哈:

在这里插入图片描述
这是我的结果:
在这里插入图片描述
ping 是向特定的目的主机发送 ICMP(Internet Control Message Protocol 因特网报文控制协议)Echo 请求报文,测试目的站是否可达及了解其有关状态

解码ICMP
import socket

import os
import struct
from ctypes import *


# 监听的主机
host = "10.202.60.3"
# host = 'localhost'

# IP头定义
class IP(Structure):
    
    # 定义了 ctype 结构体
    _fields_ = [
            ("ihl",           c_ubyte, 4),
            ("version",       c_ubyte, 4),
            ("tos",           c_ubyte),
            ("len",           c_ushort),
            ("id",            c_ushort),
            ("offset",        c_ushort),
            ("ttl",           c_ubyte),
            ("protocol_num",  c_ubyte),
            ("sum",           c_ushort),
            ("src",           c_ulong),
            ("dst",           c_ulong),
        ]
    
    def  __new__(self, socket_buffer=None):
        # __new__ 方法将原始缓冲区中的数据填充到结构中
        return self.from_buffer_copy(socket_buffer)
        # 当调用 __init__ 方法时,__new__ 方法已经完成了对缓冲区数据的处理
        
    def __init__(self, socket_buffer=None):
        # 协议字段与协议名称
        self.protocol_map = { 1:"ICMP",
                              6:"TCP",
                             17:"UDP"}
        # 可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
        
        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)
            

class ICMP(Structure):
    
    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort),
    ]
    
    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)
    
    def __init__(self, socket_buffer):
        pass


if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)


    try:
        while True:
            
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65535)[0]
            
            # 将缓冲区的前20个字节按照IP头进行解析
            ip_header = IP(raw_buffer[0:20])
            
            # if host == ip_header.src_address or host == ip_header.dst_address:
            
            #     # 输出协议与通信双方的IP地址
            #     print("Protocal: %s %s -> %s" % 
            #           (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
                
            # print("Protocal: %s %s -> %s" % 
            #           (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
            
            # 如果是 ICMP 协议包, 则进行处理
            if ip_header.protocol == "ICMP":
                
                # 计算ICMP包的起始位置
                offset = ip_header.ihl * 4
                # 计算ICMP数据在原始数据包中的偏移
                buf = raw_buffer[offset: offset+sizeof(ICMP)]
                
                # 解析 ICMP 数据
                icmp_header = ICMP(buf)
                
                print("ICMP -> Type: %d Code: %d" % 
                      (icmp_header.type, icmp_header.code))
                
    except KeyboardInterrupt:
        
        # 如果运行 windows 上,关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
pip install netaddr -i https://mirror.baidu.com/pypi/simple
# 在程序中添加最后的代码,实现UDP数据的获取扫描结果了
import os
import socket
import threading
import time
import struct
from ctypes import *

from netaddr import IPNetwork, IPAddress



# 监听的主机
host = "10.202.60.3" # 本机地址

# 扫描的目标子网
subnet = "10.202.32.000/19"

# 自定义的字符串,我们将在 ICMP 响应中进行核对
magic_message = b"PYTHONRULES!"

# IP头定义
class IP(Structure):
    
    # 定义了 ctype 结构体
    _fields_ = [
            ("ihl",           c_ubyte, 4),
            ("version",       c_ubyte, 4),
            ("tos",           c_ubyte),
            ("len",           c_ushort),
            ("id",            c_ushort),
            ("offset",        c_ushort),
            ("ttl",           c_ubyte),
            ("protocol_num",  c_ubyte),
            ("sum",           c_ushort),
            ("src",           c_ulong),
            ("dst",           c_ulong),
        ]
    
    def  __new__(self, socket_buffer=None):
        # __new__ 方法将原始缓冲区中的数据填充到结构中
        return self.from_buffer_copy(socket_buffer)
        # 当调用 __init__ 方法时,__new__ 方法已经完成了对缓冲区数据的处理
        
    def __init__(self, socket_buffer=None):
        # 协议字段与协议名称
        self.protocol_map = { 1:"ICMP",
                              6:"TCP",
                             17:"UDP"}
        # 可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
        
        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)
            

class ICMP(Structure):
    
    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort),
    ]
    
    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)
    
    def __init__(self, socket_buffer):
        pass


if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)



# 批量发送 UDP 数据包
def udp_sender(subnet, magic_message):
    
    time.sleep(1)
    
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    for ip in IPNetwork(subnet): # 我的IP是:list(IPNetwork(subnet))
        
        try:
            # print("[SendTo] %s %d"%(ip, 65212))
            sender.sendto(magic_message, ("%s"%ip, 65212))
        except:
            pass
        time.sleep(0.05)

# udp_sender(subnet, magic_message) # 调试入口

# 开始发送数据包
t = threading.Thread(target=udp_sender, 
                      args=(subnet, magic_message))
t.start()


if os.name == "nt":
    
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
    
    try:
        
        while True:
            
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65535)[0]
            
            # 将缓冲区的前20个字节按照IP头进行解析
            ip_header = IP(raw_buffer[0:20])
            
            # if host == ip_header.src_address or host == ip_header.dst_address:
            
            #     # 输出协议与通信双方的IP地址
            #     print("Protocal: %s %s -> %s" % 
            #           (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
                
            # print("Protocal: %s %s -> %s" % 
            #           (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
            
            
            # 如果是 ICMP 协议包, 则进行处理
            if ip_header.protocol == "ICMP":
                
                # 计算ICMP包的起始位置
                offset = ip_header.ihl * 4
                # 计算ICMP数据在原始数据包中的偏移
                buf = raw_buffer[offset: offset+sizeof(ICMP)]
                
                # 解析 ICMP 数据
                icmp_header = ICMP(buf)
                
                # print("ICMP -> Type: %d Code: %d" % 
                #       (icmp_header.type, icmp_header.code))
                
            
                # 检查类型和代码值是否为3
                if icmp_header.code == 1 and icmp_header.type == 3:
                    
                    # 确认响应的主机在我们的目标子网之内
                    if IPAddress(ip_header.src_address) in IPNetwork(subnet):
                        
                        # 确认 ICMP 数据中包括我们发送的自定义的字符串
                        if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message:
                            print("Host Up: %s -> %s" % (ip_header.dst_address, ip_header.src_address))
        
    
    except KeyboardInterrupt:
        
        # 如果运行 windows 上,关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)           
GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐