企业解决方案十四-个人微信消息智能处理系统:从消息采集到车辆线索的完整实现
一、项目背景与需求分析
在日常的车辆交易场景中,微信群和私聊消息蕴含着大量潜在客户线索。如何从海量的微信消息中自动提取有价值的信息,识别车辆图片,抓取手机号,并最终将结构化数据推送至业务系统,成为提升获客效率的关键。
本文介绍一个完整的微信消息处理系统,该系统能够:
-
接收来自Python客户端的微信消息推送
-
解析文本和图片消息,下载并上传图片至云端
-
通过AI智能体识别车辆图片和提取结构化信息
-
整合关联的文本和图片消息
-
将有效线索推送至业务系统
二、系统架构设计
2.1 整体架构
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Python客户端 │────▶│ SpringBoot │────▶│ 阿里云OSS │
│ (微信) │ │ 接收服务 │ │ 图片存储 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 消息持久化 │ │
│ │ 文件存储 │ │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ AI智能体 │ │
│ │ 车辆识别 │ │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
└─────────────▶│ 业务系统推送 │◀──────────────┘
└─────────────────┘
2.2 核心模块说明
| 模块名称 | 功能描述 |
|---|---|
| 消息接收模块 | 接收Python客户端推送的微信消息,解析JSON格式数据 |
| 图片处理模块 | 解密下载微信图片,上传至OSS,删除本地临时文件 |
| 数据持久化模块 | 使用文件系统存储消息记录,支持读写锁并发控制 |
| 消息整合模块 | 将图片消息与对应的文本消息关联整合 |
| 智能识别模块 | 调用AI接口识别车辆图片和提取结构化信息 |
| 线索推送模块 | 将有效线索推送至业务系统接口 |
三、核心代码实现详解
3.1 消息实体类定义
public class WxInfo {
private String groupName; // 群名称
private String sender; // 发送者
private String sendContent; // 发送内容
private Long sendTime; // 发送时间戳
private String imageUrl; // 图片URL
// getter/setter省略
}
3.2 文件持久化实现
系统使用文件系统存储消息记录,采用读写锁保证线程安全:
public class BigController {
private static final String FILE_PATH = "wx_info_list.txt";
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static void appendToFile(WxInfo wxInfo) {
lock.writeLock().lock();
try {
List<WxInfo> existingList = loadFromFile();
existingList.add(wxInfo);
writeFullArray(existingList);
} finally {
lock.writeLock().unlock();
}
}
public static List<WxInfo> loadFromFile() {
lock.readLock().lock();
try {
// 检查文件存在性和空文件情况
Path path = Paths.get(FILE_PATH);
if (!Files.exists(path) || Files.size(path) == 0) {
return new ArrayList<>();
}
// 反序列化JSON数组
CollectionType listType = objectMapper.getTypeFactory()
.constructCollectionType(ArrayList.class, WxInfo.class);
return objectMapper.readValue(path.toFile(), listType);
} catch (Exception e) {
// 文件损坏时自动备份并重建
backupAndCreateNewFile();
return new ArrayList<>();
} finally {
lock.readLock().unlock();
}
}
}
3.3 微信消息解析
系统接收的JSON消息结构复杂,需要设计对应的解析类:
static class TextMsgJsonParse {
int eventType; // 事件类型:2004私聊/2000群聊
String reciveType; // 消息类型:文本消息/图片消息
TextMsgContent content; // 消息内容
TextMsgFromUserName fromUserName; // 发送者ID
long createTime; // 创建时间
String roomSenderBy; // 群聊发送者ID
String realContent; // 实际内容
SenderProfile senderProfile; // 发送者资料
MemberInfo memberInfo; // 成员信息
}
3.4 图片消息处理流程
当收到图片消息时,需要从XML中提取解密参数,下载并上传:
if (reciveType.equals("图片消息")) {
// 1. 从XML中提取aeskey和cdnthumburl
String xml = textMsgJsonParse.real_content;
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
Document doc = builder.parse(new ByteArrayInputStream(xml.getBytes("UTF-8")));
Element imgElement = (Element) doc.getElementsByTagName("img").item(0);
String aeskey = imgElement.getAttribute("aeskey");
String cdnthumburl = imgElement.getAttribute("cdnthumburl");
// 2. 下载图片到本地临时目录
String tempImagePath = Constants.DOWNLOAD_IMAGE_PATH +
System.currentTimeMillis() + ".jpg";
Map<String, Object> result1 = cdnDownload(cdnthumburl, aeskey,
Step4_DownloadImage.CdnType.NORMAL_IMAGE,
tempImagePath);
// 3. 上传到阿里云OSS
Map<String, Object> result = uploadToOSS(tempImagePath);
tempWxInfo.setImageUrl((String) result.get("url"));
// 4. 删除本地临时文件
File tempFile = new File(tempImagePath);
tempFile.delete();
}
3.5 阿里云OSS上传实现
public class Step1_UploadToOss {
private static final String ACCESS_KEY_ID = "your_access_key";
private static final String ACCESS_KEY_SECRET = "your_secret";
private static final String BUCKET_NAME = "your_bucket";
private static final String ENDPOINT = "https://oss-cn-beijing.aliyuncs.com";
public static Map<String, Object> uploadToOSS(String imagePath) {
Map<String, Object> result = new HashMap<>();
OSS ossClient = new OSSClientBuilder().build(ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
try {
// 生成唯一文件名
String fileExtension = imagePath.substring(imagePath.lastIndexOf(".") + 1);
String timestamp = LocalDateTime.now().format(
DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
String uniqueFilename = timestamp + "_" +
UUID.randomUUID().toString().substring(0, 8) + "." + fileExtension;
String ossPath = "wx_clue_pic/" + uniqueFilename;
// 执行上传
PutObjectRequest putObjectRequest = new PutObjectRequest(
BUCKET_NAME, ossPath, new File(imagePath));
ossClient.putObject(putObjectRequest);
// 生成访问URL
String url = "https://" + BUCKET_NAME + "." +
ENDPOINT.replace("https://", "") + "/" + ossPath;
result.put("url", url);
} finally {
ossClient.shutdown();
}
return result;
}
}
3.6 消息整合算法
将图片消息与关联的文本消息合并是系统的核心逻辑:
public static List<WxInfo> integrateWxInfos(List<WxInfo> data) {
// 分离图片消息和文本消息
List<WxInfo> imageMsgs = new ArrayList<>();
List<WxInfo> textMsgs = new ArrayList<>();
for (WxInfo msg : data) {
if (msg.getSendContent() != null &&
msg.getSendContent().contains("发送的图片消息")) {
imageMsgs.add(msg);
} else {
textMsgs.add(msg);
}
}
// 对每个图片消息,找到同群同发送者且时间最近的文本消息
for (WxInfo imgMsg : imageMsgs) {
WxInfo closestTextMsg = null;
long minDiff = Long.MAX_VALUE;
for (WxInfo textMsg : textMsgs) {
if (Objects.equals(textMsg.getGroupName(), imgMsg.getGroupName()) &&
Objects.equals(textMsg.getSender(), imgMsg.getSender())) {
long timeDiff = Math.abs(textMsg.getSendTime() - imgMsg.getSendTime());
if (timeDiff < minDiff && timeDiff <= 300) { // 5分钟窗口
minDiff = timeDiff;
closestTextMsg = textMsg;
}
}
}
// 合并图片URL
if (closestTextMsg != null) {
if (closestTextMsg.getImageUrl() == null) {
closestTextMsg.setImageUrl(imgMsg.getImageUrl());
} else {
closestTextMsg.setImageUrl(closestTextMsg.getImageUrl() +
"," + imgMsg.getImageUrl());
}
}
}
return textMsgs.stream()
.sorted(Comparator.comparing(WxInfo::getSendTime))
.collect(Collectors.toList());
}
3.7 手机号识别工具
public class Step4_PhoneNumberUtil {
private static final Pattern PHONE_PATTERN =
Pattern.compile("(?<!\\d)(1[3-9]\\d{9})(?!\\d)");
public static boolean containsPhoneNumber(String text) {
if (text == null || text.isEmpty()) {
return false;
}
Matcher matcher = PHONE_PATTERN.matcher(text);
return matcher.find();
}
}
3.8 HTTP请求发送
public class Step3_HttpUpload {
public static String sendPostRequest(String url, String jsonData) throws Exception {
HttpURLConnection connection = null;
try {
URL apiUrl = new URL(url);
connection = (HttpURLConnection) apiUrl.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
connection.setDoOutput(true);
connection.setConnectTimeout(5000);
connection.setReadTimeout(5000);
// 发送请求体
try (OutputStream os = connection.getOutputStream()) {
byte[] input = jsonData.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
}
// 读取响应
StringBuilder response = new StringBuilder();
try (BufferedReader br = new BufferedReader(
new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
String responseLine;
while ((responseLine = br.readLine()) != null) {
response.append(responseLine.trim());
}
}
return response.toString();
} finally {
if (connection != null) connection.disconnect();
}
}
}
3.9 数据库批量操作
系统支持对好友和群信息的批量插入和更新:
@PostMapping("/insert_friend")
public void insert_friend(@RequestBody List<Friend> insertFriendList) {
List<Friend> existFriendList = friendMapper.selectList(null);
Map<String, Friend> existFriendMap = existFriendList.stream()
.collect(Collectors.toMap(Friend::getUserId, Function.identity()));
List<Friend> newFriendList = new ArrayList<>();
List<Friend> updateFriendList = new ArrayList<>();
for (Friend friend : insertFriendList) {
if (existFriendMap.containsKey(friend.getUserId())) {
Friend existFriend = existFriendMap.get(friend.getUserId());
existFriend.setRemark(friend.getRemark());
updateFriendList.add(existFriend);
} else {
newFriendList.add(friend);
}
}
if (!newFriendList.isEmpty()) {
friendMapper.insertBatchFriend(newFriendList);
}
if (!updateFriendList.isEmpty()) {
friendMapper.updateBatchById(updateFriendList);
}
}
3.10 主流程控制器
@PostMapping("/receive_python_msg")
public void receive_python_msg(@RequestBody Object pythonMsg) throws Exception {
// 解析消息
String jsonStr = mapper.writeValueAsString(pythonMsg);
TextMsgJsonParse parse = gson.fromJson(jsonStr, TextMsgJsonParse.class);
WxInfo tempWxInfo = new WxInfo();
// 处理私聊消息(eventType=2004)或群聊消息(eventType=2000)
if (parse.eventType == 2004 || parse.eventType == 2000) {
// 构建WxInfo对象...
// 持久化存储
Constants.WX_INFO_LIST.add(tempWxInfo);
appendToFile(tempWxInfo);
}
// 当消息积累超过阈值时触发处理
if (Constants.WX_INFO_LIST.size() > 20) {
// 整合消息
List<WxInfo> finalDataList = integrateWxInfos(Constants.WX_INFO_LIST);
for (WxInfo wxInfo : finalDataList) {
if (wxInfo.getImageUrl() != null) {
// 调用AI识别车辆图片
String[] imgUrlArray = wxInfo.getImageUrl().split(",");
StringBuilder validUrls = new StringBuilder();
for (String url : imgUrlArray) {
String result = SmartEntity_3.SmartWork(
UUID.randomUUID().toString(), url);
ImageModelResJson res = gson.fromJson(result,
ImageModelResJson.class);
if (res.aCarOrADrivingLicense.equals("是")) {
validUrls.append(url).append(",");
}
}
// 如果包含手机号,推送至业务系统
if (validUrls.length() > 0 &&
(containsPhoneNumber(wxInfo.getSendContent()) ||
containsPhoneNumber(wxInfo.getSender()))) {
String totalInfo = buildInfoString(wxInfo, validUrls.toString());
String uploadInfo = SmartEntity_1.SmartWork(
UUID.randomUUID().toString(), totalInfo);
String url = "https://your-api-server/api/app/wxClue/import";
String response = sendPostRequest(url, JSON.toJSON(uploadInfo).toString());
// 记录日志
Car car = new Car();
car.setInfo(JSON.toJSON(uploadInfo).toString());
carMapper.insert(car);
}
}
}
// 清理5分钟前的旧消息
long currentTime = System.currentTimeMillis() / 1000;
Constants.WX_INFO_LIST = Constants.WX_INFO_LIST.stream()
.filter(wxInfo -> (currentTime - wxInfo.getSendTime()) <= 300)
.collect(Collectors.toList());
writeFullArray(Constants.WX_INFO_LIST);
}
}
四、关键技术点解析
4.1 并发安全设计
系统使用ReentrantReadWriteLock实现读写锁:
-
读操作可并发执行,提高查询效率
-
写操作互斥执行,保证数据一致性
-
文件损坏时自动备份恢复机制
4.2 时间窗口匹配算法
图片与文本的关联采用时间窗口+同群同发送者的双重匹配:
-
时间窗口设定为5分钟(300秒)
-
必须满足群名称和发送者完全一致
-
选择时间差最小的文本消息进行关联
4.3 临时文件管理
图片处理流程遵循严格的资源管理:
-
下载到临时目录
-
上传至OSS获取永久URL
-
立即删除本地临时文件
-
防止磁盘空间泄漏
4.4 AI智能体集成
系统集成两个AI智能体:
-
图片识别智能体:判断图片是否为车辆或驾驶证
-
信息提取智能体:从文本中提取车辆型号、价格、联系方式等结构化信息
五、性能优化建议
5.1 批量处理优化
-
消息积累阈值设为20-50条时触发批量处理
-
避免每条消息都触发AI调用,减少API开销
-
使用数据库批量插入代替逐条操作
5.2 内存管理
-
定期清理过期消息,只保留5分钟内的数据
-
使用流式处理避免大集合内存溢出
-
文件备份机制防止数据损坏
5.3 异常处理
try {
// 业务逻辑
} catch (Exception e) {
log.error("处理失败: {}", e.getMessage(), e);
// 记录失败消息到死信队列
saveToDeadLetterQueue(failedMessage);
}
六、部署与运维
6.1 环境要求
-
JDK 1.8+
-
SpringBoot 2.x
-
MySQL 5.7+
-
阿里云OSS SDK
6.2 配置管理
# application.yml wx: download-path: /data/wx_images/ batch-size: 20 time-window: 300 clean-interval: 600 oss: endpoint: https://oss-cn-beijing.aliyuncs.com bucket: your-bucket prefix: wx_clue_pic/
6.3 监控指标
-
消息接收QPS
-
图片处理耗时
-
AI接口响应时间
-
线索转化率
七、总结与展望
本文实现的微信消息智能处理系统具备以下特点:
-
高可用性:文件持久化+读写锁保证数据安全
-
可扩展性:模块化设计便于功能扩展
-
智能化:AI识别提升线索质量
-
自动化:全流程自动处理无需人工干预
未来可优化的方向:
-
引入消息队列解耦处理流程
-
增加更多AI识别维度(事故记录、保养记录等)
-
支持视频消息处理
-
实现实时流式计算降低延迟
该系统已在车辆交易场景中取得良好效果,日均处理消息数万条,线索转化率提升显著。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)