Colmap稀疏重建会生成稀疏的3D点云,3D点对应图像上的像素坐标、每幅图像的位姿、相机内参数。这些信息存放在工作目录下的sparse目录下,拿到核心数据和和图像后尝试通过双目视觉生成深度图,做稠密重建。(也可以让claude code完成)

1、数据准备

1.1 原始数据

原始数据存放在工程目录下的sparse/0images两个文件夹下面。
找到 COLMAP 中sparse/0目录下的三个标准文件:

COLMAP文件 内容 解析结果
cameras.txt 相机型号、分辨率、内参 cameras 字典
images.txt 图像位姿(四元数+平移)和观测点 images + observations
points3D.txt 3D点坐标和颜色 points3D 字典

准备两张图像:
在COLMAP的images图像中找到两张有重叠区域的图像,记录下文件名字,后面还要从images.txt和points3D.txt中提取特征点坐标、图像位姿、共视场的3D点等信息。

1.2 核心数据提取和处理

先解析原始文件,拿到所有图像的稀疏重建结果,原始文件数据格式可以参考之前的博客ColMap稀疏重建+OpenMVS

1.2.1 解析原始数据

1.2.1.1 解析cameras.txt

这一步骤主要提取相机内参数,sparse/0下存放的是稀疏重建过程中优化过的相机内参数,内参数个数和选择的相机模型有关。解析代码如下所示:

 with open(os.path.join(sparse_path, 'cameras.txt'), 'r') as f:
      for line in f:
          if line.startswith('#'):
              continue
          parts = line.strip().split()
          if len(parts) >= 4:
              cameras[int(parts[0])] = {
                  'model': parts[1],
                  'width': int(parts[2]),
                  'height': int(parts[3]),
                  'params': [float(p) for p in parts[4:]]
              }

cameras中存储的结果如下所示:

{1: {'model': 'OPENCV', 'width': 5280, 'height': 3956, 'params': [3181.4540285015, 3181.5465179936455, 2659.0835819355725, 1972.584134761328, -0.07435581224091158, -0.009373703822089241, -0.0003833151276594362, -0.00037182807716860835]}}

1.2.1.2 解析images.txt

这里面主要存放每张图像的位置姿态和观测点信息,解析代码如下:

with open(os.path.join(sparse_path, 'images.txt'), 'r') as f:
    lines = [l for l in f.readlines() if not l.startswith('#')]
    for i in range(0, len(lines), 2):
        parts = lines[i].strip().split()
        if len(parts) >= 10:
            image_id = int(parts[0])
            qw, qx, qy, qz = float(parts[1]), float(parts[2]), float(parts[3]), float(parts[4])
            tx, ty, tz = float(parts[5]), float(parts[6]), float(parts[7])
            name = parts[9]

            R = Rotation.from_quat([qx, qy, qz, qw]).as_matrix()

            images[image_id] = {
                'name': name,
                'camera_id': int(parts[8]),
                'q': [qw, qx, qy, qz],
                't': [tx, ty, tz],
                'R': R
            }

            # 观测点
            obs_parts = lines[i+1].strip().split()
            obs_list = []
            for j in range(0, len(obs_parts), 3):
                if j + 2 < len(obs_parts):
                    x, y = float(obs_parts[j]), float(obs_parts[j+1])
                    point3d_id = int(obs_parts[j+2])
                    if point3d_id > 0:
                        obs_list.append({'x': x, 'y': y, 'point3d_id': point3d_id})
            observations[image_id] = obs_list

图像位姿信息存储在images,其中q代表四元数,t 表示平移量,R表示旋转矩阵,字典中的key值2表示图像id:

{2: {'name': 'DJI_20260414130013_0346_V.jpeg', 'camera_id': 1, 'q': [-0.0025348225434501503, 0.270148766979058, 0.962813797989279, -0.0016759402816661462], 't': [2.6207080587578133, 1.9554057469173642, 476.4982075687798], 'R': array([[-0.85402644,  0.52019742, -0.00578663],
       [ 0.52021442,  0.85403367, -0.00185768],
       [ 0.00397562, -0.0045968 , -0.99998153]])}}

观测点信息存储在observations,截断后如下图所示,字典中的key值2表示图像id:

{2: [{'x': 3405.968994140625, 'y': 18.37702751159668, 'point3d_id': 29111}, {'x': 4053.47509765625, 'y': 21.25806427001953, 'point3d_id': 124439}, {'x': 4694.0625, 'y': 20.730045318603516, 'point3d_id': 163115}, {'x': 2291.442626953125, 'y': 22.975664138793945, 'point3d_id': 28885}, {'x': 4161.3798828125, 'y': 21.80597686767578, 'point3d_id': 64871}

1.2.1.3 解析points3D.txt

这里主要提取3D点坐标,存放在points3D中,部分坐标如下图所示,66195就是3D点坐标和observations中的point3d_id相对应:

{66195: {'X': [-21.20274576552423, -20.23251208649, 426.2565654427749], 'rgb': [99, 97, 82]}, 66202: {'X': [-19.068800230311567, -40.23213922604851, 426.1895808890543], 'rgb': [168, 157, 135]}, 66204: {'X': [-18.73671501425717, -39.71555546210877, 426.21430727349775], 'rgb': [153, 148, 129]}, 66209: {'X': [-16.435486041637958, -67.97927640756718, 426.96049004134557], 'rgb': [178, 170, 148]}...}

1.2.2 加工原始数据

1.2.2.1 计算yaw角和基线

分析images中图像的EXIF信息或者直观查看图像,找到一组图像,记录下文件名,可以根据images.txt中提取的images提取图像id。根据图像id获取两幅图像的旋转矩阵和平移量。

R1 = [[-0.9616942  -0.27411445 -0.00235083]
 [-0.27410779  0.96169488 -0.00280549]
 [ 0.00302981 -0.00205364 -0.9999933 ]]
R2 = [[-0.96027438 -0.27902479 -0.00427451]
 [-0.27900617  0.96027887 -0.00447642]
 [ 0.00535375 -0.00310598 -0.99998084]]
t1 = [4.46504256e-01 1.07021500e+02 4.76170087e+02]
t2 = [0.8112175  115.36848786 475.93598341]

COLMAP使用的世界坐标到相机坐标变换公式:

P_cam = R @ P_world + t

其中,R是3×3 旋转矩阵,表示世界坐标系到相机坐标系的变换关系,t为3×1 平移向量。
相机中心计算推导:
相机中心在相机坐标系的原点,坐标为(0,0,0),在世界坐标系下的坐标假设为C,通过上面的变换公式可以得到:

0 = R @ C + t  ->  C = -R.T @ t

两张图像相对关系计算推导:
假设有一个向量 v 在世界坐标系中,它在两个相机坐标系中的表示分别为:

v1 = R1 @ v      # 在相机1坐标系中
v2 = R2 @ v      # 在相机2坐标系中

我们想要从相机1坐标系变换到相机2坐标系:

v = R1.T @ v1    # 从相机1回到世界
v2 = R2 @ v      # 从世界到相机2
v2 = R2 @ R1.T @ v1

所以相对旋转为:

R_rel = R2 @ R1.T    (相机1 → 相机2

根据上面内容计算相对位置姿态代码实现:

# 相对旋转,相机2相对相机1旋转了多少,后面转欧拉角进而计算偏航角
from scipy.spatial.transform import Rotation
R1 = np.array([[-0.9616942  -0.27411445 -0.00235083]
 [-0.27410779  0.96169488 -0.00280549]
 [ 0.00302981 -0.00205364 -0.9999933 ]])
R2 = np.array([[-0.96027438 -0.27902479 -0.00427451]
 [-0.27900617  0.96027887 -0.00447642]
 [ 0.00535375 -0.00310598 -0.99998084]])
t1 = np.array([4.46504256e-01 1.07021500e+02 4.76170087e+02])
t2 = np.array([0.8112175  115.36848786 475.93598341])

R_rel = R2 @ R1.T
euler_rel = Rotation.from_matrix(R_rel).as_euler('xyz', degrees=True)

# 相机中心
C1 = -R1.T @ t1
C2 = -R2.T @ t2
baseline = np.linalg.norm(C2 - C1)  # 两个相机中心的欧式距离,也就是双目视觉中的基线

print(f"\nRelative pose:")
print(f"  Yaw diff: {abs(euler_rel[2]):.1f} degrees")
print(f"  Baseline: {baseline:.2f}m")
1.2.2.2 组装内参数
intrinsic = {
     'camera_model': cam1['model'],
     'width': cam1['width'],
     'height': cam1['height'],
     'fx': params1[0],
     'fy': params1[1],
     'cx': params1[2],
     'cy': params1[3],
     'distortion': params1[4:9] if len(params1) >= 9 else params1[4:]
    }
# 结果如下:
'''{'camera_model': 'OPENCV', 'width': 5280, 'height': 3956, 'fx': 3181.4540285015, 'fy': 3181.5465179936455, 'cx': 2659.0835819355725, 'cy': 1972.584134761328, 'distortion': [-0.07435581224091158, -0.009373703822089241, -0.0003833151276594362, -0.00037182807716860835]}'''
1.2.2.3 组装外参数

主要包含旋转矩阵、平移量、基线和yaw角差异:

extrinsic = {
    'image1': {
        'quaternion': img1_info['q'],
        'translation': img1_info['t'],
        'rotation_matrix': img1_info['R'].tolist()
    },
    'image2': {
        'quaternion': img2_info['q'],
        'translation': img2_info['t'],
        'rotation_matrix': img2_info['R'].tolist()
    },
    'relative_pose': {
        'yaw_diff_deg': abs(euler_rel[2]),
        'baseline': baseline
    }
}
1.2.2.4 提取两张图像中共同观测到的3D点

通过点id集合的交集计算:

obs1 = observations[img_id1]
obs2 = observations[img_id2]
point_ids1 = set(o['point3d_id'] for o in obs1)
point_ids2 = set(o['point3d_id'] for o in obs2)
common_ids = list(point_ids1 & point_ids2)
print(f"  Image1 points: {len(obs1)}")
print(f"  Image2 points: {len(obs2)}")
print(f"  Common points: {len(common_ids)}")
1.2.2.5 提取两张图像中匹配的像点

基本思路是遍历共视3D点id,获取3D点坐标,对应两张图像的像点坐标,同时根据这些信息计算点深度。将3D点的世界坐标带入坐标转换公式中计算出点在相机坐标系下的坐标取Z值的分量作为深度信息。
相关代码:

for pid in common_ids:
   if pid in points3D:
       p = points3D[pid]
       obs1_pts = [o for o in obs1 if o['point3d_id'] == pid]
       obs2_pts = [o for o in obs2 if o['point3d_id'] == pid]

       if obs1_pts and obs2_pts:
           obs1_pt = obs1_pts[0]
           obs2_pt = obs2_pts[0]

           # 计算深度 (COLMAP公式: P_cam = R @ P_world + t)
           X = np.array(p['X']) # 世界坐标系中的3D点坐标 [X, Y, Z]
           X_cam = img1_info['R'] @ X + t1 # 变换到相机坐标系
           depth = X_cam[2]  # 取相机坐标系Z分量 = 深度

           matches.append({
               'point3d_id': pid,
               'image1': {'x': obs1_pt['x'], 'y': obs1_pt['y']},
               'image2': {'x': obs2_pt['x'], 'y': obs2_pt['y']},
               'depth_image1': depth if depth > 0 else None
           })

统计两幅图像之间匹配点的视差和深度分布,视差代表同一个3D点在两张图像上的像素位置差异,统计方法如下:

if matches:
   dx_list = [m['image2']['x'] - m['image1']['x'] for m in matches]
   dy_list = [m['image2']['y'] - m['image1']['y'] for m in matches]
   depths = [m['depth_image1'] for m in matches if m['depth_image1']]

   print(f"\n5. Disparity statistics...")
   print(f"  X disparity: mean={np.mean(dx_list):.1f}px, range=[{np.min(dx_list):.0f}, {np.max(dx_list):.0f}]")
   print(f"  Y disparity: mean={np.mean(dy_list):.1f}px, range=[{np.min(dy_list):.0f}, {np.max(dy_list):.0f}]")
   if depths:
       print(f"  Depth: mean={np.mean(depths):.2f}m, range=[{np.min(depths):.2f}, {np.max(depths):.2f}]")

2、计算深度图生成稠密点云

2.1 计算深度图

在这里计算深度只用了图像1,然后就是图像2和图像1计算出来的深度值,并没有直接使用图像2。整个计算过程中没有通过两幅图像的纹理等信息协同计算,而时采用cubic插值的方法,根据所有3D点拟合曲面,这个就是稠密重建结果中整个面看起来很平滑,凸起来的地方也无法重建出来的原因。这一步骤没有物理约束,不同区域的点会互相影响,是有较大的优化空间的,后面再想办法优化一下。

def interpolate_depth(matches, img_width, img_height, scale=0.1):
    """
    从稀疏匹配点插值生成稠密深度图
    适用于任意yaw差,直接利用COLMAP的正确深度值
    """
    points_uv = np.array([[m['image1']['x'], m['image1']['y']] for m in matches])
    depths = np.array([m['depth_image1'] for m in matches])
    w_s = int(img_width * scale)
    h_s = int(img_height * scale)
    xi = np.linspace(0, img_width, w_s)
    yi = np.linspace(0, img_height, h_s)
    xi_grid, yi_grid = np.meshgrid(xi, yi)
    # cubic插值
    depth_grid = griddata(points_uv, depths, (xi_grid, yi_grid),
                          method='cubic', fill_value=0)
    return depth_grid, scale

2.2 根据深度计算3D坐标

这里核心是使用反投影公式,从图像像素和深度还原3D点在相机坐标系中的位置。
参考公式:
相机成像模型(正向投影)
相机坐标系中的3D点 (X,Y,Z)(X, Y, Z)(X,Y,Z) 投影到图像平面:
x=fx⋅XZ+cxx = f_x \cdot \frac{X}{Z} + c_xx=fxZX+cx
y=fy⋅YZ+cyy = f_y \cdot \frac{Y}{Z} + c_yy=fyZY+cy
其中:

  • fx,fyf_x, f_yfx,fy:焦距(像素单位)
  • cx,cyc_x, c_ycx,cy:光心位置(图像中心偏移)

反投影公式
已知像素坐标 (u,v)(u, v)(u,v) 和深度 ZZZ,求解相机坐标 (X,Y,Z)(X, Y, Z)(X,Y,Z)
X=(u−cx)⋅ZfxX = \frac{(u - c_x) \cdot Z}{f_x}X=fx(ucx)Z
Y=(v−cy)⋅ZfyY = \frac{(v - c_y) \cdot Z}{f_y}Y=fy(vcy)Z

示例:

相机参数:
fx = 3000, fy = 3000
cx = 2640, cy = 1978(图像中心)
像素点和深度:
u = 3042, v = 2254(像素坐标)
depth = 50m
反投影计算:
X = (3042 - 2640) × 50 / 3000 = 402 × 50 / 3000 = 6.7m
Y = (2254 - 1978) × 50 / 3000 = 276 × 50 / 3000 = 4.6m
Z = 50m
结果:该像素对应的3D点在相机前方50m处,
偏右6.7m,偏上4.6m

反投影公式实现的代码:

def backproject_to_camera(u, v, depth, fx, fy, cx, cy):
    """反投影:像素坐标 + 深度 → 相机坐标系"""
    X = (u - cx) * depth / fx
    Y = (v - cy) * depth / fy
    Z = depth
    return X, Y, Z

注意这里计算出来的是相机坐标系下的坐标,单位是m
相机坐标系转世界坐标系相关代码:

def transform_to_world(P_cam, R, t):
    """转换:相机坐标系 → 世界坐标系 (COLMAP公式)"""
    P_world = (R.T @ (P_cam - t.reshape(1, 3)).T).T
    return P_world

3、附录

prepare_data.py相关代码,主要负责核心数据的提取:

#!/usr/bin/env python3
"""
prepare_data.py - 提取图像对数据

从COLMAP重建结果中提取:
- 两张图像
- 内参数
- 外参数(两张图的位姿)
- 3D点
- 匹配点坐标
"""

import os
import numpy as np
import json
import shutil
from scipy.spatial.transform import Rotation


def parse_colmap_sparse(sparse_path):
    """解析COLMAP稀疏重建"""

    cameras = {}
    images = {}
    points3D = {}
    observations = {}

    # cameras.txt
    with open(os.path.join(sparse_path, 'cameras.txt'), 'r') as f:
        for line in f:
            if line.startswith('#'):
                continue
            parts = line.strip().split()
            if len(parts) >= 4:
                cameras[int(parts[0])] = {
                    'model': parts[1],
                    'width': int(parts[2]),
                    'height': int(parts[3]),
                    'params': [float(p) for p in parts[4:]]
                }

    # images.txt
    with open(os.path.join(sparse_path, 'images.txt'), 'r') as f:
        lines = [l for l in f.readlines() if not l.startswith('#')]
        for i in range(0, len(lines), 2):
            parts = lines[i].strip().split()
            if len(parts) >= 10:
                image_id = int(parts[0])
                qw, qx, qy, qz = float(parts[1]), float(parts[2]), float(parts[3]), float(parts[4])
                tx, ty, tz = float(parts[5]), float(parts[6]), float(parts[7])
                name = parts[9]

                R = Rotation.from_quat([qx, qy, qz, qw]).as_matrix()

                images[image_id] = {
                    'name': name,
                    'camera_id': int(parts[8]),
                    'q': [qw, qx, qy, qz],
                    't': [tx, ty, tz],
                    'R': R
                }

                # 观测点
                obs_parts = lines[i+1].strip().split()
                obs_list = []
                for j in range(0, len(obs_parts), 3):
                    if j + 2 < len(obs_parts):
                        x, y = float(obs_parts[j]), float(obs_parts[j+1])
                        point3d_id = int(obs_parts[j+2])
                        if point3d_id > 0:
                            obs_list.append({'x': x, 'y': y, 'point3d_id': point3d_id})
                observations[image_id] = obs_list

    # points3D.txt
    with open(os.path.join(sparse_path, 'points3D.txt'), 'r') as f:
        for line in f:
            if line.startswith('#'):
                continue
            parts = line.strip().split()
            if len(parts) >= 4:
                points3D[int(parts[0])] = {
                    'X': [float(parts[1]), float(parts[2]), float(parts[3])],
                    'rgb': [int(parts[4]), int(parts[5]), int(parts[6])] if len(parts) >= 7 else [128, 128, 128]
                }

    return cameras, images, points3D, observations


def find_image_by_name(images, target_name):
    """根据文件名查找图像ID"""
    for image_id, info in images.items():
        if info['name'] == target_name:
            return image_id, info
    return None, None


def main():
    # 输入路径
    sparse_path = "sparse/0"
    image_dir = "images"

    # 输出路径
    output_dir = "data_5"
    os.makedirs(output_dir, exist_ok=True)

    print("=" * 70)
    print("Prepare data for DJI_0157 and DJI_0158")
    print("=" * 70)

    # 解析COLMAP
    cameras, images, points3D, observations = parse_colmap_sparse(sparse_path)

    print(f"\nTotal images: {len(images)}")

    # 查找目标图像
    img1_name = "DJI_20260414134720_0157_V.jpeg"
    img2_name = "DJI_20260414134722_0158_V.jpeg"

    img_id1, img1_info = find_image_by_name(images, img1_name)
    img_id2, img2_info = find_image_by_name(images, img2_name)

    if img_id1 is None or img_id2 is None:
        print(f"ERROR: Cannot find images!")
        print(f"  Looking for: {img1_name}, {img2_name}")
        print(f"  Available images:")
        for id, info in list(images.items())[:10]:
            print(f"    ID={id}: {info['name']}")
        return

    print(f"\nFound images:")
    print(f"  Image1: ID={img_id1}, {img1_name}")
    print(f"  Image2: ID={img_id2}, {img2_name}")

    # 计算相对位姿
    R1 = img1_info['R']
    R2 = img2_info['R']
    t1 = np.array(img1_info['t'])
    t2 = np.array(img2_info['t'])

    # 相对旋转
    R_rel = R2 @ R1.T
    euler_rel = Rotation.from_matrix(R_rel).as_euler('xyz', degrees=True)

    # 相机中心
    C1 = -R1.T @ t1
    C2 = -R2.T @ t2
    baseline = np.linalg.norm(C2 - C1)

    print(f"\nRelative pose:")
    print(f"  Yaw diff: {abs(euler_rel[2]):.1f} degrees")
    print(f"  Baseline: {baseline:.2f}m")

    # 1. 复制图像
    print("\n1. Copy images...")
    shutil.copy(os.path.join(image_dir, img1_name), os.path.join(output_dir, "image1.jpeg"))
    shutil.copy(os.path.join(image_dir, img2_name), os.path.join(output_dir, "image2.jpeg"))
    print(f"  image1.jpeg <- {img1_name}")
    print(f"  image2.jpeg <- {img2_name}")

    # 2. 内参数
    print("\n2. Intrinsic parameters...")
    cam1 = cameras[img1_info['camera_id']]
    params1 = cam1['params']

    intrinsic = {
        'camera_model': cam1['model'],
        'width': cam1['width'],
        'height': cam1['height'],
        'fx': params1[0],
        'fy': params1[1],
        'cx': params1[2],
        'cy': params1[3],
        'distortion': params1[4:9] if len(params1) >= 9 else params1[4:]
    }

    with open(os.path.join(output_dir, "intrinsic.json"), 'w') as f:
        json.dump(intrinsic, f, indent=2)
    print(f"  fx={intrinsic['fx']:.2f}, fy={intrinsic['fy']:.2f}")
    print(f"  Size: {intrinsic['width']}x{intrinsic['height']}")

    # 3. 外参数
    print("\n3. Extrinsic parameters...")
    extrinsic = {
        'image1': {
            'quaternion': img1_info['q'],
            'translation': img1_info['t'],
            'rotation_matrix': img1_info['R'].tolist()
        },
        'image2': {
            'quaternion': img2_info['q'],
            'translation': img2_info['t'],
            'rotation_matrix': img2_info['R'].tolist()
        },
        'relative_pose': {
            'yaw_diff_deg': abs(euler_rel[2]),
            'baseline': baseline
        }
    }

    with open(os.path.join(output_dir, "extrinsic.json"), 'w') as f:
        json.dump(extrinsic, f, indent=2)
    print(f"  Yaw diff: {abs(euler_rel[2]):.1f} deg")
    print(f"  Baseline: {baseline:.2f}m")

    # 4. 共同观测的3D点
    print("\n4. Common 3D points...")
    obs1 = observations[img_id1]
    obs2 = observations[img_id2]

    point_ids1 = set(o['point3d_id'] for o in obs1)
    point_ids2 = set(o['point3d_id'] for o in obs2)
    common_ids = list(point_ids1 & point_ids2)

    print(f"  Image1 points: {len(obs1)}")
    print(f"  Image2 points: {len(obs2)}")
    print(f"  Common points: {len(common_ids)}")

    # 提取匹配点
    matches = []
    # for pid in common_ids[:500]:
    for pid in common_ids:
        if pid in points3D:
            p = points3D[pid]
            obs1_pts = [o for o in obs1 if o['point3d_id'] == pid]
            obs2_pts = [o for o in obs2 if o['point3d_id'] == pid]

            if obs1_pts and obs2_pts:
                obs1_pt = obs1_pts[0]
                obs2_pt = obs2_pts[0]

                # 计算深度 (COLMAP公式: P_cam = R @ P_world + t)
                X = np.array(p['X'])
                X_cam = img1_info['R'] @ X + t1
                depth = X_cam[2]

                matches.append({
                    'point3d_id': pid,
                    'image1': {'x': obs1_pt['x'], 'y': obs1_pt['y']},
                    'image2': {'x': obs2_pt['x'], 'y': obs2_pt['y']},
                    'depth_image1': depth if depth > 0 else None
                })

    with open(os.path.join(output_dir, "matches.json"), 'w') as f:
        json.dump(matches, f, indent=2)
    print(f"  matches.json: {len(matches)} points")

    # 视差统计
    if matches:
        dx_list = [m['image2']['x'] - m['image1']['x'] for m in matches]
        dy_list = [m['image2']['y'] - m['image1']['y'] for m in matches]
        depths = [m['depth_image1'] for m in matches if m['depth_image1']]

        print(f"\n5. Disparity statistics...")
        print(f"  X disparity: mean={np.mean(dx_list):.1f}px, range=[{np.min(dx_list):.0f}, {np.max(dx_list):.0f}]")
        print(f"  Y disparity: mean={np.mean(dy_list):.1f}px, range=[{np.min(dy_list):.0f}, {np.max(dy_list):.0f}]")
        if depths:
            print(f"  Depth: mean={np.mean(depths):.2f}m, range=[{np.min(depths):.2f}, {np.max(depths):.2f}]")

    print("\n" + "=" * 70)
    print("Data preparation complete!")
    print("=" * 70)
    print(f"\nOutput: {output_dir}")


if __name__ == "__main__":
    main()

stereo_pipeline.py主要负责生成深度图和点云,代码如下:

#!/usr/bin/env python3
"""
stereo_pipeline.py - 通用双目立体视觉点云生成流程

自动检测yaw差并选择合适的方法:
- 任意yaw差:使用稀疏点插值(利用COLMAP的正确深度)
- 小yaw差(可选):使用块匹配计算视差

流程:
1. 从COLMAP数据加载相机参数和匹配点
2. 检测yaw差
3. 生成深度图(插值或块匹配)
4. 反投影生成点云
5. 保存PLY和可视化结果

使用方法:
  python stereo_pipeline.py --data-dir data_2 --output-dir result_2
  python stereo_pipeline.py --data-dir data_3 --output-dir result_3
"""

import os
import cv2
import numpy as np
import json
import argparse
from scipy.interpolate import griddata
from scipy.spatial.transform import Rotation


def load_data(data_dir):
    """加载所有数据"""

    with open(os.path.join(data_dir, "intrinsic.json"), 'r') as f:
        intrinsic = json.load(f)

    with open(os.path.join(data_dir, "extrinsic.json"), 'r') as f:
        extrinsic = json.load(f)

    with open(os.path.join(data_dir, "matches.json"), 'r') as f:
        matches = json.load(f)

    img1 = cv2.imread(os.path.join(data_dir, "image1.jpeg"))
    img2 = cv2.imread(os.path.join(data_dir, "image2.jpeg"))

    return intrinsic, extrinsic, matches, img1, img2


def analyze_pose(extrinsic, matches):
    """分析相机位姿和视差"""

    # 获取baseline
    baseline = extrinsic['relative_pose']['baseline']

    # 获取yaw_diff(兼容新旧格式)
    if 'yaw_diff_deg' in extrinsic['relative_pose']:
        yaw_diff = extrinsic['relative_pose']['yaw_diff_deg']
    else:
        # 从旋转矩阵计算yaw差
        R1 = np.array(extrinsic['image1']['rotation_matrix'])
        R2 = np.array(extrinsic['image2']['rotation_matrix'])
        euler1 = Rotation.from_matrix(R1).as_euler('xyz', degrees=True)
        euler2 = Rotation.from_matrix(R2).as_euler('xyz', degrees=True)
        yaw_diff = abs(euler2[2] - euler1[2])
        yaw_diff = min(yaw_diff, 360 - yaw_diff)

    # 视差统计
    dx_list = [m['image2']['x'] - m['image1']['x'] for m in matches]
    dy_list = [m['image2']['y'] - m['image1']['y'] for m in matches]

    total_disp = [np.sqrt(dx**2 + dy**2) for dx, dy in zip(dx_list, dy_list)]
    avg_disp = np.mean(total_disp)

    # 计算有效基线
    depths = [m['depth_image1'] for m in matches if m['depth_image1']]
    avg_depth = np.mean(depths) if depths else 0

    return {
        'yaw_diff': yaw_diff,
        'baseline': baseline,
        'avg_disp': avg_disp,
        'avg_depth': avg_depth,
        'disp_x_range': (np.min(dx_list), np.max(dx_list)),
        'disp_y_range': (np.min(dy_list), np.max(dy_list))
    }


def interpolate_depth(matches, img_width, img_height, scale=0.1):
    """
    从稀疏匹配点插值生成稠密深度图

    适用于任意yaw差,直接利用COLMAP的正确深度值
    """

    points_uv = np.array([[m['image1']['x'], m['image1']['y']] for m in matches])
    depths = np.array([m['depth_image1'] for m in matches])

    w_s = int(img_width * scale)
    h_s = int(img_height * scale)

    xi = np.linspace(0, img_width, w_s)
    yi = np.linspace(0, img_height, h_s)
    xi_grid, yi_grid = np.meshgrid(xi, yi)

    # cubic插值
    depth_grid = griddata(points_uv, depths, (xi_grid, yi_grid),
                          method='cubic', fill_value=0)

    return depth_grid, scale


def backproject_to_camera(u, v, depth, fx, fy, cx, cy):
    """反投影:像素坐标 + 深度 → 相机坐标系"""

    X = (u - cx) * depth / fx
    Y = (v - cy) * depth / fy
    Z = depth

    return X, Y, Z


def transform_to_world(P_cam, R, t):
    """转换:相机坐标系 → 世界坐标系 (COLMAP公式)"""

    P_world = (R.T @ (P_cam - t.reshape(1, 3)).T).T
    return P_world


def get_colors(u_valid, v_valid, img):
    """批量获取颜色"""

    h, w = img.shape[:2]
    u_int = np.clip(u_valid.astype(np.int32), 0, w - 1)
    v_int = np.clip(v_valid.astype(np.int32), 0, h - 1)

    bgr = img[v_int, u_int]
    return bgr[:, [2, 1, 0]]  # BGR -> RGB


def save_ply(points, colors, path):
    """保存PLY格式"""

    header = """ply
format ascii 1.0
element vertex {}
property float x
property float y
property float z
property uchar red
property uchar green
property uchar blue
end_header
"""

    with open(path, 'w') as f:
        f.write(header.format(len(points)))
        for i in range(len(points)):
            x, y, z = points[i]
            r, g, b = colors[i]
            f.write(f"{x:.3f} {y:.3f} {z:.3f} {int(r)} {int(g)} {int(b)}\n")


def save_xyz(points, colors, path):
    """保存XYZ格式"""

    with open(path, 'w') as f:
        f.write("# X Y Z R G B\n")
        for i in range(len(points)):
            x, y, z = points[i]
            r, g, b = colors[i]
            f.write(f"{x:.3f} {y:.3f} {z:.3f} {int(r)} {int(g)} {int(b)}\n")


def validate_depth(depth_map, matches, scale, img_width, img_height):
    """验证深度质量"""

    h_depth, w_depth = depth_map.shape

    errors = []
    for m in matches[:50]:
        sparse_depth = m['depth_image1']
        u = m['image1']['x']
        v = m['image1']['y']

        # 正确的坐标映射
        u_s = int(u / img_width * w_depth)
        v_s = int(v / img_height * h_depth)

        # 边界检查
        u_s = min(max(u_s, 0), w_depth - 1)
        v_s = min(max(v_s, 0), h_depth - 1)

        interp_depth = depth_map[v_s, u_s]
        if interp_depth > 0 and sparse_depth > 0:
            error = abs(interp_depth - sparse_depth) / sparse_depth * 100
            errors.append(error)

    return np.mean(errors) if errors else 0, np.median(errors) if errors else 0


def run_pipeline(data_dir, output_dir, scale=0.1):
    """运行完整流程"""

    print("=" * 70)
    print("Stereo Vision Pipeline")
    print("=" * 70)

    os.makedirs(output_dir, exist_ok=True)

    # Step 1: 加载数据
    print("\n[Step 1] Load data...")
    intrinsic, extrinsic, matches, img1, img2 = load_data(data_dir)

    img_width = intrinsic['width']
    img_height = intrinsic['height']
    fx = intrinsic['fx']
    fy = intrinsic['fy']
    cx = intrinsic['cx']
    cy = intrinsic['cy']

    R = np.array(extrinsic['image1']['rotation_matrix'])
    t = np.array(extrinsic['image1']['translation'])

    print(f"  Image size: {img_width}x{img_height}")
    print(f"  Sparse points: {len(matches)}")

    # Step 2: 分析位姿
    print("\n[Step 2] Analyze pose...")
    pose_info = analyze_pose(extrinsic, matches)

    print(f"  Yaw difference: {pose_info['yaw_diff']:.1f} deg")
    print(f"  Baseline: {pose_info['baseline']:.2f}m")
    print(f"  Average disparity: {pose_info['avg_disp']:.0f}px")
    print(f"  Average depth: {pose_info['avg_depth']:.2f}m")

    # Step 3: 生成深度图(插值方法)
    print("\n[Step 3] Generate depth map (interpolation)...")
    print(f"  Method: Sparse point interpolation (适用于任意yaw差)")
    print(f"  Scale: {scale}")

    depth_map, scale_used = interpolate_depth(matches, img_width, img_height, scale)

    h_depth, w_depth = depth_map.shape
    valid_mask = depth_map > 0
    valid_count = np.sum(valid_mask)

    print(f"  Depth map size: {w_depth}x{h_depth}")
    print(f"  Valid pixels: {valid_count} ({valid_count/(h_depth*w_depth)*100:.1f}%)")

    if valid_count > 0:
        d_valid = depth_map[valid_mask]
        print(f"  Depth range: {np.min(d_valid):.2f} - {np.max(d_valid):.2f}m")
        print(f"  Mean depth: {np.mean(d_valid):.2f}m")

    # Step 4: 验证
    print("\n[Step 4] Validate depth...")
    mean_err, median_err = validate_depth(depth_map, matches, scale, img_width, img_height)
    print(f"  Mean error: {mean_err:.3f}%")
    print(f"  Median error: {median_err:.3f}%")

    # Step 5: 反投影到相机坐标
    print("\n[Step 5] Back-project to camera coordinates...")

    u_indices = np.linspace(0, img_width, w_depth)
    v_indices = np.linspace(0, img_height, h_depth)
    u_grid, v_grid = np.meshgrid(u_indices, v_indices)

    u_valid = u_grid[valid_mask]
    v_valid = v_grid[valid_mask]
    d_valid = depth_map[valid_mask]

    X_cam, Y_cam, Z_cam = backproject_to_camera(u_valid, v_valid, d_valid, fx, fy, cx, cy)
    P_cam = np.vstack([X_cam, Y_cam, Z_cam]).T

    print(f"  Points: {len(P_cam)}")
    print(f"  Camera Z range: [{np.min(Z_cam):.2f}, {np.max(Z_cam):.2f}]m")

    # Step 6: 转换到世界坐标
    print("\n[Step 6] Transform to world coordinates...")
    P_world = transform_to_world(P_cam, R, t)

    print(f"  X range: [{np.min(P_world[:,0]):.1f}, {np.max(P_world[:,0]):.1f}]m")
    print(f"  Y range: [{np.min(P_world[:,1]):.1f}, {np.max(P_world[:,1]):.1f}]m")
    print(f"  Z range: [{np.min(P_world[:,2]):.1f}, {np.max(P_world[:,2]):.1f}]m")

    # Step 7: 获取颜色
    print("\n[Step 7] Get colors...")
    colors = get_colors(u_valid, v_valid, img1)
    print(f"  Colors: {len(colors)}")

    # Step 8: 保存结果
    print("\n[Step 8] Save results...")

    # 深度图可视化
    if valid_count > 0:
        d_min = np.min(d_valid)
        d_max = np.max(d_valid)
        depth_vis = np.clip(depth_map, d_min, d_max)
        depth_norm = ((depth_vis - d_min) / (d_max - d_min) * 255).astype(np.uint8)
        depth_colored = cv2.applyColorMap(depth_norm, cv2.COLORMAP_JET)
        depth_colored[~valid_mask] = [0, 0, 0]
        cv2.imwrite(os.path.join(output_dir, "depth.png"), depth_colored)
        print(f"  depth.png")

    # 保存深度数据
    np.save(os.path.join(output_dir, "depth_map.npy"), depth_map)
    print(f"  depth_map.npy")

    # 点云
    ply_path = os.path.join(output_dir, "dense_pointcloud.ply")
    save_ply(P_world, colors, ply_path)
    print(f"  dense_pointcloud.ply")

    xyz_path = os.path.join(output_dir, "dense_pointcloud.xyz")
    save_xyz(P_world, colors, xyz_path)
    print(f"  dense_pointcloud.xyz")

    # 完成
    print("\n" + "=" * 70)
    print("Pipeline Complete!")
    print("=" * 70)
    print(f"\nTotal points: {len(P_world)}")
    print(f"Depth error: {median_err:.3f}% (median)")
    print(f"\nOutput: {output_dir}")
    print("\nView point cloud:")
    print("  meshlab dense_pointcloud.ply")
    print("  cloudcompare dense_pointcloud.ply")
    return P_world, colors, depth_map
def main():
    run_pipeline('data_5', 'result_5', 0.25)

if __name__ == "__main__":
    main()
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐