一、项目背景与需求分析

在日常的车辆交易场景中,微信群和私聊消息蕴含着大量潜在客户线索。如何从海量的微信消息中自动提取有价值的信息,识别车辆图片,抓取手机号,并最终将结构化数据推送至业务系统,成为提升获客效率的关键。

本文介绍一个完整的微信消息处理系统,该系统能够:

  1. 接收来自Python客户端的微信消息推送

  2. 解析文本和图片消息,下载并上传图片至云端

  3. 通过AI智能体识别车辆图片和提取结构化信息

  4. 整合关联的文本和图片消息

  5. 将有效线索推送至业务系统

二、系统架构设计

2.1 整体架构

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Python客户端   │────▶│  SpringBoot    │────▶│   阿里云OSS     │
│  (微信)     │     │  接收服务       │     │   图片存储      │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        │                       │                        │
        │                       ▼                        │
        │              ┌─────────────────┐               │
        │              │   消息持久化     │               │
        │              │   文件存储       │               │
        │              └─────────────────┘               │
        │                       │                        │
        │                       ▼                        │
        │              ┌─────────────────┐               │
        │              │    AI智能体      │               │
        │              │   车辆识别       │               │
        │              └─────────────────┘               │
        │                       │                        │
        │                       ▼                        │
        │              ┌─────────────────┐               │
        └─────────────▶│   业务系统推送   │◀──────────────┘
                       └─────────────────┘

2.2 核心模块说明

模块名称 功能描述
消息接收模块 接收Python客户端推送的微信消息,解析JSON格式数据
图片处理模块 解密下载微信图片,上传至OSS,删除本地临时文件
数据持久化模块 使用文件系统存储消息记录,支持读写锁并发控制
消息整合模块 将图片消息与对应的文本消息关联整合
智能识别模块 调用AI接口识别车辆图片和提取结构化信息
线索推送模块 将有效线索推送至业务系统接口

三、核心代码实现详解

3.1 消息实体类定义

public class WxInfo {
    private String groupName;      // 群名称
    private String sender;         // 发送者
    private String sendContent;    // 发送内容
    private Long sendTime;         // 发送时间戳
    private String imageUrl;       // 图片URL
    // getter/setter省略
}

3.2 文件持久化实现

系统使用文件系统存储消息记录,采用读写锁保证线程安全:

public class BigController {
    private static final String FILE_PATH = "wx_info_list.txt";
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
    public static void appendToFile(WxInfo wxInfo) {
        lock.writeLock().lock();
        try {
            List<WxInfo> existingList = loadFromFile();
            existingList.add(wxInfo);
            writeFullArray(existingList);
        } finally {
            lock.writeLock().unlock();
        }
    }
    
    public static List<WxInfo> loadFromFile() {
        lock.readLock().lock();
        try {
            // 检查文件存在性和空文件情况
            Path path = Paths.get(FILE_PATH);
            if (!Files.exists(path) || Files.size(path) == 0) {
                return new ArrayList<>();
            }
            // 反序列化JSON数组
            CollectionType listType = objectMapper.getTypeFactory()
                .constructCollectionType(ArrayList.class, WxInfo.class);
            return objectMapper.readValue(path.toFile(), listType);
        } catch (Exception e) {
            // 文件损坏时自动备份并重建
            backupAndCreateNewFile();
            return new ArrayList<>();
        } finally {
            lock.readLock().unlock();
        }
    }
}

3.3 微信消息解析

系统接收的JSON消息结构复杂,需要设计对应的解析类:

static class TextMsgJsonParse {
    int eventType;                    // 事件类型:2004私聊/2000群聊
    String reciveType;                // 消息类型:文本消息/图片消息
    TextMsgContent content;           // 消息内容
    TextMsgFromUserName fromUserName; // 发送者ID
    long createTime;                  // 创建时间
    String roomSenderBy;              // 群聊发送者ID
    String realContent;               // 实际内容
    SenderProfile senderProfile;      // 发送者资料
    MemberInfo memberInfo;            // 成员信息
}

3.4 图片消息处理流程

当收到图片消息时,需要从XML中提取解密参数,下载并上传:

if (reciveType.equals("图片消息")) {
    // 1. 从XML中提取aeskey和cdnthumburl
    String xml = textMsgJsonParse.real_content;
    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    DocumentBuilder builder = factory.newDocumentBuilder();
    Document doc = builder.parse(new ByteArrayInputStream(xml.getBytes("UTF-8")));
    Element imgElement = (Element) doc.getElementsByTagName("img").item(0);
    String aeskey = imgElement.getAttribute("aeskey");
    String cdnthumburl = imgElement.getAttribute("cdnthumburl");
    
    // 2. 下载图片到本地临时目录
    String tempImagePath = Constants.DOWNLOAD_IMAGE_PATH + 
                           System.currentTimeMillis() + ".jpg";
    Map<String, Object> result1 = cdnDownload(cdnthumburl, aeskey, 
                                   Step4_DownloadImage.CdnType.NORMAL_IMAGE, 
                                   tempImagePath);
    
    // 3. 上传到阿里云OSS
    Map<String, Object> result = uploadToOSS(tempImagePath);
    tempWxInfo.setImageUrl((String) result.get("url"));
    
    // 4. 删除本地临时文件
    File tempFile = new File(tempImagePath);
    tempFile.delete();
}

3.5 阿里云OSS上传实现

public class Step1_UploadToOss {
    private static final String ACCESS_KEY_ID = "your_access_key";
    private static final String ACCESS_KEY_SECRET = "your_secret";
    private static final String BUCKET_NAME = "your_bucket";
    private static final String ENDPOINT = "https://oss-cn-beijing.aliyuncs.com";

    public static Map<String, Object> uploadToOSS(String imagePath) {
        Map<String, Object> result = new HashMap<>();
        OSS ossClient = new OSSClientBuilder().build(ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
        
        try {
            // 生成唯一文件名
            String fileExtension = imagePath.substring(imagePath.lastIndexOf(".") + 1);
            String timestamp = LocalDateTime.now().format(
                DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
            String uniqueFilename = timestamp + "_" + 
                UUID.randomUUID().toString().substring(0, 8) + "." + fileExtension;
            String ossPath = "wx_clue_pic/" + uniqueFilename;
            
            // 执行上传
            PutObjectRequest putObjectRequest = new PutObjectRequest(
                BUCKET_NAME, ossPath, new File(imagePath));
            ossClient.putObject(putObjectRequest);
            
            // 生成访问URL
            String url = "https://" + BUCKET_NAME + "." + 
                         ENDPOINT.replace("https://", "") + "/" + ossPath;
            result.put("url", url);
        } finally {
            ossClient.shutdown();
        }
        return result;
    }
}

3.6 消息整合算法

将图片消息与关联的文本消息合并是系统的核心逻辑:

public static List<WxInfo> integrateWxInfos(List<WxInfo> data) {
    // 分离图片消息和文本消息
    List<WxInfo> imageMsgs = new ArrayList<>();
    List<WxInfo> textMsgs = new ArrayList<>();
    
    for (WxInfo msg : data) {
        if (msg.getSendContent() != null && 
            msg.getSendContent().contains("发送的图片消息")) {
            imageMsgs.add(msg);
        } else {
            textMsgs.add(msg);
        }
    }
    
    // 对每个图片消息,找到同群同发送者且时间最近的文本消息
    for (WxInfo imgMsg : imageMsgs) {
        WxInfo closestTextMsg = null;
        long minDiff = Long.MAX_VALUE;
        
        for (WxInfo textMsg : textMsgs) {
            if (Objects.equals(textMsg.getGroupName(), imgMsg.getGroupName()) &&
                Objects.equals(textMsg.getSender(), imgMsg.getSender())) {
                long timeDiff = Math.abs(textMsg.getSendTime() - imgMsg.getSendTime());
                if (timeDiff < minDiff && timeDiff <= 300) { // 5分钟窗口
                    minDiff = timeDiff;
                    closestTextMsg = textMsg;
                }
            }
        }
        
        // 合并图片URL
        if (closestTextMsg != null) {
            if (closestTextMsg.getImageUrl() == null) {
                closestTextMsg.setImageUrl(imgMsg.getImageUrl());
            } else {
                closestTextMsg.setImageUrl(closestTextMsg.getImageUrl() + 
                                          "," + imgMsg.getImageUrl());
            }
        }
    }
    
    return textMsgs.stream()
        .sorted(Comparator.comparing(WxInfo::getSendTime))
        .collect(Collectors.toList());
}

3.7 手机号识别工具

public class Step4_PhoneNumberUtil {
    private static final Pattern PHONE_PATTERN = 
        Pattern.compile("(?<!\\d)(1[3-9]\\d{9})(?!\\d)");

    public static boolean containsPhoneNumber(String text) {
        if (text == null || text.isEmpty()) {
            return false;
        }
        Matcher matcher = PHONE_PATTERN.matcher(text);
        return matcher.find();
    }
}

3.8 HTTP请求发送

public class Step3_HttpUpload {
    public static String sendPostRequest(String url, String jsonData) throws Exception {
        HttpURLConnection connection = null;
        try {
            URL apiUrl = new URL(url);
            connection = (HttpURLConnection) apiUrl.openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
            connection.setDoOutput(true);
            connection.setConnectTimeout(5000);
            connection.setReadTimeout(5000);
            
            // 发送请求体
            try (OutputStream os = connection.getOutputStream()) {
                byte[] input = jsonData.getBytes(StandardCharsets.UTF_8);
                os.write(input, 0, input.length);
            }
            
            // 读取响应
            StringBuilder response = new StringBuilder();
            try (BufferedReader br = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String responseLine;
                while ((responseLine = br.readLine()) != null) {
                    response.append(responseLine.trim());
                }
            }
            return response.toString();
        } finally {
            if (connection != null) connection.disconnect();
        }
    }
}

3.9 数据库批量操作

系统支持对好友和群信息的批量插入和更新:

@PostMapping("/insert_friend")
public void insert_friend(@RequestBody List<Friend> insertFriendList) {
    List<Friend> existFriendList = friendMapper.selectList(null);
    Map<String, Friend> existFriendMap = existFriendList.stream()
        .collect(Collectors.toMap(Friend::getUserId, Function.identity()));
    
    List<Friend> newFriendList = new ArrayList<>();
    List<Friend> updateFriendList = new ArrayList<>();
    
    for (Friend friend : insertFriendList) {
        if (existFriendMap.containsKey(friend.getUserId())) {
            Friend existFriend = existFriendMap.get(friend.getUserId());
            existFriend.setRemark(friend.getRemark());
            updateFriendList.add(existFriend);
        } else {
            newFriendList.add(friend);
        }
    }
    
    if (!newFriendList.isEmpty()) {
        friendMapper.insertBatchFriend(newFriendList);
    }
    if (!updateFriendList.isEmpty()) {
        friendMapper.updateBatchById(updateFriendList);
    }
}

3.10 主流程控制器

@PostMapping("/receive_python_msg")
public void receive_python_msg(@RequestBody Object pythonMsg) throws Exception {
    // 解析消息
    String jsonStr = mapper.writeValueAsString(pythonMsg);
    TextMsgJsonParse parse = gson.fromJson(jsonStr, TextMsgJsonParse.class);
    
    WxInfo tempWxInfo = new WxInfo();
    
    // 处理私聊消息(eventType=2004)或群聊消息(eventType=2000)
    if (parse.eventType == 2004 || parse.eventType == 2000) {
        // 构建WxInfo对象...
        
        // 持久化存储
        Constants.WX_INFO_LIST.add(tempWxInfo);
        appendToFile(tempWxInfo);
    }
    
    // 当消息积累超过阈值时触发处理
    if (Constants.WX_INFO_LIST.size() > 20) {
        // 整合消息
        List<WxInfo> finalDataList = integrateWxInfos(Constants.WX_INFO_LIST);
        
        for (WxInfo wxInfo : finalDataList) {
            if (wxInfo.getImageUrl() != null) {
                // 调用AI识别车辆图片
                String[] imgUrlArray = wxInfo.getImageUrl().split(",");
                StringBuilder validUrls = new StringBuilder();
                
                for (String url : imgUrlArray) {
                    String result = SmartEntity_3.SmartWork(
                        UUID.randomUUID().toString(), url);
                    ImageModelResJson res = gson.fromJson(result, 
                        ImageModelResJson.class);
                    if (res.aCarOrADrivingLicense.equals("是")) {
                        validUrls.append(url).append(",");
                    }
                }
                
                // 如果包含手机号,推送至业务系统
                if (validUrls.length() > 0 && 
                    (containsPhoneNumber(wxInfo.getSendContent()) || 
                     containsPhoneNumber(wxInfo.getSender()))) {
                    
                    String totalInfo = buildInfoString(wxInfo, validUrls.toString());
                    String uploadInfo = SmartEntity_1.SmartWork(
                        UUID.randomUUID().toString(), totalInfo);
                    
                    String url = "https://your-api-server/api/app/wxClue/import";
                    String response = sendPostRequest(url, JSON.toJSON(uploadInfo).toString());
                    
                    // 记录日志
                    Car car = new Car();
                    car.setInfo(JSON.toJSON(uploadInfo).toString());
                    carMapper.insert(car);
                }
            }
        }
        
        // 清理5分钟前的旧消息
        long currentTime = System.currentTimeMillis() / 1000;
        Constants.WX_INFO_LIST = Constants.WX_INFO_LIST.stream()
            .filter(wxInfo -> (currentTime - wxInfo.getSendTime()) <= 300)
            .collect(Collectors.toList());
        writeFullArray(Constants.WX_INFO_LIST);
    }
}

四、关键技术点解析

4.1 并发安全设计

系统使用ReentrantReadWriteLock实现读写锁:

  • 读操作可并发执行,提高查询效率

  • 写操作互斥执行,保证数据一致性

  • 文件损坏时自动备份恢复机制

4.2 时间窗口匹配算法

图片与文本的关联采用时间窗口+同群同发送者的双重匹配:

  • 时间窗口设定为5分钟(300秒)

  • 必须满足群名称和发送者完全一致

  • 选择时间差最小的文本消息进行关联

4.3 临时文件管理

图片处理流程遵循严格的资源管理:

  1. 下载到临时目录

  2. 上传至OSS获取永久URL

  3. 立即删除本地临时文件

  4. 防止磁盘空间泄漏

4.4 AI智能体集成

系统集成两个AI智能体:

  • 图片识别智能体:判断图片是否为车辆或驾驶证

  • 信息提取智能体:从文本中提取车辆型号、价格、联系方式等结构化信息

五、性能优化建议

5.1 批量处理优化

  • 消息积累阈值设为20-50条时触发批量处理

  • 避免每条消息都触发AI调用,减少API开销

  • 使用数据库批量插入代替逐条操作

5.2 内存管理

  • 定期清理过期消息,只保留5分钟内的数据

  • 使用流式处理避免大集合内存溢出

  • 文件备份机制防止数据损坏

5.3 异常处理

try {
    // 业务逻辑
} catch (Exception e) {
    log.error("处理失败: {}", e.getMessage(), e);
    // 记录失败消息到死信队列
    saveToDeadLetterQueue(failedMessage);
}

六、部署与运维

6.1 环境要求

  • JDK 1.8+

  • SpringBoot 2.x

  • MySQL 5.7+

  • 阿里云OSS SDK

6.2 配置管理

# application.yml
wx:
  download-path: /data/wx_images/
  batch-size: 20
  time-window: 300
  clean-interval: 600

oss:
  endpoint: https://oss-cn-beijing.aliyuncs.com
  bucket: your-bucket
  prefix: wx_clue_pic/

6.3 监控指标

  • 消息接收QPS

  • 图片处理耗时

  • AI接口响应时间

  • 线索转化率

七、总结与展望

本文实现的微信消息智能处理系统具备以下特点:

  1. 高可用性:文件持久化+读写锁保证数据安全

  2. 可扩展性:模块化设计便于功能扩展

  3. 智能化:AI识别提升线索质量

  4. 自动化:全流程自动处理无需人工干预

未来可优化的方向:

  • 引入消息队列解耦处理流程

  • 增加更多AI识别维度(事故记录、保养记录等)

  • 支持视频消息处理

  • 实现实时流式计算降低延迟

该系统已在车辆交易场景中取得良好效果,日均处理消息数万条,线索转化率提升显著。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐