声明
本文章中所有内容仅供学习交流使用,不用于其他任何目的,抓包 内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关!侵权通过头像私信或名字简介叫我删除博客谢谢。
部分python代码

import asyncio

import execjs
import websockets
import json
import ssl
import time
from datetime import datetime
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import base64

class FootballScoreClient:
    def __init__(self, ads_tracker_baidu):
        ads = ads_tracker_baidu
        ck = f'ads-tracker-baidu={ads}'
        self.url = "scorepush/football"
        self.headers = {
            'cookie':f'{ck}',
        }
        self.initial_message = {
        }
        self.websocket = None
        self.is_running = False

    async def connect(self):
        """建立WebSocket连接"""
        try:
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

            self.websocket = await websockets.connect(
                self.url,
                extra_headers=self.headers,
            )
            print(f"[{self._get_timestamp()}] 已连接到服务器")
            return True
        except Exception as e:
            print(f"[{self._get_timestamp()}] 连接失败: {e}")
            return False

    async def send_initial_message(self):
        """发送初始消息"""
        try:
            await self.websocket.send(json.dumps(self.initial_message))
            print(f"[{self._get_timestamp()}] 发送初始消息: {self.initial_message}")
        except Exception as e:
            print(f"[{self._get_timestamp()}] 发送消息失败: {e}")

    async def receive_messages(self):
        """接收消息循环"""
        try:
            async for message in self.websocket:
                print(f"[{self._get_timestamp()}] 收到消息: {message}")
                await self.handle_message(message)
        except websockets.exceptions.ConnectionClosed as e:
            print(f"[{self._get_timestamp()}] 连接关闭: {e}")
            self.is_running = False
        except Exception as e:
            print(f"[{self._get_timestamp()}] 接收消息错误: {e}")
            self.is_running = False

    async def handle_message(self, message):
        """处理接收到的消息"""
        try:
            data = decrypt_aes_cbc(message)
            # 在这里添加你的消息处理逻辑
            print(f"解析后的数据: {data}")
            pass
        except json.JSONDecodeError:
            print(f"无法解析消息: {message}")

    async def run(self, auto_reconnect=True):
        """主运行函数"""
        self.is_running = True

        while self.is_running:
            if await self.connect():
                await self.send_initial_message()
                await self.receive_messages()

            if auto_reconnect and self.is_running:
                print(f"[{self._get_timestamp()}] 5秒后重新连接...")
                await asyncio.sleep(5)
            else:
                break

    async def stop(self):
        """停止客户端"""
        self.is_running = False
        if self.websocket:
            await self.websocket.close()

    def _get_timestamp(self):
        """获取当前时间戳"""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


async def main():
    cp = execjs.compile(open('1.js', 'r', encoding='utf-8', errors='ignore').read())
    ads_tracker_baidu = cp.call('getAdsTrackerBaidu').split("ads-tracker-baidu=")[-1]
    print(ads_tracker_baidu)
    client = FootballScoreClient(ads_tracker_baidu)
    try:
        await client.run()
    except KeyboardInterrupt:
        print("\n正在关闭客户端...")
        await client.stop()


if __name__ == "__main__":
    asyncio.run(main())

结果

总结

1.出于安全考虑,本章未提供完整流程,调试环节省略较多,只提供大致思路,具体细节要你自己还原,相信你也能调试出来。

2.具体更多细节请看名字进入详情了解更多细节,具体细节要你自己还原,相信你也能调试出来。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐