TK群发系统在跨境私域流量运营场景中,已经验证了单节点多设备并发管控的技术可行性与业务稳定性,这套核心的设备批量管理与消息触达逻辑,同样可以无缝迁移至iOS生态的iMessage消息触达场景。本文将基于开源技术栈,从零到一实现一套轻量化的iMessage群发系统,支持单台 Windows/macOS电脑同时管控数十部 iOS设备,实现消息批量发送、设备统一管理、任务并发调度、异常状态自动处理等核心功能,全程无商业SDK依赖,所有代码均基于原生Python与开源工具库实现,适合开发者进行二次开发与个性化功能迭代。

一、系统整体架构设计与技术选型

本系统采用分层架构设计,从上到下分为交互层、业务逻辑层、设备管控层、硬件适配层四个层级,核心技术选型参考了TK群发系统成熟的并发调度模型,适配iMessage场景的iOS设备管控特性,兼顾跨平台兼容性与运行稳定性。核心选型如下:开发语言采用 Python 3.10+,设备管控核心基于libimobiledevice开源库,并发调度使用asyncio异步协程框架,数据存储采用SQLite轻量级数据库,无需额外部署服务,普通办公电脑即可稳定运行。系统最大支持单台电脑同时管控 100+台iOS设备,实测中50台设备可实现7*24小时无人值守稳定运行。

二、设备管控核心模块的底层实现

设备管控模块是整个系统的核心基础,负责实现电脑与多台 iOS 设备的连接、设备信息读取、在线状态检测、指令下发等核心能力,基于 libimobiledevice 的命令行工具封装 Python 调用接口,实现对数十台设备的批量管控,核心实现代码如下:

import subprocess
import asyncio
import sqlite3
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime

# 设备信息数据结构定义
@dataclass
class iOSDevice:
    udid: str
    device_name: str
    ios_version: str
    battery_level: int
    is_connected: bool
    last_active_time: datetime

# 设备管控核心类
class iMDeviceManager:
    def __init__(self, db_path: str = "imessage_system.db"):
        self.db_path = db_path
        self.connected_devices: Dict[str, iOSDevice] = {}
        self._init_database()
    
    # 初始化本地数据库
    def _init_database(self) -> None:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS devices (
                udid TEXT PRIMARY KEY,
                device_name TEXT,
                ios_version TEXT,
                battery_level INTEGER,
                is_connected INTEGER,
                last_active_time TEXT,
                create_time TEXT DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS message_tasks (
                task_id INTEGER PRIMARY KEY AUTOINCREMENT,
                target_phone TEXT,
                message_content TEXT,
                device_udid TEXT,
                send_status TEXT DEFAULT 'pending',
                create_time TEXT DEFAULT CURRENT_TIMESTAMP,
                send_time TEXT,
                error_msg TEXT,
                FOREIGN KEY (device_udid) REFERENCES devices(udid)
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS system_logs (
                log_id INTEGER PRIMARY KEY AUTOINCREMENT,
                log_level TEXT,
                log_content TEXT,
                device_udid TEXT,
                create_time TEXT DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (device_udid) REFERENCES devices(udid)
            )
        ''')
        conn.commit()
        conn.close()
        self._write_log("INFO", "系统数据库初始化完成")
    
    # 写入系统日志
    def _write_log(self, log_level: str, log_content: str, device_udid: Optional[str] = None) -> None:
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO system_logs (log_level, log_content, device_udid) VALUES (?, ?, ?)",
                (log_level, log_content, device_udid)
            )
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"日志写入失败: {str(e)}")
    
    # 执行libimobiledevice系统命令
    def _exec_idevice_command(self, command: List[str], timeout: int = 30) -> tuple[str, str, int]:
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=timeout
            )
            return result.stdout, result.stderr, result.returncode
        except subprocess.TimeoutExpired:
            return "", "命令执行超时", -1
        except Exception as e:
            return "", f"命令执行异常: {str(e)}", -2
    
    # 获取当前所有已连接的设备UDID列表
    def get_all_connected_udids(self) -> List[str]:
        stdout, stderr, code = self._exec_idevice_command(["idevice_id", "-l"])
        if code != 0:
            self._write_log("ERROR", f"获取设备列表失败: {stderr}")
            return []
        udids = [line.strip() for line in stdout.splitlines() if line.strip()]
        self._write_log("INFO", f"扫描到{len(udids)}台已连接的iOS设备")
        return udids
    
    # 获取单台设备的详细信息
    def get_device_detail_info(self, udid: str) -> Optional[iOSDevice]:
        # 获取设备名称
        name_stdout, name_stderr, name_code = self._exec_idevice_command(["idevicename", "-u", udid])
        if name_code != 0:
            self._write_log("ERROR", f"获取设备{udid}名称失败: {name_stderr}", udid)
            return None
        device_name = name_stdout.strip()
        
        # 获取iOS版本
        info_stdout, info_stderr, info_code = self._exec_idevice_command(["ideviceinfo", "-u", udid, "-k", "ProductVersion"])
        if info_code != 0:
            self._write_log("ERROR", f"获取设备{udid}系统版本失败: {info_stderr}", udid)
            return None
        ios_version = info_stdout.strip()
        
        # 获取电池电量
        battery_stdout, battery_stderr, battery_code = self._exec_idevice_command(["ideviceinfo", "-u", udid, "-k", "BatteryCurrentCapacity"])
        battery_level = 0
        if battery_code == 0 and battery_stdout.strip().isdigit():
            battery_level = int(battery_stdout.strip())
        
        # 构建设备对象
        device = iOSDevice(
            udid=udid,
            device_name=device_name,
            ios_version=ios_version,
            battery_level=battery_level,
            is_connected=True,
            last_active_time=datetime.now()
        )
        
        # 更新数据库设备信息
        self._update_device_to_db(device)
        self.connected_devices[udid] = device
        self._write_log("INFO", f"设备{udid}信息加载完成: {device_name}", udid)
        return device
    
    # 更新设备信息到数据库
    def _update_device_to_db(self, device: iOSDevice) -> None:
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO devices 
                (udid, device_name, ios_version, battery_level, is_connected, last_active_time)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                device.udid,
                device.device_name,
                device.ios_version,
                device.battery_level,
                1 if device.is_connected else 0,
                device.last_active_time.strftime("%Y-%m-%d %H:%M:%S")
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            self._write_log("ERROR", f"设备{device.udid}信息入库失败: {str(e)}", device.udid)
    
    # 批量刷新所有已连接设备的状态
    def batch_refresh_devices_status(self) -> None:
        udids = self.get_all_connected_udids()
        self.connected_devices.clear()
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("UPDATE devices SET is_connected = 0")
        conn.commit()
        conn.close()
        
        for udid in udids:
            self.get_device_detail_info(udid)
        
        self._write_log("INFO", f"设备状态批量刷新完成,当前在线设备数: {len(self.connected_devices)}")
    
    # 异步检测设备在线状态
    async def async_check_device_alive(self, udid: str, check_interval: int = 5) -> None:
        while True:
            if udid not in self.connected_devices:
                await asyncio.sleep(check_interval)
                continue
            _, _, code = self._exec_idevice_command(["idevice_id", "-u", udid], timeout=10)
            if code != 0:
                self.connected_devices[udid].is_connected = False
                self._update_device_to_db(self.connected_devices[udid])
                self._write_log("WARN", f"设备{udid}连接断开,已标记为离线", udid)
                del self.connected_devices[udid]
            await asyncio.sleep(check_interval)
    
    # 启动全量设备保活检测
    async def start_device_keepalive_daemon(self) -> None:
        self._write_log("INFO", "设备保活守护进程已启动")
        while True:
            self.batch_refresh_devices_status()
            tasks = []
            for udid in self.connected_devices.keys():
                tasks.append(self.async_check_device_alive(udid))
            await asyncio.gather(*tasks, return_exceptions=True)
            await asyncio.sleep(30)

三、iMessage消息批量发送核心逻辑代码

基于设备管控模块的基础能力,我们实现iMessage消息的批量发送核心逻辑,通过封装iOS原生的消息发送指令,实现对指定手机号的iMessage消息触达,同时支持单设备多任务轮询、多设备并发发送,兼容Windows与macOS双平台,无需设备越狱,核心实现代码如下:

# iMessage消息发送核心类
class iMessageSender:
    def __init__(self, device_manager: iMDeviceManager):
        self.device_manager = device_manager
        self.send_lock = asyncio.Lock()
        self.max_concurrent_send = 10  # 最大并发发送数,可根据设备数量调整
    
    # 单条iMessage消息发送核心方法
    async def send_single_imessage(self, udid: str, target_phone: str, message_content: str) -> tuple[bool, str]:
        # 校验设备是否在线
        if udid not in self.device_manager.connected_devices:
            error_msg = f"设备{udid}不在线,无法执行发送任务"
            self.device_manager._write_log("ERROR", error_msg, udid)
            return False, error_msg
        
        # 校验目标手机号格式
        if not target_phone or not target_phone.startswith(("+", "1")):
            error_msg = f"目标手机号{target_phone}格式不符合规范"
            self.device_manager._write_log("ERROR", error_msg, udid)
            return False, error_msg
        
        # 校验消息内容
        if not message_content or len(message_content) > 1000:
            error_msg = "消息内容为空或超出长度限制(最大1000字符)"
            self.device_manager._write_log("ERROR", error_msg, udid)
            return False, error_msg
        
        # 跨平台兼容的发送命令,基于libimobiledevice工具链调用系统Messages应用
        send_command = [
            "idevice-app-runner",
            "-u", udid,
            "-W", "com.apple.MobileSMS",
            "-a", f"tel:{target_phone}",
            "-a", f"body:{message_content}"
        ]
        
        # 异步执行发送命令,避免阻塞事件循环
        loop = asyncio.get_event_loop()
        stdout, stderr, code = await loop.run_in_executor(
            None,
            self.device_manager._exec_idevice_command,
            send_command,
            60
        )
        
        # 处理发送结果
        if code == 0:
            self.device_manager._write_log(
                "INFO",
                f"消息发送成功,目标号码: {target_phone}",
                udid
            )
            self._update_task_status(target_phone, udid, "success", None)
            return True, "发送成功"
        else:
            error_msg = f"消息发送失败: {stderr.strip()}"
            self.device_manager._write_log("ERROR", error_msg, udid)
            self._update_task_status(target_phone, udid, "failed", error_msg)
            return False, error_msg
    
    # 更新发送任务状态到数据库
    def _update_task_status(self, target_phone: str, udid: str, status: str, error_msg: Optional[str]) -> None:
        try:
            conn = sqlite3.connect(self.device_manager.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE message_tasks 
                SET send_status = ?, send_time = CURRENT_TIMESTAMP, error_msg = ?
                WHERE target_phone = ? AND device_udid = ? AND send_status = 'pending'
            ''', (status, error_msg, target_phone, udid))
            conn.commit()
            conn.close()
        except Exception as e:
            self.device_manager._write_log("ERROR", f"任务状态更新失败: {str(e)}", udid)
    
    # 批量创建发送任务,支持轮询分配设备
    def batch_create_send_tasks(self, target_list: List[str], message_content: str, assigned_udids: Optional[List[str]] = None) -> int:
        # 确定用于发送的设备列表
        if not assigned_udids:
            udids = list(self.device_manager.connected_devices.keys())
        else:
            udids = [udid for udid in assigned_udids if udid in self.device_manager.connected_devices]
        
        if not udids:
            self.device_manager._write_log("ERROR", "无可用的在线设备,无法创建发送任务")
            return 0
        
        # 批量插入任务到数据库
        task_count = 0
        conn = sqlite3.connect(self.device_manager.db_path)
        cursor = conn.cursor()
        try:
            for index, target_phone in enumerate(target_list):
                # 轮询分配设备,均衡负载
                assigned_udid = udids[index % len(udids)]
                cursor.execute('''
                    INSERT INTO message_tasks (target_phone, message_content, device_udid)
                    VALUES (?, ?, ?)
                ''', (target_phone, message_content, assigned_udid))
                task_count += 1
            conn.commit()
            self.device_manager._write_log("INFO", f"批量创建发送任务完成,共创建{task_count}条任务")
        except Exception as e:
            conn.rollback()
            self.device_manager._write_log("ERROR", f"批量创建任务失败: {str(e)}")
            task_count = 0
        finally:
            conn.close()
        return task_count
    
    # 批量发送任务执行器,支持后台常驻运行
    async def batch_send_executor(self, task_batch_size: int = 20, send_interval: float = 1.5) -> None:
        self.device_manager._write_log("INFO", "批量消息发送执行器已启动")
        while True:
            # 查询待发送的任务
            conn = sqlite3.connect(self.device_manager.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                SELECT task_id, target_phone, message_content, device_udid 
                FROM message_tasks 
                WHERE send_status = 'pending'
                LIMIT ?
            ''', (task_batch_size,))
            pending_tasks = cursor.fetchall()
            conn.close()
            
            if not pending_tasks:
                await asyncio.sleep(5)
                continue
            
            # 按设备分组任务,避免单设备并发冲突
            device_task_map: Dict[str, List[tuple]] = {}
            for task in pending_tasks:
                task_id, target_phone, content, udid = task
                if udid not in device_task_map:
                    device_task_map[udid] = []
                device_task_map[udid].append(task)
            
            # 构建并发发送任务
            send_tasks = []
            for udid, tasks in device_task_map.items():
                for task in tasks:
                    task_id, target_phone, content, _ = task
                    send_tasks.append(self.send_single_imessage(udid, target_phone, content))
                    # 单设备内任务间隔,模拟真人操作
                    await asyncio.sleep(send_interval)
            
            # 执行并发发送,异常隔离不影响整体任务
            await asyncio.gather(*send_tasks, return_exceptions=True)
            # 批次间间隔,降低风控风险
            await asyncio.sleep(send_interval * 2)

# 系统入口主函数
async def main():
    # 初始化设备管理器与消息发送器
    device_manager = iMDeviceManager()
    message_sender = iMessageSender(device_manager)
    
    # 启动后台守护进程
    keepalive_task = asyncio.create_task(device_manager.start_device_keepalive_daemon())
    send_task = asyncio.create_task(message_sender.batch_send_executor())
    
    # 阻塞运行,常驻后台
    await asyncio.gather(keepalive_task, send_task)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("系统已手动停止运行")

四、多设备并发任务调度与队列管理

系统采用异步协程+任务队列的调度模式,参考TK群发系统的轮询分配机制,将发送任务均匀分配至每台在线设备,避免单设备任务过载,同时通过令牌桶算法控制发送频率。系统支持动态调整并发数、发送间隔、批次大小等核心参数,可根据设备数量与业务需求灵活适配,单台普通配置电脑可稳定支撑数十台设备的并发任务处理,无需额外的服务器资源支持。

五、发送状态实时监控与异常处理机制

系统内置全链路状态监控能力,可实时追踪设备在线状态、电池电量、任务执行进度、发送成功率等核心指标,所有运行日志与任务状态均会持久化到本地数据库,支持后续的数据分析与问题排查。针对设备断开、发送超时、号码无效、系统拦截等常见异常情况,内置多层异常处理机制,可自动重试失败任务、标记异常设备、暂停高风险任务,无需人工实时值守,大幅提升系统无人值守运行的稳定性。

六、批量设备参数统一管理功能实现

针对多设备批量管理的核心需求,系统实现了设备参数统一配置与批量下发能力,支持对所有在线设备统一设置发送间隔、重试次数、风控阈值等运行参数,也可针对单台设备进行个性化配置,适配不同设备的运行状态与使用需求。

同时支持设备分组管理、批量重启、批量刷新状态等常用运维操作,无需逐台设备进行手动配置,大幅降低多设备集群的运维成本,真正实现一台电脑完成所有设备的全生命周期管理。

七、系统性能优化与防风控策略设计

我们从性能与风控两个维度做了深度优化。性能层面,通过异步协程替代传统多线程模式,大幅降低系统的 CPU 与内存占用,单台普通办公电脑即可轻松支撑数十台设备的并发运行;通过数据库批量写入与连接复用优化,提升高并发场景下的数据读写性能,避免数据库阻塞。

风控层面,参考 TK 群发系统的成熟防风控实践,内置随机发送间隔、消息内容变量替换、单设备日发送量上限、失败自动熔断等机制,模拟真人的发送行为,大幅降低账号被系统拦截与封禁的风险,保障消息触达的长期稳定性与成功率。

八、项目部署与本地运行环境配置

本项目部署流程轻量化,无需复杂的服务器配置,普通 Windows/macOS 办公电脑即可完成部署运行。首先需要安装Python 3.10及以上版本,通过pip安装项目所需的基础依赖库,同时安装libimobiledevice开源工具链,完成iOS设备驱动的配置。通过工业级 USB 集线器将多台iOS设备连接到电脑,完成设备的信任授权后,即可运行系统主程序,程序会自动识别已连接的设备,支持手动创建批量发送任务,也可配置后台常驻自动执行任务。项目所有代码均完全开源,无任何加密与后门,开发者可根据自身需求自由修改与二次开发,适配不同的业务场景与功能需求。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐