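Omniscient Skill
name: omniscient
author: 王教成 Wang Jiaocheng (波动几何)
description: >
Omniscient skill: integrates three capability layers (cognitive suite, execution framework, system control) and adds an orchestration engine.
Cognitive layer: four thinking opcodes (direct use / improve / transfer / construct) cover all thinking tasks;
Execution layer: LLM + command execution tool, for automated code generation and script execution;
Control layer: Windows desktop software, system hardware, serial devices, IoT platforms, and GUI automation;
Orchestration engine: unifies all operations as primitives that can be freely combined into execution chains of arbitrary complexity.
This skill applies to every task, without exception.
Omniscient Skill
Cognitive suite (think) + execution framework (do by programming) + system control (do by operating), covering all types of tasks.
All operation types (script commands, code generation, tool calls, skill plugins, and pure cognitive output) are unified primitives that the orchestration engine can freely combine into execution chains of arbitrary complexity.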
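Architecture overview
      ┌──────────────────────────────────┐
      │           User request           │
      └────────────────┬─────────────────┘
                       ↓
      ┌──────────────────────────────────┐
      │           Task router            │
      │  classify task → choose a layer  │
      └──────┬──────────┬──────────┬─────┘
             ↓          ↓          ↓
┌────────────┐  ┌──────────────┐  ┌──────────────┐
│ Cognitive  │  │ Programming  │  │ System       │
│ layer      │  │ execution    │  │ control      │
│            │  │ layer        │  │ layer        │
└────────────┘  └──────────────┘  └──────────────┘
  LLM only       LLM + command     dedicated
                 execution tool    scripts
─────────────────────────────────────────────────
      ┌──────────────────────────────────┐
      │       Orchestration engine       │
      │  combines atomic operations from │
      │  all three layers into chains    │
      └──────────────────────────────────┘
Each layer has its own strengths. Choose the entry point by the task's main operation type:
- Mostly cognition → cognitive layer (mode prompt suite + LLM)
- Mostly code execution → programming execution layer (LLM + command execution tool)
- Mostly system control → system control layer (dedicated Python scripts)
Super tasks (cross-layer, multi-step, complex orchestration) → the orchestration engine schedules atomic operations from all three layers into an execution chain.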
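Task router
After this skill is loaded, the task type is judged and the execution layer chosen automatically, using the following logic:
User task received
│
├─ Involves several operation types or a very large number of steps? ──→ Orchestration engine
│    (cross-layer composition, command sequences, automation pipelines...)
│
├─ Needs to control the computer, hardware, or devices? ──→ Layer 3: system control
│    (open/close windows, adjust volume, take screenshots, click the mouse, serial ports, IoT...)
│
├─ Needs to write code or run scripts? ──→ Layer 2: execution framework
│    (data processing, file operations, API calls, building applications...)
│
└─ Analysis, thinking, or creative work? ──→ Layer 1: cognitive suite
     (writing, analysis, reasoning, planning, translation, summarization...)
Mixed tasks (e.g. "write a script to batch-process my images, then lower the brightness"):
- First use layer 1 to devise the approach
- Then use layer 2 to write and run the script
- Finally use layer 3 to adjust hardware parameters (if needed)
Note: all layers and the engine can be combined freely; the choice is not either/or. The router is only the default starting point, and scheduling can cross layers automatically as the task requires.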
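The decision tree above can be sketched as a small function. This is only an illustration: in the skill itself the LLM makes these judgments from the request text, and the names `Route`, `route_task`, and the three boolean flags are hypothetical, not part of the skill.

```python
from enum import Enum


class Route(Enum):
    ORCHESTRATOR = "orchestration engine"
    SYSTEM_CONTROL = "layer 3: system control"
    EXECUTION = "layer 2: execution framework"
    COGNITIVE = "layer 1: cognitive suite"


def route_task(multi_step: bool, controls_device: bool, needs_code: bool) -> Route:
    """Pick a default entry point, checking the branches in the order of the tree."""
    if multi_step:          # several operation types or very many steps
        return Route.ORCHESTRATOR
    if controls_device:     # computer, hardware, or device control
        return Route.SYSTEM_CONTROL
    if needs_code:          # writing code or running scripts
        return Route.EXECUTION
    return Route.COGNITIVE  # analysis, thinking, creative work


# A pure writing task lands in the cognitive suite.
print(route_task(False, False, False).value)
```

The order of the checks matters: a task that both runs code and controls a device is routed to system control unless it is also flagged as multi-step.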
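Orchestration engine
Unifies any atomic operation from the three layers as a primitive and combines primitives into execution chains, making highly complex tasks with very many steps possible.
Primitive definitions
Every executable operation is an atomic primitive. There are five categories:
| Category | Source | Examples |
|---|---|---|
| Script command primitive | The six modules of layer 3 | window_manager.py list, process_manager.py start, hardware_controller.py volume set --level 50, serial_comm.py list, iot_controller.py homeassistant on --entity-id light.xxx, gui_controller.py screenshot full |
| Code execution primitive | Layer 2 execution framework | Python scripts and command-line instructions generated on the fly by the LLM |
| Tool call primitive | Tools built into the host environment | File read/write, command execution, search, web access, etc. |
| Skill plugin primitive | Other installed skills | External skill capabilities loaded through use_skill |
| Cognitive output primitive | The four modes of layer 1 | Analysis, copy, plans, and reasoning results generated directly by the LLM |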
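Orchestration protocol
Building on the core mechanism of the direct-use mode prompt ("split complex tasks into simple tasks and hand them to primitives chained together for execution"), this extends it into a complete multi-step orchestration protocol:
1. Decompose
The LLM decomposes the user's task into an ordered sequence of primitives.
Each primitive is labeled with its category (script/code/tool/plugin/cognitive) and its expected output.
2. Plan
Determine the dependencies and data flow between primitives.
The output of an earlier primitive can serve as the input of a later one.
3. Execute
Run each primitive in order, checking in real time whether its output matches expectations.
After each primitive runs, the LLM judges whether later steps need adjusting.
4. Repair
When any primitive fails, the LLM analyzes the cause of the error
and chooses on its own: retry the current primitive / replace it with an equivalent primitive / adjust the remaining plan / abort and report.
5. Summarize
Once all primitives have run, consolidate the overall result into human-readable output.
Orchestration rules
- Prefer the LLM: if a step can be solved by a cognitive output primitive, do not generate code; if a single command suffices, do not write a script.
- Safety rules apply throughout: every primitive in a chain is subject to the layer 2 safety mechanism, and high-risk operations require user confirmation.
- Smart instrumentation: a cognitive output primitive can be inserted between primitives for intermediate judgment (e.g. "analyze the previous step's output and decide what to do next").
- Conditional branching: supports "if ... then ..." logic, where the LLM dynamically chooses later primitives based on intermediate results.
- Loop iteration: supports "repeat until ..." logic, where the LLM evaluates the loop condition and controls the iteration.
- Parallel orchestration: primitives with no dependencies between them can be launched in parallel to reduce total time.
- Interruptible and resumable: very long tasks can set checkpoints and resume from the point of interruption.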
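The execute and repair steps of the protocol can be sketched as a loop over a chain of primitives. This is a minimal illustration under stated assumptions: `Primitive`, `run_chain`, and `max_retries` are hypothetical names, each primitive is modeled as a plain Python callable that receives the previous output, and only the "retry" and "abort and report" repair options are modeled (replacing a primitive or re-planning would need the LLM in the loop).

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class Primitive:
    kind: str                  # "script" | "code" | "tool" | "plugin" | "cognitive"
    name: str
    run: Callable[[Any], Any]  # receives the previous primitive's output


def run_chain(chain: List[Primitive], initial: Any = None, max_retries: int = 1) -> Dict[str, Any]:
    """Run primitives in order, feeding each output into the next step."""
    data = initial
    log: List[str] = []
    for step in chain:
        for _attempt in range(max_retries + 1):
            try:
                data = step.run(data)
                log.append(f"ok:{step.name}")
                break
            except Exception as exc:  # repair option modeled here: retry
                log.append(f"fail:{step.name}:{exc}")
        else:
            # Every attempt failed: abort and report instead of continuing.
            return {"status": "aborted", "failed_step": step.name, "output": data, "log": log}
    return {"status": "done", "output": data, "log": log}


chain = [
    Primitive("cognitive", "plan", lambda _: [3, 1, 2]),
    Primitive("code", "sort", lambda xs: sorted(xs)),
]
print(run_chain(chain)["output"])
```

Keeping a per-step log gives the summarize step something concrete to report, including which attempts failed before a retry succeeded.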
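Orchestration examples
Example 1: automated office pipeline (all three layers working together)
Task: "Open Excel, read the data in column A, generate a summary report for each row, save it to the desktop, then lower the screen brightness"
Primitive sequence:
[script] window_manager.py activate --title "Excel" ← layer 3
[script] gui_controller.py screenshot full ← layer 3 (confirm state)
[cognitive] Recognize the Excel content and plan how to read it ← layer 1
[code] Python script reads the Excel data and generates the report ← layer 2
[tool] Save the file to the desktop ← tool primitive
[script] hardware_controller.py screen brightness --level 30 ← layer 3
Example 2: smart device linkage (generated code + script orchestration)
Task: "Monitor the temperature from a serial sensor; when it exceeds a threshold, automatically send a WeChat notification and turn on the fan"
Primitive sequence:
[script] serial_comm.py list ← layer 3 (confirm the port)
[code] Generate a Python listener script: read the serial port + threshold check ← layer 2
[script] iot_controller.py homeassistant on --entity-id switch.fan ← layer 3
[code] Generate the notification logic: trigger a WeChat alert when the limit is exceeded ← layer 2
Example 3: end-to-end research analysis (cognitive + tool + code)
Task: "Research the core ideas of three papers, generate a comparison table, then turn it into a visual chart"
Primitive sequence:
[tool] Web search and fetch the content of the three papers ← tool primitive
[cognitive] Direct-use mode: extract the core ideas of each paper ← layer 1
[cognitive] Improve mode: comparative analysis, find similarities and differences ← layer 1
[tool] Generate the comparison table in Markdown ← tool primitive
[code] Generate a Python charting script and run it ← layer 2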
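Layer 1: cognitive suite
Four thinking opcodes cover all cognitive tasks. Choose the mode that matches the nature of the task. When no mode is specified, direct-use mode is the default.
Direct-use mode (precise execution of a well-defined task)
Execution protocol, awaiting instructions: a simple task is executed by a primitive that overlays an adaptive identity on an input-process-output structure; a complex task is split into simple tasks that are handed to primitives chained together for execution. Input-process-output details are hidden by default, but the user may ask to see them. Prefer completing the task with the LLM according to the prompt; do not call tools, skills, or plugins unless necessary, and when they are needed, plan their calls by treating them as identities or primitives.
Primitive scope: "primitive" here is not limited to pure cognitive output. It covers all five categories defined by the orchestration engine: script command, code execution, tool call, skill plugin, and cognitive output. The LLM brings primitives of any category into the execution chain as the task requires.
Use when: the user has a clear goal that needs precise execution, such as translation, summarization, format conversion, data analysis, document writing, automation pipelines, and cross-layer multi-step tasks.
Improve mode (optimize an existing solution)
Generate new solutions on demand, choosing freely among innovation meta-frameworks: first principles, reverse thinking, dialectical synthesis, randomness-driven, emergent generation, evolutionary iteration, system dynamics, constraint-driven, storytelling, and gamification.
Use when: optimizing, improving, or upgrading an existing solution, such as refactoring code, improving copy, streamlining a process, or upgrading a design.
Transfer mode (move a pattern across domains)
Act as a pattern converter: analyze the underlying structure and principles of the old concrete thing provided (yielding an abstract pattern), then apply them to the specified new concrete thing (producing a brand-new concrete solution).
Use when: moving a successful pattern from domain A to domain B, such as applying gamification mechanics to education or e-commerce recommendation logic to content distribution.
Construct mode (create from scratch)
Act as a possibility-space navigator: deconstruct two concepts into basic dimensions, build a dimension matrix, randomly pick seemingly unrelated dimension combinations and force connections between them, derive how the possibilities develop (assess logical distance, form possibility clusters, identify unexplored regions), and output the most promising, most radical, and most overlooked possibilities (generate several counterintuitive solutions).
Use when: exploring an entirely new direction from scratch, such as innovative product design, new business models, or frontier technical solutions.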
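Layer 2: execution framework
An automated execution protocol built on the "LLM + command execution tool" architecture. No keys and no extra configuration are needed; it works out of the box.
Four-step workflow
Think → Execute → Repair → Summarize
- Think: the LLM understands the intent of the task, judges its complexity, and decides whether to generate a command-line instruction or a Python script
- Execute: write the file → run the command/script → capture the output
- Repair: on error, the LLM analyzes the error, fixes the code automatically, and retries (at most 2 times)
- Summarize: translate the technical output into a human-readable result in natural language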
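The execute, repair, and summarize steps can be sketched as follows. This is a hedged illustration, not the skill's implementation: `run_python` and `execute_with_repair` are hypothetical names, and the `repair` callback stands in for the LLM's "analyze the error and fix the code" step. The limit of two repairs follows the workflow above.

```python
import os
import subprocess
import sys
import tempfile
from typing import Callable, Tuple


def run_python(code: str, timeout: int = 30) -> Tuple[int, str, str]:
    """Execute step: write the script to a file, run it, capture its output."""
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as f:
        f.write(code)
        path = f.name
    try:
        proc = subprocess.run([sys.executable, path], capture_output=True, text=True, timeout=timeout)
        return proc.returncode, proc.stdout, proc.stderr
    finally:
        os.unlink(path)


def execute_with_repair(code: str, repair: Callable[[str, str], str], max_repairs: int = 2) -> str:
    """Run the script; on failure ask `repair` for a new version, at most `max_repairs` times."""
    err = ""
    for attempt in range(max_repairs + 1):
        rc, out, err = run_python(code)
        if rc == 0:
            return f"Succeeded after {attempt} repair(s): {out.strip()}"
        if attempt < max_repairs:
            code = repair(code, err)  # hypothetical stand-in for the LLM repair step
    lines = err.strip().splitlines()
    last = lines[-1] if lines else "no error output"
    return f"Failed after {max_repairs} repair(s): {last}"


# The "repair" here simply swaps in a working script.
print(execute_with_repair("print(1/0)", lambda code, err: "print(42)"))
```

Returning a one-line summary rather than the raw traceback mirrors the summarize step; a fuller version would also handle `subprocess.TimeoutExpired`.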
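Capabilities
| Capability | Description |
|---|---|
| Command-line instructions | File operations, process management, system administration |
| Python scripts | Data processing, web scraping, machine learning, image processing |
| Command-line tools | git, docker, ffmpeg, aws, and any other tool |
| API calls | Any HTTP API |
Core principle: the command execution tool can run scripts → scripts can cover every code-execution task.
Safety mechanism
| Level | Examples | Handling |
|---|---|---|
| 🔴 High risk | rm -rf /, format C: | Requires user confirmation |
| 🟡 Medium risk | pip uninstall, sudo | Warning prompt |
| 🟢 Low risk | ls, cat, python script.py | Execute directly |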
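A rough sketch of how the three risk levels in the table might be checked before a command runs. The patterns only cover the examples in the table and are assumptions for illustration; `classify` and the pattern lists are hypothetical names, and a real filter would need a much broader rule set.

```python
import re

# Illustrative patterns only, taken from the examples in the table.
HIGH_RISK = [r"\brm\s+-rf\s+/", r"\bformat\s+[a-z]:"]
MEDIUM_RISK = [r"\bpip\s+uninstall\b", r"\bsudo\b"]


def classify(command: str) -> str:
    """Return 'high', 'medium', or 'low' for a command string."""
    lowered = command.lower()
    if any(re.search(p, lowered) for p in HIGH_RISK):
        return "high"    # must be confirmed by the user
    if any(re.search(p, lowered) for p in MEDIUM_RISK):
        return "medium"  # show a warning
    return "low"         # run directly


for cmd in ["rm -rf /", "sudo apt update", "ls -la"]:
    print(cmd, "->", classify(cmd))
```

High-risk patterns are checked first so that a command such as `sudo rm -rf /` is treated as high risk rather than medium.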
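Layer 3: system control
Controls Windows desktop software, system hardware, serial devices, and IoT platforms in a unified way through dedicated Python scripts.
Runtime environment
- Python path: ~/.workbuddy/binaries/python/versions/{version}/python.exe ({version} is the currently available version, e.g. 3.13.12)
- Virtual environment path: ~/.workbuddy/binaries/python/envs/default/
- Script directory: ~/.workbuddy/skills/omniscient/scripts/
- Execution mode: always run scripts through the command execution tool
If the virtual environment does not exist, create it first:
python -m venv ~/.workbuddy/binaries/python/envs/default
~/.workbuddy/binaries/python/envs/default/Scripts/pip install pyautogui pillow pyserial requests
Six control modules
| Module | Script | Coverage |
|---|---|---|
| Window management | window_manager.py | Desktop window control: list, activate, close, minimize, maximize, resize, send keys |
| Process management | process_manager.py | System processes: list, kill, start, details, system overview |
| Hardware control | hardware_controller.py | Volume / brightness / power / network / USB |
| Serial communication | serial_comm.py | Arduino / ESP32 / serial devices (installs pyserial automatically) |
| IoT control | iot_controller.py | Home Assistant / HTTP APIs / smart home (installs requests automatically) |
| GUI automation | gui_controller.py | Mouse / keyboard / screenshots / OCR / image recognition (installs pyautogui + pillow automatically) |
All scripts run independently and do not depend on one another.
Safety rules
- Destructive operations require user confirmation (shutdown, restart, killing processes, closing windows, disabling network adapters)
- Query before acting (list before close; list --name before kill)
- Power operations require a warning (confirm before shutdown, restart, sleep, or hibernate)
- Disabling a network adapter requires user confirmation (the script protects this with a confirmation mechanism to prevent accidental loss of network access)
- For serial operations, list the ports first to confirm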
Module call examples
Window management:
- "Close Chrome" → window_manager.py list → find Chrome → window_manager.py close --title "Chrome"
- "Bring WeChat to the foreground" → window_manager.py activate --title "微信"
Process management:
- "Start VS Code" → process_manager.py start "code"
- "Close all Notepad windows" → process_manager.py kill --name notepad
Hardware control:
- "Set the volume to 50" → hardware_controller.py volume set --level 50
- "Lock the screen" → hardware_controller.py power lock
- "Scan for WiFi" → hardware_controller.py network wifi
Serial communication:
- "Which serial ports are there" → serial_comm.py list
- "Send the Arduino a command to turn on the light" → serial_comm.py send --port COM3 --data "LED_ON"
IoT control:
- "Turn on the living room light" → iot_controller.py homeassistant --url ... --token ... on --entity-id light.living_room
GUI automation:
- "Take a screenshot" → gui_controller.py screenshot full
- "Click (500,300)" → gui_controller.py mouse click --x 500 --y 300
- "Type Hello World" → gui_controller.py keyboard type --text "Hello World"
- "Recognize the text on screen" → gui_controller.py visual ocr
Procedure for unknown devices
- Check whether it can be started as a process → process_manager.py start
- Check whether it has a window → window_manager.py list
- Take a screenshot to look → gui_controller.py screenshot full
- Recognize text → gui_controller.py visual ocr
- Click by image match → gui_controller.py visual click-image --template "icon.png"
- Operate directly with mouse and keyboard → gui_controller.py mouse click
- Check whether it exposes an API → iot_controller.py http
- Check the USB connection → hardware_controller.py usb list → serial_comm.py list
- Suggest alternatives → an MCP service or a custom script
For detailed command syntax, see references/command_reference.md.
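System control command quick reference
Script location
All scripts are located in: ~/.workbuddy/skills/omniscient/scripts/
1. window_manager.py - window management
List all visible windows
python window_manager.py list
Returns a JSON array containing the process ID, title, and process name.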
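A small sketch of consuming that JSON on the caller's side, in support of the "query before acting" rule. It assumes the key names seen in the two PowerShell queries inside window_manager.py (`PID`/`Title` in the primary query, `Id`/`MainWindowTitle` in the fallback) and that PowerShell's `ConvertTo-Json` emits a bare object rather than an array when only one window matches. `parse_window_list` and `find_by_title` are hypothetical helper names.

```python
import json
from typing import Dict, List


def parse_window_list(raw: str) -> List[Dict]:
    """Normalize the output of `window_manager.py list` into a list of dicts."""
    if not raw.strip():
        return []
    data = json.loads(raw)
    if isinstance(data, dict):  # a single window is serialized as one object
        data = [data]
    return [
        {
            "pid": w.get("PID", w.get("Id")),
            "title": w.get("Title", w.get("MainWindowTitle")),
            "process": w.get("ProcessName"),
        }
        for w in data
    ]


def find_by_title(windows: List[Dict], needle: str) -> List[Dict]:
    """Case-insensitive substring match on the window title."""
    return [w for w in windows if needle.lower() in (w["title"] or "").lower()]


sample = '[{"PID": 1234, "Title": "Untitled - Notepad", "ProcessName": "notepad"}]'
print(find_by_title(parse_window_list(sample), "notepad"))
```

Checking the match list before calling `close` makes it easy to stop and ask the user when zero or several windows match.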
Activate (bring to the foreground)
python window_manager.py activate --title "Notepad"
python window_manager.py activate --pid 1234
Close a window
python window_manager.py close --title "Untitled - Notepad"
python window_manager.py close --pid 1234
Minimize / maximize
python window_manager.py minimize --title "Chrome"
python window_manager.py maximize --pid 1234
Resize and move
python window_manager.py resize --pid 1234 --x 100 --y 100 --width 800 --height 600
Send keys (SendKeys format)
python window_manager.py send-keys --title "Notepad" --text "Hello World"
SendKeys special keys: {ENTER}, {TAB}, {ESC}, {F1}; ^(c) means Ctrl+C, %(f) means Alt+F
2. process_manager.py - process management
List all processes
python process_manager.py list
python process_manager.py list --name chrome
Kill a process
python process_manager.py kill --name notepad
python process_manager.py kill --pid 1234
python process_manager.py kill --name chrome --force
Start a process
python process_manager.py start "notepad.exe"
python process_manager.py start "code" --dir "C:\Projects"
Show process details
python process_manager.py info --pid 1234
System resource overview
python process_manager.py system
3. hardware_controller.py - hardware control
Volume
python hardware_controller.py volume get
python hardware_controller.py volume set --level 75
python hardware_controller.py volume mute
Note: precise volume control requires NirCmd (nircmd.com) to be installed
Screen brightness
python hardware_controller.py screen brightness
python hardware_controller.py screen brightness --level 50
Works with laptop screens and monitors that support DDC/CI.
Display information
python hardware_controller.py screen info
Power management
python hardware_controller.py power lock
python hardware_controller.py power sleep
python hardware_controller.py power hibernate
python hardware_controller.py power shutdown --delay 30
python hardware_controller.py power restart --delay 30
python hardware_controller.py power cancel
Network
python hardware_controller.py network adapters
python hardware_controller.py network enable --name "Wi-Fi"
python hardware_controller.py network disable --name "Ethernet"
python hardware_controller.py network wifi
python hardware_controller.py network info
USB devices
python hardware_controller.py usb list
4. serial_comm.py - serial communication
List serial ports
python serial_comm.py list
Auto-detect the baud rate
python serial_comm.py detect --port COM3
Send data
python serial_comm.py send --port COM3 --data "LED_ON" --baud 9600
Receive data
python serial_comm.py receive --port COM3 --baud 9600 --timeout 5
Send and wait for a response
python serial_comm.py chat --port COM3 --data "GET_TEMP" --baud 9600
Monitor mode (real time)
python serial_comm.py monitor --port COM3 --baud 9600 --duration 30
Dependency: pip install pyserial (installed automatically on first use)
5. iot_controller.py - IoT / smart home
Home Assistant
# List all entities
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --token YOUR_TOKEN list
# Show an entity's state
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --token YOUR_TOKEN state --entity-id light.living_room
# Turn on / turn off / toggle
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --token YOUR_TOKEN on --entity-id light.living_room
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --token YOUR_TOKEN off --entity-id light.living_room
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --token YOUR_TOKEN toggle --entity-id switch.fan
# Call a service with parameters
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --token YOUR_TOKEN on --entity-id light.living_room --data '{"brightness_pct": 50, "color_temp": 350}'
# Call an arbitrary service
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --token YOUR_TOKEN call --domain climate --service set_temperature --entity-id climate.bedroom --data '{"temperature": 24}'
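For orientation, the last command (call an arbitrary service) corresponds roughly to the HTTP request sketched below. How iot_controller.py builds its requests is not shown in this document; the sketch assumes Home Assistant's public REST API (POST to /api/services/<domain>/<service> with a bearer token), and `build_service_call` is a hypothetical helper. It only constructs the request and does not send it.

```python
import json
from typing import Optional
from urllib.request import Request


def build_service_call(base_url: str, token: str, domain: str, service: str,
                       entity_id: str, data: Optional[dict] = None) -> Request:
    """Build (but do not send) a Home Assistant service call request."""
    payload = {"entity_id": entity_id, **(data or {})}
    return Request(
        url=f"{base_url.rstrip('/')}/api/services/{domain}/{service}",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_service_call(
    "http://192.168.1.100:8123", "YOUR_TOKEN",
    "climate", "set_temperature", "climate.bedroom", {"temperature": 24},
)
print(req.get_method(), req.full_url)
```

Passing the request to `urllib.request.urlopen` would send it; that step is left out so the example has no network side effects.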
Generic HTTP/REST
# GET request
python iot_controller.py http --url http://192.168.1.50:8080 get --path /api/status
python iot_controller.py http --url http://192.168.1.50:8080 get --path /api/data --header "Authorization: Bearer TOKEN"
# POST request
python iot_controller.py http --url http://192.168.1.50:8080 post --path /api/command --body '{"action":"on"}'
# PUT request
python iot_controller.py http --url http://192.168.1.50:8080 put --path /api/config --body '{"name":"updated"}'
Mijia / Xiaomi
python iot_controller.py mijia discover
Requires: pip install miio (must be installed manually)
Dependency: pip install requests (installed automatically on first use)
Common scenarios
Arduino light control
- Connect the Arduino via USB
- List ports: python serial_comm.py list
- Send a command: python serial_comm.py send --port COM3 --data "LED_ON" --baud 9600
- Read a sensor: python serial_comm.py chat --port COM3 --data "READ_TEMP" --baud 9600
Smart home automation
- List lights: python iot_controller.py homeassistant --url ... --token ... list
- Turn on a light: python iot_controller.py homeassistant --url ... --token ... on --entity-id light.bedroom
- Adjust brightness: python iot_controller.py homeassistant --url ... --token ... on --entity-id light.bedroom --data '{"brightness_pct":30}'
Application automation
- Find the window: python window_manager.py list
- Activate: python window_manager.py activate --title "Excel"
- Send input: python window_manager.py send-keys --title "Excel" --text "^(s)" (Ctrl+S)
- Close: python window_manager.py close --title "Excel"
6. gui_controller.py - GUI automation
Mouse control
python gui_controller.py mouse position
python gui_controller.py mouse move --x 500 --y 300 [--duration 0.3]
python gui_controller.py mouse click [--x 500] [--y 300]
python gui_controller.py mouse right-click [--x 500] [--y 300]
python gui_controller.py mouse double-click [--x 500] [--y 300]
python gui_controller.py mouse drag --start-x 100 --start-y 200 --end-x 500 --end-y 400 [--duration 0.5]
python gui_controller.py mouse scroll [--x 500] [--y 300] [--clicks 5] [--direction up|down]
Note: coordinates are absolute screen coordinates; when omitted, the current mouse position is used.
Keyboard control
python gui_controller.py keyboard type --text "Hello World"
python gui_controller.py keyboard press --keys "ctrl+c"
python gui_controller.py keyboard press --keys "alt+tab"
python gui_controller.py keyboard key-down --key shift
python gui_controller.py keyboard key-up --key shift
press supports single keys (e.g. enter, esc) and multi-key combinations (e.g. ctrl+shift+esc).
Screenshots
python gui_controller.py screenshot full
python gui_controller.py screenshot active-window
python gui_controller.py screenshot region --x 0 --y 0 --width 800 --height 600
python gui_controller.py screenshot list
python gui_controller.py screenshot size
Visual recognition / OCR
python gui_controller.py visual ocr [--x 0] [--y 0] [--width 1920] [--height 1080] [--lang chi_sim+eng]
python gui_controller.py visual find --template "icon.png" [--confidence 0.9]
python gui_controller.py visual click-image --template "button.png" [--confidence 0.9] [--offset-x 0] [--offset-y 0]
python gui_controller.py visual find-color --color "#FF0000" [--x 0] [--y 0] [--width 1920] [--height 1080]
python gui_controller.py visual pixel --x 100 --y 200
- ocr: recognizes the full screen by default; supports mixed Chinese and English (chi_sim+eng)
- find/click-image: template images are searched for in the screenshot directory or the assets/ subdirectory; relative paths are supported
- find-color: finds pixels by color within the given region (or the full screen); supports hexadecimal colors (e.g. #FF0000)
- pixel: returns the RGB and hexadecimal color value of the pixel at the given coordinates
Dependency: pip install pyautogui pillow (installed automatically on first use)
OCR dependency: pip install pytesseract (optional; requires the Tesseract engine with language packs configured)
#!/usr/bin/env python3
"""
Shared utilities for system-controller scripts.
Handles Windows encoding issues and PowerShell execution.
Security:
- run_ps(): Uses list-based subprocess (no shell=True) for PowerShell execution
- run_cmd(): Deprecated - kept for backward compat but with strict validation
- All command paths are validated before execution
"""
import subprocess
import sys
import os
import shlex


def run_ps(script, timeout=30):
    """
    Execute PowerShell script safely using list-based subprocess invocation.

    Security: Does NOT use shell=True. The script text is passed as an argument
    to powershell.exe via the -Command parameter. This prevents shell injection
    from the Python side since subprocess handles argument escaping.

    Args:
        script: PowerShell script text to execute
        timeout: Maximum execution time in seconds (default 30)

    Returns:
        Tuple of (stdout: str, stderr: str, returncode: int)
    """
    env = os.environ.copy()
    env["PYTHONIOENCODING"] = "utf-8"

    # Prepend encoding setup to ensure UTF-8 output
    encoding_setup = (
        "[Console]::InputEncoding = [Console]::OutputEncoding = "
        "[System.Text.Encoding]::UTF8; "
        "$OutputEncoding = [System.Text.Encoding]::UTF8; "
    )
    full_script = encoding_setup + script

    # Validate: reject scripts that try to break out of PowerShell context.
    # Markers are written in lowercase because they are matched against the
    # lowercased script; mixed-case markers would never match.
    dangerous_markers = ['--%', 'cmd /c', 'invoke-expression', 'iex ', 'start-process -filepath']
    lower_script = full_script.lower()
    for marker in dangerous_markers:
        if marker in lower_script:
            return "", f"ERROR: Script contains blocked pattern '{marker}'", -1

    try:
        # Use list-based invocation (no shell=True) for safety
        result = subprocess.run(
            ["powershell.exe", "-NoProfile", "-NonInteractive", "-Command", full_script],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
            env=env,
            shell=False  # Explicitly no shell
        )
        stdout = (result.stdout or "").strip()
        stderr = (result.stderr or "").strip()
        return stdout, stderr, result.returncode
    except subprocess.TimeoutExpired:
        return "", "ERROR: Command timed out", -1
    except FileNotFoundError:
        return "", "ERROR: powershell.exe not found on system PATH", -1
    except Exception as e:
        return "", f"ERROR: {e}", -1


def run_cmd(command, timeout=30):
    """
    Execute a shell command with safety constraints.

    WARNING: This function uses shell=True which carries inherent risk.
    It should only be used for simple, trusted commands where run_ps is not applicable.

    Security measures:
    - Command must be a string (not a list that could be misinterpreted)
    - Command length is limited
    - Obvious injection patterns are rejected

    Prefer run_ps() for any PowerShell operation.

    Args:
        command: Command string to execute
        timeout: Maximum execution time in seconds

    Returns:
        Tuple of (stdout: str, stderr: str, returncode: int)
    """
    if not isinstance(command, str):
        return "", "ERROR: run_cmd requires string command", -1

    # Length limit to prevent abuse
    if len(command) > 10000:
        return "", "ERROR: Command exceeds maximum length of 10000 characters", -1

    # Reject dangerous shell metacharacters that enable injection
    dangerous_chars = ['`$', '$(', '${', '`', ';', '&&', '||']
    cmd_str = command.strip()
    for char in dangerous_chars:
        if char in cmd_str:
            return "", f"ERROR: Command blocked - contains '{char}' (use list-based invocation)", -1

    env = os.environ.copy()
    env["PYTHONIOENCODING"] = "utf-8"

    try:
        result = subprocess.run(
            cmd_str,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
            shell=True,  # Intentional but guarded above
            env=env
        )
        stdout = (result.stdout or "").strip()
        stderr = (result.stderr or "").strip()
        return stdout, stderr, result.returncode
    except subprocess.TimeoutExpired:
        return "", "ERROR: Command timed out", -1
    except Exception as e:
        return "", f"ERROR: {e}", -1


def json_safe(obj):
    """Ensure output is serializable, replacing None with null."""
    if obj is None:
        return "null"
    return obj
#!/usr/bin/env python3
"""
Window Manager - Windows desktop application control via PowerShell UI Automation.
Requirements: Windows 10/11, PowerShell 5.1+
Dependencies: None (uses built-in PowerShell and Windows APIs)
"""
import subprocess
import json
import sys
import time
import os
import re

# Fix print encoding for Windows
if sys.stdout.encoding != 'utf-8':
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if sys.stderr.encoding != 'utf-8':
    sys.stderr.reconfigure(encoding='utf-8', errors='replace')

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common import run_ps as _run_ps

# ========== Safety: Input Validation ==========
DANGEROUS_CHARS_PATTERN = re.compile(r'[;&|`$(){}[\]!<>\n\r]')

# SendKeys special characters that could be abused for injection
# SendKeys uses {} for special keys like {ENTER}, {TAB}, etc.
# We allow only well-known SendKeys sequences.
# '^' is included so that Ctrl combinations such as ^(c) are accepted.
SEND_KEYS_ALLOWED_PATTERN = re.compile(
    r'^[\w\s\.,;\:\[\]\{\}\+\=\-\*\/\%\#\@\!\?\&\(\)\"\'~`\|\^]+$'
)

# Known safe SendKeys special key names (whitelist)
SEND_KEYS_SPECIAL_KEYS = {
    'ENTER', 'TAB', 'ESC', 'ESCAPE', 'SPACE', 'BACKSPACE', 'BS',
    'DELETE', 'DEL', 'INSERT', 'INS', 'HOME', 'END', 'PGUP', 'PGDN',
    'UP', 'DOWN', 'LEFT', 'RIGHT', 'F1', 'F2', 'F3', 'F4', 'F5',
    'F6', 'F7', 'F8', 'F9', 'F10', 'F11', 'F12', 'F13', 'F14', 'F15', 'F16',
    'BREAK', 'CAPSLOCK', 'NUMLOCK', 'SCROLLLOCK',
    'PRTSC', 'PRINTSCREEN', '+', '^', '%', '~', '(', ')',
    'LWIN', 'RWIN', 'APP',
}


def _validate_string(value, field_name="input"):
    """Validate that a string doesn't contain shell injection characters."""
    if not value:
        return True
    if DANGEROUS_CHARS_PATTERN.search(value):
        raise ValueError(
            f"ERROR: {field_name} contains forbidden shell characters."
        )
    return True


def _sanitize_ps_string(value):
    """Sanitize string for embedding in PowerShell single-quoted strings."""
    if value is None:
        return "''"
    _validate_string(value)
    escaped = value.replace("'", "''")
    return f"'{escaped}'"


def _validate_sendkeys_text(text):
    """
    Validate and sanitize text for SendKeys::SendWait.
    Rejects potentially dangerous escape sequences.
    """
    if not text:
        raise ValueError("ERROR: Text to send cannot be empty")

    # Check length limit to prevent buffer abuse
    if len(text) > 10000:
        raise ValueError("ERROR: SendKeys text exceeds maximum length of 10000 characters")

    # Check for obviously dangerous patterns
    dangerous_patterns = ['{EXIT}', '{QUIT}', '{KILL}', '{SHUTDOWN}', '{REBOOT}',
                          '{FORMAT}', '{DELETE}', '{RM ', '{DEL ']
    upper_text = text.upper()
    for pattern in dangerous_patterns:
        if pattern in upper_text:
            raise ValueError(
                f"ERROR: SendKeys contains blocked pattern '{pattern}'."
            )

    # Allow-list check. The generic _validate_string() check would reject the
    # braces and parentheses that SendKeys syntax needs ({ENTER}, ^(s)), so the
    # SendKeys-specific pattern is used instead. Callers embed the text in a
    # single-quoted PowerShell string with quotes doubled.
    if not SEND_KEYS_ALLOWED_PATTERN.match(text):
        raise ValueError("ERROR: SendKeys text contains unsupported characters.")
    return True


def list_windows():
    """List all visible windows with PID, title, process name, and window handle."""
    # One object per process that owns a titled main window. The earlier
    # rectangle lookup called a method that does not exist on Marshal, so it
    # always threw and forced the fallback query below.
    script = r"""
$windows = Get-Process | Where-Object { $_.MainWindowTitle -ne '' } | ForEach-Object {
    try {
        [PSCustomObject]@{
            PID = $_.Id
            Title = $_.MainWindowTitle
            ProcessName = $_.ProcessName
            MainWindowHandle = $_.MainWindowHandle.ToInt64()
        }
    } catch { }
}
$windows = $windows | Sort-Object -Property Title -Unique | Where-Object { $_.Title -ne '' }
$windows | ConvertTo-Json -Compress
"""
    stdout, stderr, code = _run_ps(script)
    if code != 0 or not stdout:
        # Fallback: plain property selection without custom objects
        script2 = """
Get-Process | Where-Object { $_.MainWindowTitle -ne '' } | Select-Object Id, ProcessName, MainWindowTitle | ConvertTo-Json -Compress
"""
        stdout, stderr, code = _run_ps(script2)
    return stdout


def activate_window(pid=None, title=None):
    """Bring a window to the foreground by PID or title substring."""
    if pid:
        try:
            pid_val = int(pid)
            if pid_val < 1 or pid_val > 2147483647:
                return "ERROR: Invalid PID"
        except (ValueError, TypeError):
            return f"ERROR: Invalid PID: {pid}"
        script = f"""
Add-Type @"
using System;
using System.Runtime.InteropServices;
public class Win32 {{
[DllImport("user32.dll")]
public static extern bool SetForegroundWindow(IntPtr hWnd);
[DllImport("user32.dll")]
public static extern bool ShowWindow(IntPtr hWnd, int nCmdShow);
[DllImport("user32.dll")]
public static extern bool IsIconic(IntPtr hWnd);
}}
"@
$proc = Get-Process -Id {pid_val} -ErrorAction SilentlyContinue
if ($proc) {{
$hwnd = $proc.MainWindowHandle
if ([Win32]::IsIconic($hwnd)) {{ [Win32]::ShowWindow($hwnd, 9) }}
[Win32]::SetForegroundWindow($hwnd)
Write-Output "OK: Activated window (PID: {pid_val}, Title: $($proc.MainWindowTitle))"
}} else {{
Write-Output "ERROR: Process with PID {pid_val} not found"
}}
"""
    elif title:
        _validate_string(title, "window title")
        escaped_title = title.replace("'", "''")
        script = f"""
Add-Type @"
using System;
using System.Runtime.InteropServices;
public class Win32 {{
[DllImport("user32.dll")]
public static extern bool SetForegroundWindow(IntPtr hWnd);
[DllImport("user32.dll")]
public static extern bool ShowWindow(IntPtr hWnd, int nCmdShow);
[DllImport("user32.dll")]
public static extern bool IsIconic(IntPtr hWnd);
}}
"@
$proc = Get-Process | Where-Object {{ $_.MainWindowTitle -like '*{escaped_title}*' }} | Select-Object -First 1
if ($proc) {{
$hwnd = $proc.MainWindowHandle
if ([Win32]::IsIconic($hwnd)) {{ [Win32]::ShowWindow($hwnd, 9) }}
[Win32]::SetForegroundWindow($hwnd)
Write-Output "OK: Activated window (PID: $($proc.Id), Title: $($proc.MainWindowTitle))"
}} else {{
Write-Output "ERROR: No window found matching '{title}'"
}}
"""
    else:
        return "ERROR: Provide --pid or --title"
    stdout, stderr, code = _run_ps(script)
    return stdout if stdout else stderr


def close_window(pid=None, title=None):
    """Close a window by PID or title substring.

    Safety improvement: Uses graceful close with configurable delay before force-kill.
    Default is 3 seconds grace period (up from original 1 second).
    """
    if pid:
        try:
            pid_val = int(pid)
            if pid_val < 1 or pid_val > 2147483647:
                return "ERROR: Invalid PID"
        except (ValueError, TypeError):
            return f"ERROR: Invalid PID: {pid}"
        script = f"""
$proc = Get-Process -Id {pid_val} -ErrorAction SilentlyContinue
if ($proc) {{
$proc.CloseMainWindow() | Out-Null
Start-Sleep -Seconds 3
if (!$proc.HasExited) {{
Write-Output "WARNING: Process did not exit gracefully after 3 seconds. Use --force with kill command if needed."
Write-Output "INFO: Window close request sent to PID {pid_val}"
}} else {{
Write-Output "OK: Closed window gracefully (PID: {pid_val})"
}}
}} else {{
Write-Output "ERROR: Process not found"
}}
"""
    elif title:
        _validate_string(title, "window title")
        escaped_title = title.replace("'", "''")
        script = f"""
$procs = Get-Process | Where-Object {{ $_.MainWindowTitle -like '*{escaped_title}*' }}
if ($procs) {{
$procs | ForEach-Object {{ $_.CloseMainWindow() | Out-Null }}
Start-Sleep -Seconds 3
$remaining = $procs | Where-Object {{ !$_.HasExited }}
if ($remaining) {{
$names = ($remaining.ProcessName | Sort-Object -Unique) -join ', '
Write-Output "WARNING: $($remaining.Count) process(es) did not close gracefully: $names"
Write-Output "INFO: Close request sent to matching '{title}' windows"
}} else {{
Write-Output "OK: Closed $($procs.Count) window(s) matching '{title}' gracefully"
}}
}} else {{
Write-Output "ERROR: No window found"
}}
"""
    else:
        return "ERROR: Provide --pid or --title"
    stdout, stderr, code = _run_ps(script)
    return stdout if stdout else stderr


def minimize_window(pid=None, title=None):
    """Minimize a window by PID or title substring."""
    if pid:
        try:
            pid_val = int(pid)
            if pid_val < 1 or pid_val > 2147483647:
                return "ERROR: Invalid PID"
        except (ValueError, TypeError):
            return f"ERROR: Invalid PID: {pid}"
        script = f'''
Add-Type @'
using System;
using System.Runtime.InteropServices;
public class WinMin {{
[DllImport("user32.dll")] public static extern bool ShowWindow(IntPtr hWnd, int nCmdShow);
}}
'@
$proc = Get-Process -Id {pid_val} -ErrorAction SilentlyContinue
if ($proc) {{
[WinMin]::ShowWindow($proc.MainWindowHandle, 6)
Write-Output "OK: Minimized (PID: {pid_val})"
}} else {{
Write-Output "ERROR: Process not found"
}}
'''
    elif title:
        _validate_string(title, "window title")
        escaped_title = title.replace("'", "''")
        script = f'''
Add-Type @'
using System;
using System.Runtime.InteropServices;
public class WinMin {{
[DllImport("user32.dll")] public static extern bool ShowWindow(IntPtr hWnd, int nCmdShow);
}}
'@
$proc = Get-Process | Where-Object {{ $_.MainWindowTitle -like '*{escaped_title}*' }} | Select-Object -First 1
if ($proc) {{
[WinMin]::ShowWindow($proc.MainWindowHandle, 6)
Write-Output "OK: Minimized: $($proc.MainWindowTitle)"
}} else {{
Write-Output "ERROR: No window found"
}}
'''
    else:
        return "ERROR: Provide --pid or --title"
    stdout, stderr, code = _run_ps(script)
    return stdout if stdout else stderr


def maximize_window(pid=None, title=None):
    """Maximize a window by PID or title substring."""
    if pid:
        try:
            pid_val = int(pid)
            if pid_val < 1 or pid_val > 2147483647:
                return "ERROR: Invalid PID"
        except (ValueError, TypeError):
            return f"ERROR: Invalid PID: {pid}"
        script = f'''
Add-Type @'
using System;
using System.Runtime.InteropServices;
public class WinMax {{
[DllImport("user32.dll")] public static extern bool ShowWindow(IntPtr hWnd, int nCmdShow);
}}
'@
$proc = Get-Process -Id {pid_val} -ErrorAction SilentlyContinue
if ($proc) {{
[WinMax]::ShowWindow($proc.MainWindowHandle, 3)
Write-Output "OK: Maximized (PID: {pid_val})"
}} else {{
Write-Output "ERROR: Process not found"
}}
'''
    elif title:
        _validate_string(title, "window title")
        escaped_title = title.replace("'", "''")
        script = f'''
Add-Type @'
using System;
using System.Runtime.InteropServices;
public class WinMax {{
[DllImport("user32.dll")] public static extern bool ShowWindow(IntPtr hWnd, int nCmdShow);
}}
'@
$proc = Get-Process | Where-Object {{ $_.MainWindowTitle -like '*{escaped_title}*' }} | Select-Object -First 1
if ($proc) {{
[WinMax]::ShowWindow($proc.MainWindowHandle, 3)
Write-Output "OK: Maximized: $($proc.MainWindowTitle)"
}} else {{
Write-Output "ERROR: No window found"
}}
'''
    else:
        return "ERROR: Provide --pid or --title"
    stdout, stderr, code = _run_ps(script)
    return stdout if stdout else stderr


def resize_window(pid=None, title=None, x=None, y=None, width=None, height=None):
    """Move and resize a window. All position/size parameters in pixels."""
    if not (pid or title):
        return "ERROR: Provide --pid or --title"
    if x is None or y is None or width is None or height is None:
        return "ERROR: Provide --x, --y, --width, --height"

    # Validate numeric parameters are reasonable
    for name, val in [("x", x), ("y", y), ("width", width), ("height", height)]:
        try:
            ival = int(val)
            if ival < 0 or ival > 100000:
                return f"ERROR: {name} value out of reasonable range: {val}"
        except (ValueError, TypeError):
            return f"ERROR: {name} must be an integer, got: {val}"

    target = ""
    if pid:
        try:
            target = f"$proc = Get-Process -Id {int(pid)} -ErrorAction SilentlyContinue"
        except (ValueError, TypeError):
            return "ERROR: Invalid PID"
    else:
        _validate_string(title, "window title")
        escaped_title = title.replace("'", "''")
        target = f'$proc = Get-Process | Where-Object {{ $_.MainWindowTitle -like \'*{escaped_title}*\' }} | Select-Object -First 1'

    x_val, y_val, w_val, h_val = int(x), int(y), int(width), int(height)
    script = f'''
Add-Type @'
using System;
using System.Runtime.InteropServices;
public class WinPos {{
[DllImport("user32.dll")] public static extern bool SetWindowPos(IntPtr hWnd, IntPtr hWndAfter, int X, int Y, int cx, int cy, uint uFlags);
}}
'@
{target}
if ($proc) {{
$result = [WinPos]::SetWindowPos($proc.MainWindowHandle, [IntPtr]::Zero, {x_val}, {y_val}, {w_val}, {h_val}, 0x0040)
if ($result) {{ Write-Output "OK: Window moved to ({x_val},{y_val}) size {w_val}x{h_val}" }}
else {{ Write-Output "ERROR: Failed to resize" }}
}} else {{
Write-Output "ERROR: Window not found"
}}
'''
    stdout, stderr, code = _run_ps(script)
    return stdout if stdout else stderr


def send_keys(text, pid=None, title=None):
    """Send keystrokes to a window. Uses SendKeys format with safety validation."""
    _validate_sendkeys_text(text)
    if pid:
        try:
            pid_val = int(pid)
            if pid_val < 1 or pid_val > 2147483647:
                return "ERROR: Invalid PID"
        except (ValueError, TypeError):
            return f"ERROR: Invalid PID: {pid}"
        # Escape the text for PowerShell single-quote string (safe because we validated above)
        ps_escaped = text.replace("'", "''")
        script = f'''
Add-Type -AssemblyName System.Windows.Forms
$proc = Get-Process -Id {pid_val} -ErrorAction SilentlyContinue
if ($proc) {{
Start-Sleep -Milliseconds 200
[System.Windows.Forms.SendKeys]::SendWait('{ps_escaped}')
Write-Output "OK: Sent keys to PID {pid_val}"
}} else {{
Write-Output "ERROR: Process not found"
}}
'''
    elif title:
        _validate_string(title, "window title")
        escaped_title = title.replace("'", "''")
        ps_escaped = text.replace("'", "''")
        script = f'''
Add-Type -AssemblyName System.Windows.Forms
$proc = Get-Process | Where-Object {{ $_.MainWindowTitle -like '*{escaped_title}*' }} | Select-Object -First 1
if ($proc) {{
$hwnd = $proc.MainWindowHandle
# Use Win32 API to bring window to foreground reliably
Add-Type @"
using System; using System.Runtime.InteropServices;
public class W32H {{ [DllImport("user32.dll")] public static extern bool SetForegroundWindow(IntPtr h); }}
"@
[W32H]::SetForegroundWindow($hwnd) | Out-Null
Start-Sleep -Milliseconds 300
[System.Windows.Forms.SendKeys]::SendWait('{ps_escaped}')
Write-Output "OK: Sent keys"
}} else {{
Write-Output "ERROR: Window not found"
}}
'''
else:
return "ERROR: Provide --pid or --title"
stdout, stderr, code = _run_ps(script)
return stdout if stdout else stderr
def main():
import argparse
parser = argparse.ArgumentParser(description="Window Manager - Control desktop windows")
sub = parser.add_subparsers(dest="action")
p_list = sub.add_parser("list", help="List all visible windows")
p_act = sub.add_parser("activate", help="Bring window to foreground")
p_act.add_argument("--pid", type=int)
p_act.add_argument("--title", type=str)
p_close = sub.add_parser("close", help="Close a window")
p_close.add_argument("--pid", type=int)
p_close.add_argument("--title", type=str)
p_min = sub.add_parser("minimize", help="Minimize a window")
p_min.add_argument("--pid", type=int)
p_min.add_argument("--title", type=str)
p_max = sub.add_parser("maximize", help="Maximize a window")
p_max.add_argument("--pid", type=int)
p_max.add_argument("--title", type=str)
p_resize = sub.add_parser("resize", help="Move and resize a window")
p_resize.add_argument("--pid", type=int)
p_resize.add_argument("--title", type=str)
p_resize.add_argument("--x", type=int, required=True)
p_resize.add_argument("--y", type=int, required=True)
p_resize.add_argument("--width", type=int, required=True)
p_resize.add_argument("--height", type=int, required=True)
p_keys = sub.add_parser("send-keys", help="Send keystrokes to a window")
p_keys.add_argument("--pid", type=int)
p_keys.add_argument("--title", type=str)
p_keys.add_argument("--text", type=str, required=True)
args = parser.parse_args()
if args.action == "list":
print(list_windows())
elif args.action == "activate":
print(activate_window(args.pid, args.title))
elif args.action == "close":
print(close_window(args.pid, args.title))
elif args.action == "minimize":
print(minimize_window(args.pid, args.title))
elif args.action == "maximize":
print(maximize_window(args.pid, args.title))
elif args.action == "resize":
print(resize_window(args.pid, args.title, args.x, args.y, args.width, args.height))
elif args.action == "send-keys":
try:
print(send_keys(args.text, args.pid, args.title))
except ValueError as e:
print(str(e))
else:
parser.print_help()
if __name__ == "__main__":
main()
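The `send-keys` action above passes `--text` to `SendKeys.SendWait`, which treats `+ ^ % ~ ( ) { } [ ]` as key syntax rather than literal characters. The sketch below shows one possible pre-processing step for text that should be typed literally. The helper names `escape_sendkeys_literal` and `ps_single_quote` are hypothetical and do not appear in the script. The source does not say whether key-chord syntax is meant to stay available, so this is an optional step, not a description of the script's behavior.

```python
# Characters that SendKeys interprets as syntax; wrapping one in braces sends it literally.
SENDKEYS_SPECIAL = set("+^%~(){}[]")


def escape_sendkeys_literal(text: str) -> str:
    """Wrap every SendKeys metacharacter in braces so it is typed as-is."""
    return "".join("{" + ch + "}" if ch in SENDKEYS_SPECIAL else ch for ch in text)


def ps_single_quote(value: str) -> str:
    """Quote a value as a PowerShell single-quoted literal (only ' needs doubling)."""
    return "'" + value.replace("'", "''") + "'"


if __name__ == "__main__":
    literal = escape_sendkeys_literal("100% (done)")
    # The quoted result could be embedded in a SendWait(...) call.
    print(ps_single_quote(literal))
```

Escaping character by character avoids double-escaping the braces that the function itself introduces.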
#!/usr/bin/env python3
"""
Process Manager - List, start, stop, and monitor system processes.
Requirements: Windows 10/11, PowerShell 5.1+
Dependencies: None (uses built-in PowerShell)
"""
import subprocess
import sys
import os
import re
if sys.stdout.encoding != 'utf-8':
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if sys.stderr.encoding != 'utf-8':
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common import run_ps as _run_ps
# ========== Safety: Input Validation ==========
# Dangerous characters that should never appear in process names or commands
DANGEROUS_CHARS_PATTERN = re.compile(r'[;&|`$(){}[\]!<>\n\r]')
# Protected system processes that must not be killed
PROTECTED_PROCESSES = {
'csrss', 'csrss.exe',
'lsass', 'lsass.exe',
'services', 'services.exe',
'svchost', 'svchost.exe',
'winlogon', 'winlogon.exe',
'wininit', 'wininit.exe',
'smss', 'smss.exe',
'system', 'system.exe',
'dwm', 'dwm.exe',
'taskhostw', 'taskhostw.exe',
'sihost', 'sihost.exe',
'fontdrvhost', 'fontdrvhost.exe',
'usermanager', 'usermanager.exe',
}
# Blocked command patterns for Start-Process
BLOCKED_COMMAND_PATTERNS = [
'& ', '| ', '; ', '`', '$(', '${',
'rm ', 'del ', 'format', 'shutdown', 'reboot',
'mklink', 'icacls', 'reg delete', 'reg add',
]
def _validate_string(value, field_name="input"):
"""Validate that a string doesn't contain shell injection characters."""
if not value:
return True
if DANGEROUS_CHARS_PATTERN.search(value):
raise ValueError(
f"ERROR: {field_name} contains forbidden shell characters. "
"Rejected characters: ; & | ` $ ( ) { } [ ] ! < > and line breaks."
)
return True
def _sanitize_ps_string(value):
"""
Sanitize a string for safe embedding in PowerShell single-quoted strings.
Validates input first, then escapes single quotes.
"""
if value is None:
return "''"
_validate_string(value)
# In PowerShell single-quoted strings, only ' needs escaping -> ''
escaped = value.replace("'", "''")
return f"'{escaped}'"
def _validate_command_path(command):
"""Validate and sanitize a file path/command for Start-Process."""
if not command:
raise ValueError("ERROR: Command cannot be empty")
command = command.strip()
lower_cmd = command.lower()
for pattern in BLOCKED_COMMAND_PATTERNS:
if pattern in lower_cmd:
raise ValueError(
f"ERROR: Command blocked - contains dangerous pattern '{pattern}'. "
f"Use the terminal directly for advanced commands."
)
_validate_string(command, "command")
return command
def list_processes(name=None):
"""List running processes. Optionally filter by name."""
if name:
_validate_string(name, "process name")
escaped = name.replace("'", "''")
script = f"""
Get-Process -Name '*{escaped}*' -ErrorAction SilentlyContinue |
Select-Object Id, ProcessName, CPU, WorkingSet64, MainWindowTitle |
ForEach-Object {{
[PSCustomObject]@{{
PID = $_.Id
Name = $_.ProcessName
CPU_sec = [math]::Round($_.CPU, 2)
Memory_MB = [math]::Round($_.WorkingSet64 / 1MB, 2)
Window = if ($_.MainWindowTitle) {{ $_.MainWindowTitle }} else {{ '' }}
}}
}} | ConvertTo-Json -Compress
"""
else:
script = r"""
Get-Process | Select-Object Id, ProcessName, CPU, WorkingSet64, MainWindowTitle |
ForEach-Object {
[PSCustomObject]@{
PID = $_.Id
Name = $_.ProcessName
CPU_sec = [math]::Round($_.CPU, 2)
Memory_MB = [math]::Round($_.WorkingSet64 / 1MB, 2)
Window = if ($_.MainWindowTitle) { $_.MainWindowTitle } else { '' }
}
} | ConvertTo-Json -Compress
"""
stdout, stderr, code = _run_ps(script)
if not stdout:
return "[]"
return stdout
def kill_process(pid=None, name=None, force=False):
"""Kill a process by PID or name. Protected system processes are blocked."""
flag = "-Force" if force else ""
if pid:
try:
pid_val = int(pid)
if pid_val < 1 or pid_val > 2147483647:
return f"ERROR: Invalid PID value: {pid}"
except (ValueError, TypeError):
return f"ERROR: PID must be an integer, got: {pid}"
# Build protected list as PowerShell array string
protected_items = sorted(PROTECTED_PROCESSES)
protected_ps_array = ", ".join(f"'{p}'" for p in protected_items)
script = f"""
$proc = Get-Process -Id {pid_val} -ErrorAction SilentlyContinue
if (-not $proc) {{
Write-Output "ERROR: Process PID {pid_val} not found or access denied"
exit 1
}}
$protected = @({protected_ps_array})
$pn = $proc.ProcessName.ToLower()
$pne = $pn + '.exe'
if ($protected -contains $pn -or $protected -contains $pne) {{
Write-Output "ERROR: Process '$($proc.ProcessName)' (PID {pid_val}) is protected and cannot be killed"
exit 2
}}
Stop-Process -Id {pid_val} {flag} -ErrorAction SilentlyContinue
if ($?) {{
Write-Output "OK: Killed process PID {pid_val} ($($proc.ProcessName))"
}} else {{
Write-Output "ERROR: Could not kill process PID {pid_val}"
}}
"""
elif name:
_validate_string(name, "process name")
name_lower = name.lower().strip()
# Fast reject from Python-level blacklist
if name_lower in PROTECTED_PROCESSES or f"{name_lower}.exe" in PROTECTED_PROCESSES:
return f"ERROR: Process '{name}' is a protected system process and cannot be killed"
escaped = _sanitize_ps_string(name)
protected_items = sorted(PROTECTED_PROCESSES)
protected_ps_array = ", ".join(f"'{p}'" for p in protected_items)
script = f"""
$procs = Get-Process -Name ('*' + {escaped} + '*') -ErrorAction SilentlyContinue
$protected = @({protected_ps_array})
$targets = $procs | Where-Object {{
$pn2 = $_.ProcessName.ToLower()
$pne2 = $pn2 + '.exe'
($protected -notcontains $pn2) -and ($protected -notcontains $pne2)
}}
if ($targets) {{
$targetNames = ($targets.ProcessName | Sort-Object -Unique) -join ', '
$targets | Stop-Process {flag} -ErrorAction SilentlyContinue
Write-Output "OK: Killed $($targets.Count) process(es): $targetNames"
}} else {{
if ($procs) {{
Write-Output "ERROR: All matching processes are protected system processes. No action taken."
}} else {{
Write-Output ("ERROR: No process found matching '" + {escaped} + "'")
}}
}}
"""
else:
return "ERROR: Provide --pid or --name"
stdout, stderr, code = _run_ps(script)
return stdout if stdout else stderr
def start_process(command, working_dir=None, wait=False):
"""Start a new process with validated/sanitized input."""
# Validate command path (raises ValueError on dangerous input)
safe_command = _validate_command_path(command)
# Sanitize for PowerShell using single-quote escaping
safe_cmd_ps = _sanitize_ps_string(safe_command)
dir_part = ""
if working_dir:
_validate_string(working_dir, "working directory")
safe_dir = _sanitize_ps_string(working_dir)
dir_part = f"-WorkingDirectory {safe_dir}"
wait_part = "-Wait" if wait else ""
script = f"""
try {{
Start-Process -FilePath {safe_cmd_ps} {dir_part} {wait_part} -ErrorAction Stop
Write-Output "OK: Started process"
}} catch {{
Write-Output "ERROR: $($_.Exception.Message)"
}}
"""
stdout, stderr, code = _run_ps(script)
return stdout if stdout else stderr
def get_process_info(pid):
"""Get detailed information about a specific process."""
try:
pid_val = int(pid)
if pid_val < 1 or pid_val > 2147483647:
return f"ERROR: Invalid PID value: {pid}"
except (ValueError, TypeError):
return f"ERROR: PID must be an integer, got: {pid}"
script = f"""
$proc = Get-Process -Id {pid_val} -ErrorAction SilentlyContinue
if ($proc) {{
[PSCustomObject]@{{
PID = $proc.Id
Name = $proc.ProcessName
Path = $proc.Path
StartTime = $proc.StartTime
CPU_sec = [math]::Round($proc.CPU, 2)
Memory_MB = [math]::Round($proc.WorkingSet64 / 1MB, 2)
Threads = $proc.Threads.Count
Handles = $proc.HandleCount
MainWindowTitle = $proc.MainWindowTitle
Responding = $proc.Responding
}} | ConvertTo-Json
}} else {{
Write-Output "ERROR: Process PID {pid_val} not found"
}}
"""
stdout, stderr, code = _run_ps(script)
return stdout if stdout else stderr
def get_system_info():
"""Get overall system resource usage."""
script = r"""
$os = Get-CimInstance Win32_OperatingSystem
$totalGB = [math]::Round($os.TotalVisibleMemorySize / 1MB, 2)
$freeGB = [math]::Round($os.FreePhysicalMemory / 1MB, 2)
$usedGB = [math]::Round($totalGB - $freeGB, 2)
$cpu = Get-CimInstance Win32_Processor | Select-Object -First 1
$uptime = (Get-Date) - $os.LastBootUpTime
[PSCustomObject]@{
ComputerName = $env:COMPUTERNAME
OS = $os.Caption
CPU_Load_Pct = $cpu.LoadPercentage
RAM_Total_GB = $totalGB
RAM_Used_GB = $usedGB
RAM_Free_GB = $freeGB
RAM_Usage_Pct = [math]::Round($usedGB / $totalGB * 100, 1)
Uptime_Days = [math]::Floor($uptime.TotalDays)
Uptime_Hours = $uptime.Hours
Process_Count = (Get-Process).Count
} | ConvertTo-Json
"""
stdout, stderr, code = _run_ps(script)
return stdout if stdout else stderr
def main():
import argparse
parser = argparse.ArgumentParser(description="Process Manager")
sub = parser.add_subparsers(dest="action")
p_list = sub.add_parser("list", help="List processes")
p_list.add_argument("--name", type=str, help="Filter by name")
p_kill = sub.add_parser("kill", help="Kill a process")
p_kill.add_argument("--pid", type=int)
p_kill.add_argument("--name", type=str)
p_kill.add_argument("--force", action="store_true")
p_start = sub.add_parser("start", help="Start a process")
p_start.add_argument("command", type=str)
p_start.add_argument("--dir", type=str, help="Working directory")
p_start.add_argument("--wait", action="store_true")
p_info = sub.add_parser("info", help="Get process details")
p_info.add_argument("--pid", type=int, required=True)
p_sys = sub.add_parser("system", help="Get system resource info")
args = parser.parse_args()
if args.action == "list":
print(list_processes(args.name))
elif args.action == "kill":
print(kill_process(args.pid, args.name, args.force))
elif args.action == "start":
try:
print(start_process(args.command, args.dir, args.wait))
except ValueError as e:
print(str(e))
elif args.action == "info":
print(get_process_info(args.pid))
elif args.action == "system":
print(get_system_info())
else:
parser.print_help()
if __name__ == "__main__":
main()
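The process manager above lists each protected process twice (`lsass` and `lsass.exe`) and repeats the comparison in both Python and PowerShell. A minimal sketch of the same check with a single normalization step is shown below. The names `normalize_process_name` and `is_protected` are hypothetical, and the set is abbreviated for illustration.

```python
# Abbreviated, illustrative subset of the protected list; base names only.
PROTECTED = {"csrss", "lsass", "services", "svchost", "winlogon", "wininit", "smss", "system"}


def normalize_process_name(name: str) -> str:
    """Lower-case the name, trim whitespace, and drop a trailing '.exe'."""
    name = name.strip().lower()
    return name[:-4] if name.endswith(".exe") else name


def is_protected(name: str) -> bool:
    """Return True when the normalized name is on the protected list."""
    return normalize_process_name(name) in PROTECTED


if __name__ == "__main__":
    for candidate in ("LSASS.exe", "notepad.exe"):
        print(candidate, is_protected(candidate))
```

Storing only base names means that an entry added to the list covers both spellings without a second literal.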
#!/usr/bin/env python3
"""
Hardware Controller - Control Windows system hardware settings.
Capabilities:
- Volume control (get/set/mute)
- Screen brightness (get/set)
- Display information (adapter, resolution, refresh rate)
- Power management (sleep, hibernate, shutdown, restart, lock)
- Network adapters (list, enable, disable)
- WiFi (list networks)
- USB devices (list)
Requirements: Windows 10/11, PowerShell 5.1+
Dependencies: None for basic features.
"""
import subprocess
import sys
import os
if sys.stdout.encoding != 'utf-8':
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if sys.stderr.encoding != 'utf-8':
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common import run_ps as _run_ps
# ========== Volume Control ==========
def get_volume():
"""Report the first sound device and its status. Win32_SoundDevice does not expose volume level or mute state."""
script = r"""
try {
Get-CimInstance Win32_SoundDevice | Select-Object -First 1 | ForEach-Object {
[PSCustomObject]@{
Device = $_.Name
Status = $_.Status
}
} | ConvertTo-Json
} catch {
Write-Output '{"Note":"Audio device info unavailable"}'
}
"""
stdout, _, _ = _run_ps(script)
return stdout
def set_volume(level):
"""Set volume level (0-100). Requires NirCmd for precise control."""
if not (0 <= level <= 100):
return "ERROR: Volume level must be 0-100"
script = f"""
try {{
$nircmd = Get-Command nircmd -ErrorAction SilentlyContinue
if ($nircmd) {{
& nircmd.exe setsysvolume {int(level * 655.35)}
Write-Output "OK: Volume set to {level}%"
return
}}
Write-Output "INFO: For precise volume control, install NirCmd (nircmd.com) and add to PATH"
}} catch {{
Write-Output "ERROR: $($_.Exception.Message)"
}}
"""
stdout, _, _ = _run_ps(script)
return stdout
def toggle_mute():
"""Toggle system mute."""
script = r"""
Add-Type -TypeDefinition @"
using System;
using System.Runtime.InteropServices;
public class AudioMute {
[DllImport("user32.dll")]
public static extern IntPtr SendMessageW(IntPtr hWnd, int Msg, IntPtr wParam, IntPtr lParam);
public const int WM_APPCOMMAND = 0x319;
public const int APPCMD_VOLUME_MUTE = 0x08;
}
"@
$hwnd = (Get-Process -Id $PID).MainWindowHandle
[AudioMute]::SendMessageW($hwnd, [AudioMute]::WM_APPCOMMAND, [IntPtr]::Zero, [IntPtr]([AudioMute]::APPCMD_VOLUME_MUTE * 0x10000)) | Out-Null
Write-Output "OK: Toggle mute sent"
"""
stdout, _, _ = _run_ps(script)
return stdout
# ========== Screen / Display ==========
def get_brightness():
"""Get screen brightness level."""
script = r"""
try {
$brightness = Get-CimInstance -Namespace root/WMI -ClassName WmiMonitorBrightness -ErrorAction Stop
$current = $brightness.CurrentBrightness
[PSCustomObject]@{
Brightness = $current
MaxBrightness = 100
MinBrightness = 0
} | ConvertTo-Json
} catch {
Write-Output '{"Error":"Could not read brightness (may not be supported on this display)"}'
}
"""
stdout, _, _ = _run_ps(script)
return stdout
def set_brightness(level):
"""Set screen brightness (0-100)."""
if not (0 <= level <= 100):
return "ERROR: Brightness must be 0-100"
script = f"""
try {{
$delay = 0
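Get-CimInstance -Namespace root/WMI -ClassName WmiMonitorBrightnessMethods -ErrorAction Stop |
Invoke-CimMethod -MethodName WmiSetBrightness -Arguments @{{ Timeout = [uint32]$delay; Brightness = [byte]{level} }} -ErrorAction Stop | Out-Null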
Write-Output "OK: Brightness set to {level}%"
}} catch {{
Write-Output "ERROR: Could not set brightness. Requires compatible display (laptop or DDC/CI monitor)."
}}
"""
stdout, _, _ = _run_ps(script)
return stdout
def get_display_info():
"""Get display adapter and resolution information."""
script = r"""
Get-CimInstance Win32_VideoController | ForEach-Object {
[PSCustomObject]@{
Name = $_.Name
Resolution = "$($_.CurrentHorizontalResolution)x$($_.CurrentVerticalResolution)"
RefreshRate = $_.CurrentRefreshRate
VRAM_MB = [math]::Round($_.AdapterRAM / 1MB, 0)
DriverVersion = $_.DriverVersion
Status = $_.Status
}
} | ConvertTo-Json -Compress
"""
stdout, _, _ = _run_ps(script)
return stdout
# ========== Safety: Protected process blacklist ==========
# These critical system processes must never be killed
PROTECTED_PROCESSES = {
'csrss', 'lsass', 'services', 'svchost', 'winlogon', 'wininit',
'smss', 'system', 'explorer', 'dwm', 'taskhostw', 'sihost',
'fontdrvhost', 'usermanager'
}
# ========== Power Management ==========
def _require_confirmation(action_name):
"""
Safety gate for destructive power operations.
Returns error string if confirmation env var is not set.
Caller should check and return early if non-None.
"""
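# Accept a comma-separated list of actions, or the value "all", as the error message below promises
confirmed = {item.strip() for item in os.environ.get("SYSTEM_CONTROLLER_CONFIRM", "").split(",")}
if action_name not in confirmed and "all" not in confirmed: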
return (
f"ERROR: {action_name} requires explicit confirmation. "
f"Set environment variable SYSTEM_CONTROLLER_CONFIRM={action_name} "
f"or SYSTEM_CONTROLLER_CONFIRM=all to proceed. "
f"This prevents accidental execution by AI agents."
)
return None
def lock_screen():
"""Lock the workstation."""
script = r"rundll32.exe user32.dll,LockWorkStation"
_run_ps(script)
return "OK: Screen locked"
def sleep_system():
"""Put the system to sleep."""
err = _require_confirmation("sleep")
if err:
return err
script = r"""
Add-Type -AssemblyName System.Windows.Forms
[System.Windows.Forms.Application]::SetSuspendState([System.Windows.Forms.PowerState]::Suspend, $false, $false)
Write-Output "OK: System entering sleep mode"
"""
stdout, _, _ = _run_ps(script)
return stdout
def hibernate():
"""Hibernate the system."""
err = _require_confirmation("hibernate")
if err:
return err
script = r"""
Add-Type -AssemblyName System.Windows.Forms
[System.Windows.Forms.Application]::SetSuspendState([System.Windows.Forms.PowerState]::Hibernate, $false, $false)
Write-Output "OK: System hibernating"
"""
stdout, _, _ = _run_ps(script)
return stdout
def shutdown(delay_sec=60):
"""Schedule system shutdown. Requires SYSTEM_CONTROLLER_CONFIRM env var."""
err = _require_confirmation("shutdown")
if err:
return err
# Validate delay range
if delay_sec < 10:
return "ERROR: Minimum shutdown delay is 10 seconds (safety constraint)"
if delay_sec > 3600:
return "ERROR: Maximum shutdown delay is 3600 seconds (1 hour)"
script = f"""
shutdown /s /t {delay_sec} /c "Shutdown initiated by system-controller"
Write-Output "OK: System will shutdown in {delay_sec} seconds. Run 'shutdown /a' to cancel."
"""
stdout, _, _ = _run_ps(script)
return stdout
def restart(delay_sec=60):
"""Schedule system restart. Requires SYSTEM_CONTROLLER_CONFIRM env var."""
err = _require_confirmation("restart")
if err:
return err
# Validate delay range
if delay_sec < 10:
return "ERROR: Minimum restart delay is 10 seconds (safety constraint)"
if delay_sec > 3600:
return "ERROR: Maximum restart delay is 3600 seconds (1 hour)"
script = f"""
shutdown /r /t {delay_sec} /c "Restart initiated by system-controller"
Write-Output "OK: System will restart in {delay_sec} seconds. Run 'shutdown /a' to cancel."
"""
stdout, _, _ = _run_ps(script)
return stdout
def cancel_shutdown():
"""Cancel a scheduled shutdown/restart."""
script = r"shutdown /a; Write-Output 'OK: Shutdown/restart cancelled'"
stdout, _, _ = _run_ps(script)
return stdout
# ========== Network ==========
def list_network_adapters():
"""List all network adapters with status."""
script = r"""
Get-NetAdapter | Select-Object Name, InterfaceDescription, Status, LinkSpeed, MacAddress |
ForEach-Object {
[PSCustomObject]@{
Name = $_.Name
Description = $_.InterfaceDescription
Status = $_.Status
Speed = $_.LinkSpeed
MAC = $_.MacAddress
}
} | ConvertTo-Json -Compress
"""
stdout, _, _ = _run_ps(script)
return stdout
def enable_adapter(name):
"""Enable a network adapter."""
escaped = name.replace("'", "''")
script = f"""
try {{
Enable-NetAdapter -Name '{escaped}' -Confirm:$false -ErrorAction Stop
# Single-quoted so PowerShell does not expand variables or subexpressions in the adapter name
Write-Output 'OK: Adapter "{escaped}" enabled'
}} catch {{
Write-Output "ERROR: $($_.Exception.Message)"
}}
"""
stdout, _, _ = _run_ps(script)
return stdout
def disable_adapter(name):
"""Disable a network adapter. Requires confirmation to prevent self-disconnect."""
# Safety: require explicit confirmation
err = _require_confirmation("disable_network")
if err:
return err
escaped = name.replace("'", "''")
script = f"""
try {{
$adapter = Get-NetAdapter -Name '{escaped}' -ErrorAction Stop
if ($adapter.Status -eq 'Disabled') {{
Write-Output 'INFO: Adapter "{escaped}" is already disabled'
return
}}
Disable-NetAdapter -Name '{escaped}' -Confirm:$false -ErrorAction Stop
Write-Output 'OK: Adapter "{escaped}" disabled'
}} catch {{
Write-Output "ERROR: $($_.Exception.Message)"
}}
"""
stdout, _, _ = _run_ps(script)
return stdout
def list_wifi_networks():
"""List available WiFi networks."""
script = r"""
try {
netsh wlan show networks mode=bssid
} catch {
Write-Output "ERROR: Could not scan WiFi networks."
}
"""
stdout, _, _ = _run_ps(script)
return stdout
def get_network_info():
"""Get current network configuration."""
script = r"""
Get-NetIPConfiguration | ForEach-Object {
[PSCustomObject]@{
Interface = $_.InterfaceAlias
IPv4 = if ($_.IPv4Address) { $_.IPv4Address.IPAddress -join ', ' } else { '' }
IPv6 = if ($_.IPv6Address) { $_.IPv6Address.IPAddress -join ', ' } else { '' }
DNS = if ($_.DNSServer) { $_.DNSServer.ServerAddresses -join ', ' } else { '' }
}
} | ConvertTo-Json -Compress
"""
stdout, _, _ = _run_ps(script)
return stdout
# ========== USB / Device ==========
def list_usb_devices():
"""List connected USB devices."""
script = r"""
Get-PnpDevice -PresentOnly | Where-Object { $_.InstanceId -like 'USB\*' } |
Select-Object FriendlyName, Status, Class, InstanceId |
ForEach-Object {
[PSCustomObject]@{
Device = $_.FriendlyName
Status = $_.Status
Class = $_.Class
InstanceId = $_.InstanceId
}
} | ConvertTo-Json -Compress
"""
stdout, _, _ = _run_ps(script)
return stdout
def main():
import argparse
parser = argparse.ArgumentParser(description="Hardware Controller")
sub = parser.add_subparsers(dest="category")
# Volume
p_vol = sub.add_parser("volume", help="Volume control")
vol_sub = p_vol.add_subparsers(dest="action")
vol_sub.add_parser("get", help="Get volume level")
vol_set = vol_sub.add_parser("set", help="Set volume")
vol_set.add_argument("--level", type=int, required=True, help="0-100")
vol_sub.add_parser("mute", help="Toggle mute")
# Screen
p_scr = sub.add_parser("screen", help="Screen/display control")
scr_sub = p_scr.add_subparsers(dest="action")
scr_sub.add_parser("info", help="Get display info")
bri_get = scr_sub.add_parser("brightness", help="Get/set brightness")
bri_get.add_argument("--level", type=int, help="0-100")
# Power
p_pwr = sub.add_parser("power", help="Power management")
pwr_sub = p_pwr.add_subparsers(dest="action")
pwr_sub.add_parser("lock", help="Lock screen")
pwr_sub.add_parser("sleep", help="Sleep mode")
pwr_sub.add_parser("hibernate", help="Hibernate")
pwr_sd = pwr_sub.add_parser("shutdown", help="Shutdown")
pwr_sd.add_argument("--delay", type=int, default=60, help="Seconds")
pwr_rs = pwr_sub.add_parser("restart", help="Restart")
pwr_rs.add_argument("--delay", type=int, default=60, help="Seconds")
pwr_sub.add_parser("cancel", help="Cancel scheduled shutdown")
# Network
p_net = sub.add_parser("network", help="Network control")
net_sub = p_net.add_subparsers(dest="action")
net_sub.add_parser("adapters", help="List network adapters")
net_en = net_sub.add_parser("enable", help="Enable adapter")
net_en.add_argument("--name", type=str, required=True)
net_dis = net_sub.add_parser("disable", help="Disable adapter")
net_dis.add_argument("--name", type=str, required=True)
net_sub.add_parser("wifi", help="List WiFi networks")
net_sub.add_parser("info", help="Get network config")
# USB
p_usb = sub.add_parser("usb", help="USB devices")
usb_sub = p_usb.add_subparsers(dest="action")
usb_sub.add_parser("list", help="List USB devices")
args = parser.parse_args()
if args.category == "volume":
if args.action == "get":
print(get_volume())
elif args.action == "set":
print(set_volume(args.level))
elif args.action == "mute":
print(toggle_mute())
else:
p_vol.print_help()
elif args.category == "screen":
if args.action == "info":
print(get_display_info())
elif args.action == "brightness":
if args.level is not None:
print(set_brightness(args.level))
else:
print(get_brightness())
else:
p_scr.print_help()
elif args.category == "power":
if args.action == "lock":
print(lock_screen())
elif args.action == "sleep":
print(sleep_system())
elif args.action == "hibernate":
print(hibernate())
elif args.action == "shutdown":
print(shutdown(args.delay))
elif args.action == "restart":
print(restart(args.delay))
elif args.action == "cancel":
print(cancel_shutdown())
else:
p_pwr.print_help()
elif args.category == "network":
if args.action == "adapters":
print(list_network_adapters())
elif args.action == "enable":
print(enable_adapter(args.name))
elif args.action == "disable":
print(disable_adapter(args.name))
elif args.action == "wifi":
print(list_wifi_networks())
elif args.action == "info":
print(get_network_info())
else:
p_net.print_help()
elif args.category == "usb":
if args.action == "list":
print(list_usb_devices())
else:
p_usb.print_help()
else:
parser.print_help()
if __name__ == "__main__":
main()
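The power and network functions above are gated by the `SYSTEM_CONTROLLER_CONFIRM` environment variable, whose error message describes a comma-separated list of actions or the value `all`. The sketch below shows one way to evaluate such a gate. The function name `is_confirmed` and the injectable `env` mapping are assumptions added for testability and are not part of the script.

```python
import os
from typing import Mapping, Optional

CONFIRM_VAR = "SYSTEM_CONTROLLER_CONFIRM"


def is_confirmed(action: str, env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when the action, or the wildcard 'all', is listed in the gate variable."""
    env = os.environ if env is None else env
    granted = {
        item.strip().lower()
        for item in env.get(CONFIRM_VAR, "").split(",")
        if item.strip()
    }
    return "all" in granted or action.lower() in granted


if __name__ == "__main__":
    # Passing a plain dict keeps the check independent of the real environment.
    print(is_confirmed("shutdown", {CONFIRM_VAR: "sleep, shutdown"}))
    print(is_confirmed("restart", {}))
```

Taking the environment as a parameter lets the gate be exercised in tests without mutating `os.environ`.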
#!/usr/bin/env python3
"""
Serial Communication - Communicate with Arduino and other serial devices.
Capabilities:
- List available serial ports
- Connect to a serial device
- Send data (text or bytes)
- Receive data (with timeout)
- Continuous monitor mode
- Auto-detect common baud rates
Requirements: pyserial (pip install pyserial)
"""
import sys
import json
import time
import subprocess
import os
if sys.stdout.encoding != 'utf-8':
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if sys.stderr.encoding != 'utf-8':
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
def check_pyserial():
"""Check if pyserial is installed, install if not."""
try:
import serial
import serial.tools.list_ports
return True
except ImportError:
print("Installing pyserial...")
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "pyserial", "-q"],
stdout=subprocess.DEVNULL
)
try:
import serial
return True
except ImportError:
return False
def list_ports():
"""List all available serial ports."""
if not check_pyserial():
return '{"error":"Failed to install pyserial"}'
import serial.tools.list_ports
ports = []
for port in serial.tools.list_ports.comports():
ports.append({
"device": port.device,
"description": port.description,
"hwid": port.hwid,
"vendor": port.vid if port.vid else None,
"product": port.pid if port.pid else None,
})
if not ports:
return json.dumps({"info": "No serial ports found"}, ensure_ascii=False)
return json.dumps(ports, indent=2, ensure_ascii=False)
def detect_baud_rate(port, timeout=2):
"""Try to detect the baud rate by testing common rates."""
if not check_pyserial():
return "ERROR: pyserial not available"
import serial
common_rates = [9600, 115200, 57600, 38400, 19200, 4800, 2400, 1200]
for rate in common_rates:
try:
ser = serial.Serial(port, rate, timeout=timeout)
ser.write(b'\n')
time.sleep(0.5)
response = ser.read(ser.in_waiting or 1)
ser.close()
if response:
return json.dumps({
"detected_rate": rate,
"response_preview": response.decode('utf-8', errors='replace')[:100]
})
except Exception:
continue
return json.dumps({"detected_rate": 9600, "note": "Could not auto-detect, using default 9600"})
def send_data(port, data, baud_rate=9600, encoding="utf-8", newline=True):
"""Send data to a serial port."""
if not check_pyserial():
return "ERROR: pyserial not available"
import serial
try:
ser = serial.Serial(port, baud_rate, timeout=2)
payload = (data + '\n').encode(encoding) if newline else data.encode(encoding)
ser.write(payload)
ser.flush()
ser.close()
return f"OK: Sent {len(payload)} bytes to {port} at {baud_rate} baud"
except serial.SerialException as e:
return f"ERROR: {e}"
def receive_data(port, baud_rate=9600, timeout=2, max_bytes=1024, encoding="utf-8"):
"""Receive data from a serial port."""
if not check_pyserial():
return "ERROR: pyserial not available"
import serial
try:
ser = serial.Serial(port, baud_rate, timeout=timeout)
data = ser.read(max_bytes)
ser.close()
if data:
try:
text = data.decode(encoding)
except UnicodeDecodeError:
text = data.hex()
return json.dumps({
"bytes_received": len(data),
"data": text,
"hex": data.hex()[:200]
}, ensure_ascii=False)
else:
return '{"info":"No data received within timeout"}'
except serial.SerialException as e:
return f"ERROR: {e}"
def send_and_receive(port, data, baud_rate=9600, timeout=2, encoding="utf-8"):
"""Send data and wait for response."""
if not check_pyserial():
return "ERROR: pyserial not available"
import serial
try:
ser = serial.Serial(port, baud_rate, timeout=timeout)
ser.write((data + '\n').encode(encoding))
ser.flush()
time.sleep(0.1)
response = ser.read(ser.in_waiting or 1024)
ser.close()
try:
text = response.decode(encoding)
except UnicodeDecodeError:
text = response.hex()
return json.dumps({
"sent": data,
"received": text,
"bytes": len(response)
}, ensure_ascii=False)
except serial.SerialException as e:
return f"ERROR: {e}"
def monitor(port, baud_rate=9600, duration=10, encoding="utf-8"):
"""Monitor serial port output for a duration (outputs in real-time)."""
if not check_pyserial():
return "ERROR: pyserial not available"
import serial
try:
ser = serial.Serial(port, baud_rate, timeout=0.1)
print(f"Monitoring {port} at {baud_rate} baud for {duration}s...")
print("--- Press Ctrl+C to stop ---")
start = time.time()
buffer = b""
while time.time() - start < duration:
if ser.in_waiting:
chunk = ser.read(ser.in_waiting)
buffer += chunk
try:
print(chunk.decode(encoding), end="", flush=True)
except UnicodeDecodeError:
print(chunk.hex(), flush=True)
time.sleep(0.05)
ser.close()
print(f"\n--- Monitor ended. Total bytes: {len(buffer)} ---")
return ""
except KeyboardInterrupt:
ser.close()
print("\n--- Monitor stopped by user ---")
return ""
except serial.SerialException as e:
return f"ERROR: {e}"
def main():
import argparse
parser = argparse.ArgumentParser(description="Serial Communication")
sub = parser.add_subparsers(dest="action")
p_list = sub.add_parser("list", help="List serial ports")
p_detect = sub.add_parser("detect", help="Detect baud rate")
p_detect.add_argument("--port", type=str, required=True)
p_send = sub.add_parser("send", help="Send data")
p_send.add_argument("--port", type=str, required=True)
p_send.add_argument("--data", type=str, required=True)
p_send.add_argument("--baud", type=int, default=9600)
p_send.add_argument("--no-newline", action="store_true")
p_recv = sub.add_parser("receive", help="Receive data")
p_recv.add_argument("--port", type=str, required=True)
p_recv.add_argument("--baud", type=int, default=9600)
p_recv.add_argument("--timeout", type=float, default=2)
p_chat = sub.add_parser("chat", help="Send and receive")
p_chat.add_argument("--port", type=str, required=True)
p_chat.add_argument("--data", type=str, required=True)
p_chat.add_argument("--baud", type=int, default=9600)
p_mon = sub.add_parser("monitor", help="Monitor port")
p_mon.add_argument("--port", type=str, required=True)
p_mon.add_argument("--baud", type=int, default=9600)
p_mon.add_argument("--duration", type=int, default=10)
args = parser.parse_args()
if args.action == "list":
print(list_ports())
elif args.action == "detect":
print(detect_baud_rate(args.port))
elif args.action == "send":
print(send_data(args.port, args.data, args.baud, newline=not args.no_newline))
elif args.action == "receive":
print(receive_data(args.port, args.baud, args.timeout))
elif args.action == "chat":
print(send_and_receive(args.port, args.data, args.baud))
elif args.action == "monitor":
monitor(args.port, args.baud, args.duration)
else:
parser.print_help()
if __name__ == "__main__":
main()
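`receive_data` and `send_and_receive` above both decode the received bytes and fall back to a hex dump when decoding fails. A minimal sketch of that fallback as a shared helper follows. It uses only the standard library, so it runs without pyserial or a device. The names `decode_or_hex` and `describe_payload` are hypothetical.

```python
import json


def decode_or_hex(data: bytes, encoding: str = "utf-8") -> str:
    """Decode bytes as text, or return their hex representation if decoding fails."""
    try:
        return data.decode(encoding)
    except UnicodeDecodeError:
        return data.hex()


def describe_payload(data: bytes, encoding: str = "utf-8") -> str:
    """Build a JSON report with the same fields that receive_data returns."""
    return json.dumps(
        {
            "bytes_received": len(data),
            "data": decode_or_hex(data, encoding),
            "hex": data.hex()[:200],  # cap the hex preview, as the script does
        },
        ensure_ascii=False,
    )


if __name__ == "__main__":
    print(describe_payload(b"OK\r\n"))
    print(describe_payload(b"\xff\xfe"))
```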
#!/usr/bin/env python3
"""
IoT Controller - Control smart home devices via APIs.
Supported platforms:
- Home Assistant (REST API)
- Mijia / XiaoMi (HTTP API, requires token)
- Generic HTTP/REST endpoints
Requirements: requests (pip install requests)
Security:
- Tokens are read from environment variable HA_TOKEN in preference to CLI args, to avoid
exposure in process listings, shell history, and system logs.
- URLs outside local/private address ranges trigger a warning (SSRF guard).
- Token values are never logged or printed in full.
Usage examples:
# Set token via environment variable (recommended):
$env:HA_TOKEN="your_long_lived_token"
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --action list_entities
# Or pass via --token (will show warning):
python iot_controller.py homeassistant --url http://192.168.1.100:8123 --token YOUR_TOKEN --action list_entities
"""
import sys
import json
import subprocess
import os
import re
import warnings
if sys.stdout.encoding != 'utf-8':
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if sys.stderr.encoding != 'utf-8':
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
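# ========== Security: URL Validation ==========
# Flag non-private/non-local addresses to reduce SSRF (Server-Side Request Forgery) risk.
# The check is advisory: _validate_url() warns on other addresses but does not block them.
# Recognized: localhost, 127.x, 10.x, 172.16-31.x, 192.168.x, ::1, fc00::/7 (fcxx:/fdxx:)
PRIVATE_IP_PATTERN = re.compile(
    # The host alternatives are grouped so the optional scheme applies to all of them;
    # previously only the first alternative accepted an "http://" prefix.
    r'^(https?://)?('
    r'(localhost|127\.\d+\.\d+\.\d+|10\.\d+\.\d+\.\d+'
    r'|172\.(1[6-9]|2\d|3[01])\.\d+\.\d+'
    r'|192\.168\.\d+\.\d+)(?=[:/]|$)'  # host must end here, so 10.0.0.1.example.com is not accepted
    r'|\[?(::1|f[cd][0-9a-f][0-9a-f]:)'
    r')',
    re.IGNORECASE
)


def _validate_url(url):
    """Return the stripped URL, warning (not failing) if it is not a local/private address."""
    if not url:
        raise ValueError("ERROR: URL cannot be empty")
    url = url.strip()
    # A substring test such as 'localhost' in url would also accept
    # http://example.com/?x=localhost, so only the anchored pattern is used.
    if not PRIVATE_IP_PATTERN.match(url):
        # Warn but allow: the user may have a valid reason to call a public endpoint
        warnings.warn(
            f"WARNING: URL '{url}' does not appear to be a local/private address. "
            f"Ensure this is intentional (SSRF risk for public URLs).",
            UserWarning
        )
    return url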
def _mask_token(token):
"""Return a masked version of token for safe logging/display."""
if not token:
return "(none)"
if len(token) <= 8:
return "****"
return token[:4] + "****" + token[-4:]
def _resolve_token(token_arg=None):
"""
Resolve token from environment variable first, then CLI argument fallback.
Environment variable HA_TOKEN takes priority.
Prints warning if using insecure CLI argument method.
"""
env_token = os.environ.get("HA_TOKEN", "")
if env_token:
return env_token
if token_arg:
print(
f"WARNING: Token passed via CLI argument is visible in process list. "
f"Set environment variable HA_TOKEN instead for better security.",
file=sys.stderr
)
return token_arg
raise ValueError(
"ERROR: No token provided. Set HA_TOKEN environment variable or use --token argument."
)
# ========== Dependencies ==========
def check_requests():
"""Ensure requests library is available."""
try:
import requests
return requests
except ImportError:
print("Installing requests...", file=sys.stderr)
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "requests", "-q"],
stdout=subprocess.DEVNULL
)
import requests
return requests
# ========== Home Assistant ==========
def ha_list_entities(base_url, token):
"""List all entities from Home Assistant."""
requests = check_requests()
_validate_url(base_url)
url = f"{base_url.rstrip('/')}/api/states"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
try:
resp = requests.get(url, headers=headers, timeout=10)
if resp.status_code == 200:
entities = []
for entity in resp.json():
entities.append({
"entity_id": entity["entity_id"],
"state": entity["state"],
"friendly_name": entity["attributes"].get("friendly_name", ""),
"domain": entity["entity_id"].split(".")[0]
})
return json.dumps(entities, indent=2, ensure_ascii=False)
else:
return f"ERROR: HTTP {resp.status_code} - {resp.text}"
except Exception as e:
return f"ERROR: {e}"
def ha_get_state(base_url, token, entity_id):
"""Get state of a specific entity."""
requests = check_requests()
_validate_url(base_url)
url = f"{base_url.rstrip('/')}/api/states/{entity_id}"
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(url, headers=headers, timeout=10)
if resp.status_code == 200:
return json.dumps(resp.json(), indent=2, ensure_ascii=False)
else:
return f"ERROR: HTTP {resp.status_code}"
except Exception as e:
return f"ERROR: {e}"
def ha_call_service(base_url, token, domain, service, entity_id, service_data=None):
"""Call a Home Assistant service."""
requests = check_requests()
_validate_url(base_url)
url = f"{base_url.rstrip('/')}/api/services/{domain}/{service}"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
payload = {"entity_id": entity_id}
if service_data:
if isinstance(service_data, str):
try:
payload.update(json.loads(service_data))
except json.JSONDecodeError:
return "ERROR: Invalid service_data JSON"
else:
payload.update(service_data)
try:
resp = requests.post(url, headers=headers, json=payload, timeout=10)
if resp.status_code == 200:
return f"OK: Called {domain}.{service} on {entity_id}"
else:
return f"ERROR: HTTP {resp.status_code} - {resp.text}"
except Exception as e:
return f"ERROR: {e}"
def ha_turn_on(base_url, token, entity_id, params=None):
"""Turn on an entity."""
domain = entity_id.split(".")[0]
return ha_call_service(base_url, token, domain, "turn_on", entity_id, params)
def ha_turn_off(base_url, token, entity_id):
"""Turn off an entity."""
domain = entity_id.split(".")[0]
return ha_call_service(base_url, token, domain, "turn_off", entity_id)
def ha_toggle(base_url, token, entity_id):
"""Toggle an entity."""
domain = entity_id.split(".")[0]
return ha_call_service(base_url, token, domain, "toggle", entity_id)
# ========== Generic HTTP ==========
def _parse_headers(headers):
    """Convert a list of 'Key: Value' strings into a dict. Raises ValueError on bad input."""
    hdrs = {}
    for h in headers or []:
        if ":" not in h:
            raise ValueError(f"Invalid header (expected 'Key: Value'): {h}")
        k, v = h.split(":", 1)
        hdrs[k.strip()] = v.strip()
    return hdrs


def _http_request(method, url, path="", body=None, headers=None):
    """Shared implementation for GET/POST/PUT; returns a printable result string."""
    requests = check_requests()
    full_url = f"{url.rstrip('/')}/{path.lstrip('/')}" if path else url
    _validate_url(full_url)
    try:
        hdrs = _parse_headers(headers)
    except ValueError as e:
        # A malformed --header value is reported instead of crashing with a traceback
        return f"ERROR: {e}"
    payload = None
    if method != "GET":
        hdrs.setdefault("Content-Type", "application/json")
        if body:
            try:
                payload = json.loads(body) if isinstance(body, str) else body
            except json.JSONDecodeError:
                payload = body  # not valid JSON: sent as a JSON string literal
    try:
        resp = requests.request(method, full_url, headers=hdrs, json=payload, timeout=10)
    except Exception as e:
        return f"ERROR: {e}"
    try:
        data = resp.json()
    except ValueError:
        # Response is not JSON: return the status and a truncated text body
        return f"Status: {resp.status_code}\n{resp.text[:2000]}"
    return json.dumps({"status": resp.status_code, "data": data}, indent=2, ensure_ascii=False)


def http_get(url, path="", headers=None):
    """Send HTTP GET request."""
    return _http_request("GET", url, path, headers=headers)


def http_post(url, path="", body=None, headers=None):
    """Send HTTP POST request."""
    return _http_request("POST", url, path, body, headers)


def http_put(url, path="", body=None, headers=None):
    """Send HTTP PUT request."""
    return _http_request("PUT", url, path, body, headers)
# ========== Mijia / XiaoMi ==========
def mijia_discover():
"""Print instructions for discovering Mijia devices (no network scan is performed here)."""
print("INFO: For Mijia device control, use the miio library:")
print(" pip install miio")
print(" python -m miio discover")
return "INFO: Run the above commands to discover Mijia devices"
# ========== Main ==========
def main():
import argparse
parser = argparse.ArgumentParser(description="IoT Controller")
sub = parser.add_subparsers(dest="platform")
# Home Assistant
p_ha = sub.add_parser("homeassistant", help="Home Assistant control")
ha_sub = p_ha.add_subparsers(dest="action")
ha_sub.add_parser("list", help="List all entities")
ha_state = ha_sub.add_parser("state", help="Get entity state")
ha_state.add_argument("--entity-id", type=str, required=True)
ha_call = ha_sub.add_parser("call", help="Call service")
ha_call.add_argument("--domain", type=str, required=True)
ha_call.add_argument("--service", type=str, required=True)
ha_call.add_argument("--entity-id", type=str, required=True)
ha_call.add_argument("--data", type=str, help="JSON service data")
ha_on = ha_sub.add_parser("on", help="Turn on entity")
ha_on.add_argument("--entity-id", type=str, required=True)
ha_on.add_argument("--data", type=str, help="JSON params (e.g. brightness)")
ha_off = ha_sub.add_parser("off", help="Turn off entity")
ha_off.add_argument("--entity-id", type=str, required=True)
ha_tog = ha_sub.add_parser("toggle", help="Toggle entity")
ha_tog.add_argument("--entity-id", type=str, required=True)
p_ha.add_argument("--url", type=str, required=True, help="Home Assistant base URL")
p_ha.add_argument("--token", type=str, default=None,
help="Long-lived access token (prefer env var HA_TOKEN)")
# Generic HTTP
p_http = sub.add_parser("http", help="Generic HTTP/REST control")
http_sub = p_http.add_subparsers(dest="action")
http_get_p = http_sub.add_parser("get", help="HTTP GET")
http_get_p.add_argument("--path", type=str, default="")
http_post_p = http_sub.add_parser("post", help="HTTP POST")
http_post_p.add_argument("--path", type=str, default="")
http_post_p.add_argument("--body", type=str, default=None)
http_put_p = http_sub.add_parser("put", help="HTTP PUT")
http_put_p.add_argument("--path", type=str, default="")
http_put_p.add_argument("--body", type=str, default=None)
p_http.add_argument("--url", type=str, required=True)
p_http.add_argument("--header", type=str, action="append", help="Header in 'Key: Value' format")
# Mijia
p_mi = sub.add_parser("mijia", help="Mijia/XiaoMi control")
mi_sub = p_mi.add_subparsers(dest="action")
mi_sub.add_parser("discover", help="Discover devices")
args = parser.parse_args()
if args.platform == "homeassistant":
url = args.url
# Resolve token securely (env var preferred)
try:
token = _resolve_token(args.token)
except ValueError as e:
print(str(e), file=sys.stderr)
sys.exit(1)
if args.action == "list":
result = ha_list_entities(url, token)
elif args.action == "state":
result = ha_get_state(url, token, args.entity_id)
elif args.action == "call":
result = ha_call_service(url, token, args.domain, args.service, args.entity_id, args.data)
elif args.action == "on":
result = ha_turn_on(url, token, args.entity_id, args.data)
elif args.action == "off":
result = ha_turn_off(url, token, args.entity_id)
elif args.action == "toggle":
result = ha_toggle(url, token, args.entity_id)
else:
p_ha.print_help()
result = None
if result:
print(result)
elif args.platform == "http":
if args.action == "get":
print(http_get(args.url, args.path, args.header))
elif args.action == "post":
print(http_post(args.url, args.path, args.body, args.header))
elif args.action == "put":
print(http_put(args.url, args.path, args.body, args.header))
else:
p_http.print_help()
elif args.platform == "mijia":
if args.action == "discover":
print(mijia_discover())
else:
p_mi.print_help()
else:
parser.print_help()
if __name__ == "__main__":
main()
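The URL check in `iot_controller.py` relies on a regular expression over the raw URL string. As an alternative, a minimal sketch using only the standard library is shown below: it parses the host out of the URL and lets `ipaddress` classify it. `is_private_url` is a hypothetical helper, not part of the script. It treats any DNS name other than `localhost` as non-local because it does not resolve names, and `ipaddress` also counts link-local ranges such as 169.254.x.x as private, which is broader than the script's list.

```python
import ipaddress
from urllib.parse import urlsplit


def is_private_url(url: str) -> bool:
    """Return True if the URL's host is localhost or a private/loopback IP literal."""
    # urlsplit only fills in hostname when a scheme separator is present,
    # so prepend one for bare "host:port" inputs.
    if "://" not in url:
        url = "http://" + url
    try:
        host = urlsplit(url).hostname
    except ValueError:
        return False  # malformed URL, e.g. an unterminated IPv6 bracket
    if not host:
        return False
    if host == "localhost":
        return True
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        return False  # a DNS name: not resolved here, so treated as non-local
    return addr.is_private or addr.is_loopback


if __name__ == "__main__":
    for candidate in ("http://192.168.1.100:8123", "https://example.com/?q=localhost"):
        print(candidate, is_private_url(candidate))
```

Because the host is extracted before it is classified, look-alike inputs such as `http://10.0.0.1.evil.example` or a query string containing `localhost` are not mistaken for local addresses.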
#!/usr/bin/env python3
"""
GUI Controller - Mouse, keyboard, screenshot, and visual control.
Capabilities:
- Mouse: move, click, double-click, right-click, drag, scroll, get position
- Keyboard: type text, press hotkeys, key combinations
- Screenshot: full screen, region, save to file
- OCR: extract text from screen regions
- Visual: find image/pattern on screen, click by color
Requirements: Windows 10/11, Python 3.x
Dependencies: pyautogui (auto-installed), pillow (auto-installed), pytesseract (optional, for OCR),
opencv-python (optional; needed for the confidence option used by image find/click)
"""
import sys
import os
import json
import subprocess
import argparse
if sys.stdout.encoding != 'utf-8':
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if sys.stderr.encoding != 'utf-8':
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common import run_ps as _run_ps
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SCREENSHOT_DIR = os.path.join(SCRIPT_DIR, "..", "screenshots")
# ========== Dependency Management ==========
def _ensure_deps(modules=None):
    """Ensure required Python packages are installed. Returns True on success."""
    missing = []
    for mod in modules or []:
        try:
            __import__(mod)
        except ImportError:
            missing.append(mod)
    if not missing:
        return True
    # Map import names to pip package names where they differ
    pkg_map = {"PIL": "pillow"}
    pkgs = [pkg_map.get(m, m) for m in missing]
    if not sys.executable:
        return False
    cmd = [sys.executable, "-m", "pip", "install"] + pkgs
    print(f"INFO: Installing missing packages: {', '.join(pkgs)}", file=sys.stderr)
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    except Exception as e:
        print(f"ERROR: Failed to install packages: {e}", file=sys.stderr)
        return False
    if result.returncode != 0:
        # pip ran but failed; report it instead of claiming success
        print(f"ERROR: pip exited with code {result.returncode}: "
              f"{result.stderr.strip()[-500:]}", file=sys.stderr)
        return False
    return True
def _get_gui():
"""Import and return pyautogui, installing if needed."""
_ensure_deps(["pyautogui", "PIL"])
try:
import pyautogui
pyautogui.FAILSAFE = True # Move mouse to corner to abort
return pyautogui
except ImportError:
print("ERROR: pyautogui not available. Run: pip install pyautogui pillow")
sys.exit(1)
# ========== Mouse Control ==========
def mouse_move(x, y, duration=0.3):
"""Move mouse to absolute screen coordinates (x, y)."""
gui = _get_gui()
gui.moveTo(x, y, duration=duration)
print(f"OK: Mouse moved to ({x}, {y})")
def mouse_click(x=None, y=None, button="left", clicks=1, duration=0.2):
"""Click at position. Uses current position if x,y not provided."""
gui = _get_gui()
if x is not None and y is not None:
gui.moveTo(x, y, duration=duration)
if clicks == 1:
gui.click(button=button)
else:
gui.click(button=button, clicks=clicks, interval=0.1)
pos = gui.position()
print(f"OK: {button} click x{clicks} at ({pos.x}, {pos.y})")
def mouse_right_click(x=None, y=None):
"""Right-click at position."""
mouse_click(x, y, button="right")
def mouse_double_click(x=None, y=None):
"""Double-click at position."""
mouse_click(x, y, clicks=2)
def mouse_drag(start_x, start_y, end_x, end_y, duration=0.5, button="left"):
"""Drag from one position to another."""
gui = _get_gui()
gui.moveTo(start_x, start_y, duration=0.2)
gui.dragTo(end_x, end_y, duration=duration, button=button)
print(f"OK: Dragged from ({start_x},{start_y}) to ({end_x},{end_y})")
def mouse_scroll(x=None, y=None, clicks=5, direction="up"):
"""Scroll the mouse wheel by `clicks` notches in `direction` ("up" or "down")."""
gui = _get_gui()
if x is not None and y is not None:
gui.moveTo(x, y, duration=0.2)
amount = clicks if direction == "up" else -clicks
gui.scroll(amount)
pos = gui.position()
print(f"OK: Scrolled {direction} {abs(clicks)} clicks at ({pos.x}, {pos.y})")
def mouse_position():
"""Get current mouse position."""
gui = _get_gui()
pos = gui.position()
size = gui.size()
result = json.dumps({
"x": pos.x,
"y": pos.y,
"screen_width": size.width,
"screen_height": size.height
})
print(result)
# ========== Keyboard Control ==========
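def keyboard_type(text, interval=0.02):
"""Type text character by character at current cursor position.
Note: pyautogui.typewrite() only sends characters it can map to keyboard keys,
so non-ASCII text (e.g. Chinese) is typically dropped."""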
gui = _get_gui()
gui.typewrite(text, interval=interval)
print(f"OK: Typed {len(text)} characters")
def keyboard_press(keys):
"""Press a key or key combination (e.g., 'ctrl', 'alt+tab', 'ctrl+shift+esc')."""
gui = _get_gui()
gui.hotkey(*keys.split("+"))
print(f"OK: Pressed {keys}")
def keyboard_hotkey(*key_list):
"""Press multiple keys simultaneously. Keys as separate args."""
gui = _get_gui()
gui.hotkey(*key_list)
print(f"OK: Pressed {'+'.join(key_list)}")
def keyboard_key_down(key):
"""Hold down a key (useful for drag with shift/ctrl)."""
gui = _get_gui()
gui.keyDown(key)
print(f"OK: Key down: {key}")
def keyboard_key_up(key):
"""Release a held key."""
gui = _get_gui()
gui.keyUp(key)
print(f"OK: Key up: {key}")
# ========== Screenshot ==========
def screenshot_full(filepath=None):
"""Take a full screen screenshot. Save to file if path provided."""
gui = _get_gui()
img = gui.screenshot()
if filepath:
os.makedirs(os.path.dirname(filepath) if os.path.dirname(filepath) else ".", exist_ok=True)
img.save(filepath)
print(f"OK: Screenshot saved to {filepath}")
else:
# Save to default location
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
auto_path = os.path.join(SCREENSHOT_DIR, "screenshot.png")
img.save(auto_path)
print(f"OK: Screenshot saved to {auto_path}")
print(f"INFO: File size: {os.path.getsize(auto_path)} bytes")
return img
def screenshot_region(x, y, width, height, filepath=None):
"""Take a screenshot of a specific region."""
gui = _get_gui()
img = gui.screenshot(region=(x, y, width, height))
if filepath:
os.makedirs(os.path.dirname(filepath) if os.path.dirname(filepath) else ".", exist_ok=True)
img.save(filepath)
print(f"OK: Region screenshot saved to {filepath}")
else:
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
auto_path = os.path.join(SCREENSHOT_DIR, "region.png")
img.save(auto_path)
print(f"OK: Region screenshot saved to {auto_path}")
return img
def screenshot_active_window(filepath=None):
"""Take a screenshot of the active (foreground) window."""
gui = _get_gui()
# Use PowerShell to get foreground window bounds
script = r"""
Add-Type @"
using System;
using System.Runtime.InteropServices;
public class WinRect {
[DllImport("user32.dll")]
public static extern bool GetWindowRect(IntPtr hWnd, out RECT lpRect);
[DllImport("user32.dll")]
public static extern IntPtr GetForegroundWindow();
[StructLayout(LayoutKind.Sequential)]
public struct RECT { public int Left, Top, Right, Bottom; }
}
"@
$hwnd = [WinRect]::GetForegroundWindow()
$rect = New-Object WinRect+RECT
$null = [WinRect]::GetWindowRect($hwnd, [ref]$rect)
Write-Output "$($rect.Left),$($rect.Top),$($rect.Right),$($rect.Bottom)"
"""
stdout, _, code = _run_ps(script, timeout=5)
if code == 0 and stdout:
parts = stdout.split(",")
if len(parts) == 4:
left, top, right, bottom = [int(p.strip()) for p in parts]
w = right - left
h = bottom - top
img = gui.screenshot(region=(left, top, w, h))
if filepath:
os.makedirs(os.path.dirname(filepath) if os.path.dirname(filepath) else ".", exist_ok=True)
img.save(filepath)
print(f"OK: Window screenshot saved to {filepath}")
else:
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
auto_path = os.path.join(SCREENSHOT_DIR, "window.png")
img.save(auto_path)
print(f"OK: Window screenshot saved to {auto_path}")
return img
print("ERROR: Could not capture active window")
return None
def get_screen_size():
"""Get screen resolution."""
gui = _get_gui()
size = gui.size()
print(json.dumps({"width": size.width, "height": size.height}))
# ========== Visual / OCR ==========
def ocr_region(x, y, width, height, lang="chi_sim+eng"):
"""Extract text from a screen region using Tesseract OCR."""
_ensure_deps(["pytesseract"])
try:
import pytesseract
from PIL import Image
except ImportError:
# Fallback: use PowerShell + Windows OCR
print(_ocr_region_powershell(x, y, width, height))
return
gui = _get_gui()
img = gui.screenshot(region=(x, y, width, height))
try:
text = pytesseract.image_to_string(img, lang=lang)
text = text.strip()
if text:
print(f"OK: OCR result:\n{text}")
else:
print("OK: OCR result: (no text detected)")
except Exception as e:
# Fallback to PowerShell OCR
print(_ocr_region_powershell(x, y, width, height))
def _ocr_region_powershell(x, y, width, height):
"""Fallback when Tesseract is unavailable: saves the region screenshot and reports its path.
No text recognition is performed here."""
gui = _get_gui()
import tempfile
# Save screenshot to a safe temp path with controlled filename (no injection)
tmp_dir = tempfile.gettempdir()
tmp = os.path.join(tmp_dir, "sc_ocr_temp.png")
# Normalize path for safety
tmp = os.path.normpath(tmp)
img = gui.screenshot(region=(x, y, width, height))
img.save(tmp)
# Use escaped single-quote path in PowerShell (path is from tempfile so it's safe)
ps_path = tmp.replace("'", "''")
script = f"""
Add-Type -AssemblyName System.Runtime.WindowsRuntime
$bytes = [System.IO.File]::ReadAllBytes('{ps_path}')
try {{
[Windows.Storage.StorageFile, Windows.Storage, ContentType=WindowsRuntime] | Out-Null
[Windows.Media.Ocr.OcrEngine, Windows.Media.Ocr, ContentType=WindowsRuntime] | Out-Null
Write-Output "INFO: Windows OCR requires async API. Use pytesseract for better results."
Write-Output "INFO: Screenshot saved at '{ps_path}'"
}} catch {{
Write-Output "INFO: OCR engine not available. Screenshot at '{ps_path}'"
}}
"""
stdout, _, code = _run_ps(script, timeout=10)
return stdout if stdout else f"INFO: Screenshot saved at {tmp}"
def ocr_full(lang="chi_sim+eng"):
"""OCR the entire screen."""
gui = _get_gui()
size = gui.size()
ocr_region(0, 0, size.width, size.height, lang)
def find_image(template_path, confidence=0.9):
"""Find an image template on screen. Returns position or error."""
import glob
# Search in screenshots dir if relative path
if not os.path.isabs(template_path):
paths = glob.glob(os.path.join(SCREENSHOT_DIR, template_path))
if paths:
template_path = paths[0]
else:
paths = glob.glob(os.path.join(SCRIPT_DIR, "..", "assets", template_path))
if paths:
template_path = paths[0]
if not os.path.exists(template_path):
print(f"ERROR: Template image not found: {template_path}")
return
gui = _get_gui()
try:
location = gui.locateOnScreen(template_path, confidence=confidence)
if location:
center = gui.center(location)
result = json.dumps({
"found": True,
"x": center.x,
"y": center.y,
"width": location.width,
"height": location.height
})
print(f"OK: Found template at center ({center.x}, {center.y})")
print(result)
else:
print(f"OK: Template not found on screen (confidence threshold: {confidence})")
except Exception as e:
print(f"ERROR: {e}")
def click_image(template_path, button="left", confidence=0.9, offset_x=0, offset_y=0):
"""Find an image on screen and click it."""
import glob
if not os.path.isabs(template_path):
paths = glob.glob(os.path.join(SCREENSHOT_DIR, template_path))
if paths:
template_path = paths[0]
if not os.path.exists(template_path):
print(f"ERROR: Template image not found: {template_path}")
return
gui = _get_gui()
try:
location = gui.locateOnScreen(template_path, confidence=confidence)
if location:
center = gui.center(location)
target_x = center.x + offset_x
target_y = center.y + offset_y
gui.click(x=target_x, y=target_y, button=button)
print(f"OK: Clicked template at ({target_x}, {target_y})")
else:
print(f"ERROR: Template not found on screen")
except Exception as e:
print(f"ERROR: {e}")
def find_color(target_color, region=None):
"""Find all pixels matching a color on screen. Color as (R,G,B) or hex."""
gui = _get_gui()
if isinstance(target_color, str):
# Parse hex color like "#FF0000"
target_color = target_color.lstrip("#")
target_color = tuple(int(target_color[i:i+2], 16) for i in (0, 2, 4))
if region:
img = gui.screenshot(region=region)
offset_x, offset_y = region[0], region[1]
else:
img = gui.screenshot()
offset_x, offset_y = 0, 0
width, height = img.size
matches = []
tolerance = 10 # Color matching tolerance
# Sample every few pixels for performance
step = 2
for y_pos in range(0, height, step):
for x_pos in range(0, width, step):
pixel = img.getpixel((x_pos, y_pos))
if all(abs(pixel[i] - target_color[i]) <= tolerance for i in range(3)):
matches.append({
"x": x_pos + offset_x,
"y": y_pos + offset_y
})
if len(matches) >= 50:
break
if len(matches) >= 50:
break
if matches:
print(f"OK: Found {len(matches)} pixels matching color {target_color}")
print(f"INFO: First match at ({matches[0]['x']}, {matches[0]['y']})")
else:
print(f"OK: No pixels found matching color {target_color}")
return matches
def pixel_color(x, y):
    """Get the color of a pixel at (x, y)."""
    gui = _get_gui()
    # Unpack by position: pixel() may return a plain (R, G, B) tuple rather than
    # a named tuple, so attribute access such as .red is not reliable.
    r, g, b = gui.pixel(x, y)[:3]
    print(json.dumps({
        "x": x,
        "y": y,
        "RGB": [r, g, b],
        "hex": f"#{r:02X}{g:02X}{b:02X}"
    }))
def list_screenshots():
"""List previously saved screenshots."""
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
files = []
for f in sorted(os.listdir(SCREENSHOT_DIR)):
if f.lower().endswith((".png", ".jpg", ".jpeg", ".bmp")):
fp = os.path.join(SCREENSHOT_DIR, f)
files.append({
"name": f,
"path": fp,
"size_bytes": os.path.getsize(fp)
})
if files:
print(json.dumps(files, ensure_ascii=False, indent=2))
else:
print("OK: No screenshots saved yet")
# ========== Main CLI ==========
def main():
parser = argparse.ArgumentParser(description="GUI Controller - Mouse, Keyboard, Screenshot, OCR")
sub = parser.add_subparsers(dest="category")
# Mouse
p_mouse = sub.add_parser("mouse", help="Mouse control")
mouse_sub = p_mouse.add_subparsers(dest="action")
mouse_sub.add_parser("position", help="Get current mouse position")
m_move = mouse_sub.add_parser("move", help="Move mouse")
m_move.add_argument("--x", type=int, required=True)
m_move.add_argument("--y", type=int, required=True)
m_move.add_argument("--duration", type=float, default=0.3)
m_click = mouse_sub.add_parser("click", help="Left click")
m_click.add_argument("--x", type=int)
m_click.add_argument("--y", type=int)
m_rclick = mouse_sub.add_parser("right-click", help="Right click")
m_rclick.add_argument("--x", type=int)
m_rclick.add_argument("--y", type=int)
m_dclick = mouse_sub.add_parser("double-click", help="Double click")
m_dclick.add_argument("--x", type=int)
m_dclick.add_argument("--y", type=int)
m_drag = mouse_sub.add_parser("drag", help="Drag from A to B")
m_drag.add_argument("--start-x", type=int, required=True)
m_drag.add_argument("--start-y", type=int, required=True)
m_drag.add_argument("--end-x", type=int, required=True)
m_drag.add_argument("--end-y", type=int, required=True)
m_drag.add_argument("--duration", type=float, default=0.5)
m_scroll = mouse_sub.add_parser("scroll", help="Scroll wheel")
m_scroll.add_argument("--x", type=int)
m_scroll.add_argument("--y", type=int)
m_scroll.add_argument("--clicks", type=int, default=5)
m_scroll.add_argument("--direction", choices=["up", "down"], default="up")
# Keyboard
p_kb = sub.add_parser("keyboard", help="Keyboard control")
kb_sub = p_kb.add_subparsers(dest="action")
kb_type = kb_sub.add_parser("type", help="Type text")
kb_type.add_argument("--text", type=str, required=True)
kb_press = kb_sub.add_parser("press", help="Press key or combo")
kb_press.add_argument("--keys", type=str, required=True, help="e.g., 'ctrl+c', 'alt+tab'")
kb_down = kb_sub.add_parser("key-down", help="Hold key")
kb_down.add_argument("--key", type=str, required=True)
kb_up = kb_sub.add_parser("key-up", help="Release key")
kb_up.add_argument("--key", type=str, required=True)
# Screenshot
p_ss = sub.add_parser("screenshot", help="Screenshot capture")
ss_sub = p_ss.add_subparsers(dest="action")
ss_sub.add_parser("full", help="Full screen")
ss_sub.add_parser("active-window", help="Active window")
ss_region = ss_sub.add_parser("region", help="Screen region")
ss_region.add_argument("--x", type=int, required=True)
ss_region.add_argument("--y", type=int, required=True)
ss_region.add_argument("--width", type=int, required=True)
ss_region.add_argument("--height", type=int, required=True)
ss_list = ss_sub.add_parser("list", help="List saved screenshots")
ss_size = ss_sub.add_parser("size", help="Get screen resolution")
# Visual / OCR
p_vis = sub.add_parser("visual", help="Visual recognition and OCR")
vis_sub = p_vis.add_subparsers(dest="action")
vis_ocr = vis_sub.add_parser("ocr", help="OCR a screen region")
vis_ocr.add_argument("--x", type=int, default=0)
vis_ocr.add_argument("--y", type=int, default=0)
vis_ocr.add_argument("--width", type=int)
vis_ocr.add_argument("--height", type=int)
vis_ocr.add_argument("--lang", type=str, default="chi_sim+eng")
vis_find = vis_sub.add_parser("find", help="Find image template on screen")
vis_find.add_argument("--template", type=str, required=True)
vis_find.add_argument("--confidence", type=float, default=0.9)
vis_click_img = vis_sub.add_parser("click-image", help="Find and click an image")
vis_click_img.add_argument("--template", type=str, required=True)
vis_click_img.add_argument("--confidence", type=float, default=0.9)
vis_click_img.add_argument("--offset-x", type=int, default=0)
vis_click_img.add_argument("--offset-y", type=int, default=0)
vis_color = vis_sub.add_parser("find-color", help="Find pixels by color")
vis_color.add_argument("--color", type=str, required=True, help="Hex color, e.g., '#FF0000'")
vis_color.add_argument("--x", type=int)
vis_color.add_argument("--y", type=int)
vis_color.add_argument("--width", type=int)
vis_color.add_argument("--height", type=int)
vis_pixel = vis_sub.add_parser("pixel", help="Get color of a pixel")
vis_pixel.add_argument("--x", type=int, required=True)
vis_pixel.add_argument("--y", type=int, required=True)
args = parser.parse_args()
try:
# Mouse
if args.category == "mouse":
if args.action == "position":
mouse_position()
elif args.action == "move":
mouse_move(args.x, args.y, args.duration)
elif args.action == "click":
mouse_click(args.x, args.y)
elif args.action == "right-click":
mouse_right_click(args.x, args.y)
elif args.action == "double-click":
mouse_double_click(args.x, args.y)
elif args.action == "drag":
mouse_drag(args.start_x, args.start_y, args.end_x, args.end_y, args.duration)
elif args.action == "scroll":
mouse_scroll(args.x, args.y, args.clicks, args.direction)
else:
p_mouse.print_help()
# Keyboard
elif args.category == "keyboard":
if args.action == "type":
keyboard_type(args.text)
elif args.action == "press":
keyboard_press(args.keys)
elif args.action == "key-down":
keyboard_key_down(args.key)
elif args.action == "key-up":
keyboard_key_up(args.key)
else:
p_kb.print_help()
# Screenshot
elif args.category == "screenshot":
if args.action == "full":
screenshot_full()
elif args.action == "active-window":
screenshot_active_window()
elif args.action == "region":
screenshot_region(args.x, args.y, args.width, args.height)
elif args.action == "list":
list_screenshots()
elif args.action == "size":
get_screen_size()
else:
p_ss.print_help()
# Visual
elif args.category == "visual":
if args.action == "ocr":
gui = _get_gui()
size = gui.size()
w = args.width if args.width else size.width
h = args.height if args.height else size.height
ocr_region(args.x, args.y, w, h, args.lang)
elif args.action == "find":
find_image(args.template, args.confidence)
elif args.action == "click-image":
click_image(args.template, confidence=args.confidence,
offset_x=args.offset_x, offset_y=args.offset_y)
elif args.action == "find-color":
region = None
if all(hasattr(args, a) and getattr(args, a) is not None
for a in ["x", "y", "width", "height"]):
region = (args.x, args.y, args.width, args.height)
find_color(args.color, region)
elif args.action == "pixel":
pixel_color(args.x, args.y)
else:
p_vis.print_help()
else:
parser.print_help()
except KeyboardInterrupt:
# Ctrl+C only; the pyautogui corner failsafe raises FailSafeException, handled below
print("\nInterrupted by user")
except Exception as e:
print(f"ERROR: {e}")
if __name__ == "__main__":
main()
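The color search in `find_color` combines three ideas: parsing a hex string into an RGB tuple, comparing channels within a tolerance, and sampling every few pixels with an upper limit on matches. A minimal sketch of that logic over a plain row-major list of RGB tuples is shown below, so it runs without a screen or Pillow. The helper names `parse_hex_color`, `color_matches`, and `find_matches` are illustrative and do not appear in the script.

```python
def parse_hex_color(value):
    """Convert '#RRGGBB' or 'RRGGBB' into an (R, G, B) tuple of ints."""
    digits = value.strip().lstrip("#")
    if len(digits) != 6:
        raise ValueError(f"expected 6 hex digits, got {value!r}")
    return tuple(int(digits[i:i + 2], 16) for i in (0, 2, 4))


def color_matches(pixel, target, tolerance=10):
    """True if every RGB channel of pixel is within tolerance of target."""
    return all(abs(pixel[i] - target[i]) <= tolerance for i in range(3))


def find_matches(pixels, width, height, target, tolerance=10, step=2, limit=50):
    """Scan a row-major list of RGB tuples, sampling every `step` pixels."""
    matches = []
    for y in range(0, height, step):
        for x in range(0, width, step):
            if color_matches(pixels[y * width + x], target, tolerance):
                matches.append((x, y))
                if len(matches) >= limit:
                    return matches
    return matches


if __name__ == "__main__":
    # 4x4 black image with one near-red pixel at (2, 2)
    grid = [(0, 0, 0)] * 16
    grid[2 * 4 + 2] = (250, 5, 0)
    print(find_matches(grid, 4, 4, parse_hex_color("#FF0000")))
```

Sampling with `step=2` trades accuracy for speed: a single matching pixel at an odd coordinate is skipped, which is acceptable for locating colored regions but not for exact pixel tests.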