8. 油藏地质建模基础_2026-05-05_01-17-58
油藏地质建模基础理论教程
1.1 概念定义与工程意义
1.1.1 油藏地质建模的定义
油藏地质建模是指利用计算机技术,基于地质资料、地球物理数据、生产动态以及实验分析结果等信息,建立反映地下油藏空间分布特征和动态变化规律的三维数值模型的过程。该过程涉及地质解释、岩石物理特性评估、沉积相带划分等多个环节。
1.1.2 工程意义
油藏地质建模在石油工业中具有核心地位:
- 勘探阶段:通过建立区域构造模型,预测有利勘探区,降低钻探风险
- 开发规划:确定井位布置、注采方案,优化采收率
- 生产动态分析:模拟不同开采条件下的流体流动行为
- 经济性评价:为投资决策提供量化依据
# 油藏建模工作流程示例(伪代码)
class ReservoirModelingWorkflow:
def __init__(self):
self.geological_data = {} # 地质资料
self.seismic_data = {} # 地震数据
self.production_data = {} # 生产动态
def build_model(self):
"""建立三维油藏模型的主流程"""
phases = [
"数据预处理",
"构造建模",
"相带划分",
"属性预测"
]
for phase in phases:
print(f"执行阶段:{phase}")
yield phase # 生成器模式,便于进度追踪
return self.get_model()
1.1.3 工程应用实例
某海上油田开发项目中,通过地质建模确定了最佳井位布局,使采收率提高约15%,投资回收周期缩短2-3年。此类案例表明,准确的地质建模对于经济效益具有决定性影响。
1.2 关键参数与不确定性
1.2.1 关键地质参数
油藏地质建模涉及多个核心参数的量化:
P o r e V o l u m e = ϕ × V t o t a l Pore\ Volume = \phi \times V_{total} Pore Volume=ϕ×Vtotal
其中 ϕ \phi ϕ 为孔隙度, V t o t a l V_{total} Vtotal 为总体积。其他重要参数包括渗透率 K K K、含油饱和度 S o S_o So、毛净比等。
1.2.2 不确定性量化方法
由于地质数据的有限性和不完整性,建模过程必然存在不确定性:
P ( X ∣ D ) = P ( D ∣ X ) × P ( X ) P ( D ) P(X|D) = \frac{P(D|X) \times P(X)}{P(D)} P(X∣D)=P(D)P(D∣X)×P(X)
这是贝叶斯定理在油藏评价中的应用形式。常用评估方法包括蒙特卡洛模拟、序贯高斯模拟等。
1.2.3 不确定性传播分析
import numpy as np
from scipy.stats import norm
class UncertaintyAnalysis:
def __init__(self, data_points):
self.n = len(data_points)
def calculate_confidence_interval(self, value, sigma=0.1):
"""计算置信区间"""
z_score = 1.96 # 95%置信度对应的Z值
margin = z_score * sigma / np.sqrt(self.n)
return (value - margin, value + margin)
# 实际应用示例
uncertainty_analyzer = UncertaintyAnalysis(128) # 128个测井数据点
ci_upper, ci_lower = uncertainty_analyzer.calculate_confidence_interval(0.25, 0.05)
print(f"孔隙度置信区间:[{ci_lower:.3f}, {ci_upper:.3f}]")
1.2.4 参数敏感性分析
通过改变输入参数的范围,观察输出结果的变化幅度。常用方法包括Sobol指数、方差分解等高级统计工具。
1.3 数值模拟发展趋势
1.3.1 高性能计算应用
现代油藏模拟采用并行计算架构:
import multiprocessing as mp
from typing import List, Dict
class ParallelSimulation:
def __init__(self, num_processes=8):
self.processes = num_processes
def distribute_work(self, tasks: List[Dict],
worker_func) -> List[float]:
"""分布式任务分配"""
with mp.Pool(processes=self.processes) as pool:
results = pool.map(worker_func, tasks)
return results
# 模拟示例:多个区块同时计算压力分布
def calculate_pressure_block(block_id: int):
# 实际物理计算代码在此处
return 1.0 / (block_id + 1) # 简化示例
parallel_sim = ParallelSimulation(16)
workload = [{"id": i, "params": {}} for i in range(100)]
results = parallel_sim.distribute_work(workload, calculate_pressure_block)
1.3.2 机器学习融合技术
传统数值模拟与AI技术的结合正在改变行业格局:
y ^ = f N N ( x 1 , x 2 , . . . , x n ) + ϵ M L P \hat{y} = f_{NN}(x_1, x_2, ..., x_n) + \epsilon_{MLP} y^=fNN(x1,x2,...,xn)+ϵMLP
神经网络可以学习复杂非线性关系,辅助历史数据匹配和参数反演。
1.3.3 云计算平台
云原生架构使得大规模并行模拟成为可能:
# Kubernetes配置示例(油藏模拟集群)
apiVersion: apps/v1
kind: Deployment
metadata:
name: reservoir-simulator-cluster
spec:
replicas: 50
selector:
matchLabels:
app: simulator
template:
metadata:
labels:
app: simulator
spec:
containers:
- name: simulation-engine
image: reservoir/sim:v2.3.1
resources:
requests:
memory: "4Gi"
cpu: "2000m"
1.3.4 实时数据同化
将生产动态数据实时反馈到模型中,实现模型的自适应更新:
from datetime import datetime
class RealTimeDataAssimilation:
def __init__(self, model):
self.model = model
def assimilate_measurement(self, timestamp, observed_value,
uncertainty):
"""数据同化算法"""
time_diff = (datetime.now() - timestamp).total_seconds() / 3600.0
# 卡尔曼滤波更新
predicted_value = self.model.predict(time_diff)
kalman_gain = uncertainty ** 2 / (uncertainty ** 2 +
self.model.variance())
updated_state = predicted_value + kalman_gain * \
(observed_value - predicted_value)
return {
"timestamp": timestamp,
"predicted": predicted_value,
"updated": updated_value if 'updated' in locals() else None,
"gain": kalman_gain
}
1.3.5 可视化与交互界面
现代模拟软件强调三维可视化和交互式操作:
class VisualizationEngine:
def __init__(self):
self.scene = {}
def render_reservoir(self, model_data, output_format="vtk"):
"""渲染油藏模型"""
# 生成网格数据
grid_points = self.generate_mesh(model_data)
# 计算等值面
iso_surfaces = calculate_isocontours(grid_points)
return {
"format": output_format,
"surfaces": iso_surfaces,
"render_time_ms": sum(1 for _ in range(10)) * 5
}
# 示例调用
viz_engine = VisualizationEngine()
result = viz_engine.render_reservoir({"grid_size": [200, 200, 80]})
print(f"渲染完成,等值面数:{len(result['surfaces'])}")
1.3.6 开源工具链发展
近年来,开源项目如PyECLIPSE、OpenAMG等在学术界和工业界得到广泛应用。这些工具提供了灵活的API接口:
# 模拟工作流管理示例
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class SimulationJob:
name: str
input_files: list[str] = field(default_factory=list)
parameters: dict = field(default_factory=dict)
status: str = "pending"
output_dir: Optional[str] = None
def submit(self, scheduler):
"""提交到调度系统"""
self.status = "running"
# 实际调度逻辑在此处
def job_manager(jobs: list[SimulationJob]):
for job in jobs:
print(f"{job.name}: {job.status}")
# 创建模拟作业
jobs = [
SimulationJob("field_A", ["input_A.dat"], {"time_step": 3600}),
SimulationJob("field_B", ["input_B.dat"])
]
job_manager(jobs)
油藏地质建模技术正朝着智能化、自动化、实时化的方向发展。随着算法优化和计算能力提升,模型精度不断提高,为油气资源的高效开发提供坚实保障。
2. 数据准备与前处理技术
在油藏地质建模的初始阶段,原始数据的完整性与一致性直接决定了后续模拟结果的可靠性。本节将深入探讨测井资料标准化整合与地震资料条件化处理的核心流程。
2.1 测井资料标准化整合
多口井的测井曲线往往存在深度基准不一致、单位制不同(如英尺/米)以及数据缺失等问题。标准化整合旨在建立统一的坐标系,确保属性预测的空间连续性。主要步骤包括深度配准、单位换算及异常值剔除。
D a l i g n e d = D r a w + δ i D_{aligned} = D_{raw} + \delta_i Daligned=Draw+δi
其中 D a l i g n e d D_{aligned} Daligned 为对齐后的深度, δ i \delta_i δi 代表第 i i i 口井的相对位移量。对于数据缺失区域,通常采用线性插值或样条函数进行填充:
f ( x ) = f ( x 0 ) + f ( x 1 ) − f ( x 0 ) x 1 − x 0 ( x − x 0 ) f(x) = f(x_0) + \frac{f(x_1)-f(x_0)}{x_1-x_0}(x-x_0) f(x)=f(x0)+x1−x0f(x1)−f(x0)(x−x0)
对于高阶平滑需求,三次样条插值更为常用,其多项式形式为:
S i ( x ) = a i + b i ( x − x i ) + c i ( x − x i ) 2 + d i ( x − x i ) 3 S_i(x) = a_i + b_i(x-x_i) + c_i(x-x_i)^2 + d_i(x-x_i)^3 Si(x)=ai+bi(x−xi)+ci(x−xi)2+di(x−xi)3
以下代码演示了如何构建一个测井数据标准化类,实现深度对齐与单位转换。
import pandas as pd
from typing import List, Dict
import numpy as np
class WellLogStandardizer:
"""测井资料标准化整合工具"""
def __init__(self):
self.conversion_factors = {
'ft_to_m': 0.3048,
'psi_to_bar': 0.06895
}
def align_depths(self, well_data: pd.DataFrame, ref_depth_start: float) -> pd.DataFrame:
"""将单口井深度对齐到参考深度起点"""
offset = ref_depth_start - well_data['depth'].iloc[0]
well_data['aligned_depth'] = well_data['depth'] + offset
return well_data
def standardize_units(self, data: pd.DataFrame) -> pd.DataFrame:
"""批量转换测井单位"""
# 示例:将深度从英尺转换为米
if 'depth_ft' in data.columns:
data['depth_m'] = data['depth_ft'].astype(float) * self.conversion_factors['ft_to_m']
return data
def process_wells(self, well_list: List[Dict]) -> pd.DataFrame:
"""处理多口井数据流"""
master_df = pd.concat([
well['data'].copy().pipe(lambda x: self.align_depths(x, 0))
for well in well_list
], ignore_index=True)
return self.standardize_units(master_df)
# 模拟数据样例
raw_well_1 = {
'depth_ft': [5000.0, 5010.0, 5020.0],
'GR': [80, 85, 90]
}
processor = WellLogStandardizer()
result_df = processor.process_wells([{'data': pd.DataFrame(raw_well_1)}])
print(f"处理后深度范围:{result_df['depth_m'].min():.2f}m - {result_df['depth_m'].max():.2f}m")
上述代码利用 pandas 高效处理表格数据,通过偏移量计算实现深度基准统一。在实际工程应用中,需结合地质专家判断确定合理的 δ i \delta_i δi 值,避免过度校正导致的地质失真。对于异常值剔除,常采用 3 σ 3\sigma 3σ 原则,即移除偏离均值超过三个标准差的离群点:
Z = ∣ x − μ ∣ σ > 3.0 ⟹ x o u t l i e r Z = \frac{|x - \mu|}{\sigma} > 3.0 \implies x_{outlier} Z=σ∣x−μ∣>3.0⟹xoutlier
2.2 地震资料条件化处理
地震反射波记录中包含大量噪声成分,如面波、随机噪声及低频趋势项。条件化处理的目的是在不破坏振幅信息的前提下增强有效信号。常用的数学方法包括维纳滤波和预测反卷积:
S o u t ( f ) = H ( f ) ⋅ S i n ( f ) S_{out}(f) = H(f) \cdot S_{in}(f) Sout(f)=H(f)⋅Sin(f)
其中 H ( f ) H(f) H(f) 为滤波器传递函数,设计时需确保相位不畸变以保持同相轴清晰。对于振幅保持型处理,通常采用零相位滤波技术,其冲激响应满足对称性条件:
h ( n ) = h ( − n ) h(n) = h(-n) h(n)=h(−n)
import numpy as np
class SeismicConditioner:
"""地震资料条件化处理模块"""
def __init__(self, sample_rate_hz=25):
self.sample_rate = sample_rate_hz
self.nyquist = sample_rate / 2
def apply_bandpass_filter(self, trace: np.ndarray,
low_cut: float, high_cut: float) -> np.ndarray:
"""应用带通滤波器"""
# 构建滤波器系数 (简化版巴特沃斯设计逻辑)
nyquist_rad = self.sample_rate * np.pi
w1 = 2 * np.arcsin(low_cut / self.nyquist)
w2 = 2 * np.arcsin(high_cut / self.nyquist)
# 计算滤波器阶数
order = int(np.floor((np.log(w2/w1) + 5.641)/3))
# 此处省略复杂的 IIR 系数计算,直接演示频域滤波原理
fft_trace = np.fft.fft(trace, n=2**next(2**i for i in range(30) if 2**i >= len(trace)))
freq_axis = np.fft.fftfreq(len(trace), d=1/self.sample_rate)
mask = (np.abs(freq_axis) > low_cut / self.sample_rate) & \
(np.abs(freq_axis) < high_cut / self.sample_rate)
filtered_trace = np.real(np.fft.ifft(fft_trace * mask))
return filtered_trace
def denoise(self, data_block: np.ndarray) -> np.ndarray:
"""基于中值滤波去噪"""
window_size = 15
result = np.zeros_like(data_block)
for i in range(len(data_block)):
start = max(0, i - window_size // 2)
end = min(len(data_block), i + window_size // 2 + 1)
median_val = np.median(data_block[start:end])
result[i] = median_val if abs(data_block[i] - median_val) < 3.0 else data_block[i]
return result
# 应用示例
trace_data = np.random.normal(0, 1, 2048) + 0.5 * np.sin(np.linspace(0, 10*np.pi, 2048))
conditioner = SeismicConditioner()
cleaned_trace = conditioner.apply_bandpass_filter(trace_data, low_cut=30, high_cut=60)
denoised_trace = conditioner.denoise(cleaned_trace)
print(f"原始数据点数:{len(trace_data)}, 处理后点数:{len(denoised_trace)}")
在地震处理软件中,此类算法通常集成于更大的工作流引擎中。工程师可根据信噪比(SNR)指标动态调整滤波参数。对于复杂地质条件下的数据,建议结合层位约束进行联合反演预处理。通过上述标准化与条件化手段,原始资料被转化为高质量输入文件,为后续的油藏数值模拟奠定坚实基础。
【本章完】
3. 核心算法与网格生成
3.1 网格划分策略选择
油藏数值模拟的精度高度依赖于网格系统的几何结构与属性分布。选择合适的网格类型需要平衡计算效率、内存消耗与地质细节表征能力。
结构化网格 vs 非结构化网格
**结构化网格(Structured Grid)**采用笛卡尔坐标系,每个单元格由正交面界定:
┌─────────┬─────────┬─────────┐
│ Cell │ Cell │ Cell │
├────┬────┼────┬────┼────┬────┤
│Cell│Cell │Cell│Cell │...│ ... │
└───┴───┴───┴───┘
每个单元仅6个相邻面,便于快速邻居查找
**非结构化网格(Unstructured Grid)**允许任意多面体形状:
import numpy as np
from typing import List, Tuple
class GridStrategySelector:
"""网格划分策略选择器"""
def __init__(self):
# 地质复杂度指数,0-1范围
self.complexity_threshold = 0.65
def evaluate_geological_complexity(
self,
fault_count: int,
pinch_points: List[Tuple[float, float]],
heterogeneity_ratio: float
) -> float:
"""计算地质复杂度指数(0-1)"""
# 归一化故障数量影响
fault_factor = min(fault_count / 500.0, 1.0) * 0.3
# 尖灭点密度影响
pinch_factor = len(pinch_points) / max(1, len(pinch_points))
# 异质性权重
het_factor = heterogeneity_ratio ** 0.7
complexity_score = fault_factor + pinch_factor * 0.5 + het_factor * 0.2
return min(max(complexity_score, 0.0), 1.0)
def recommend_grid_type(
self,
complexity: float,
memory_limit_gb: int = 64
) -> dict:
"""推荐网格类型"""
if complexity < self.complexity_threshold and memory_limit_gb > 32:
return {
'type': 'structured_hex',
'reasoning': '简单构造,结构化六面体网格计算效率高',
'estimated_cells': 10_000_000,
'memory_usage_mb': 256
}
elif complexity < self.complexity_threshold:
return {
'type': 'structured_tet',
'reasoning': '中等复杂,六面体为主辅以四面体填充',
'estimated_cells': 8_000_000,
'memory_usage_mb': 192
}
else:
return {
'type': 'unstructured_tet',
'reasoning': '高复杂度,需要非结构化四面体网格精确拟合界面',
'estimated_cells': 5_000_000,
'memory_usage_mb': 128
}
# 应用示例
selector = GridStrategySelector()
complexity = selector.evaluate_geological_complexity(
fault_count=127,
pinch_points=[(150.5, 300.2), (450.0, 120.8)],
heterogeneity_ratio=0.35
)
recommendation = selector.recommend_grid_type(complexity)
print(f"推荐网格类型:{recommendation['type']}")
print(f"地质复杂度指数:{complexity:.3f}")
六面体 vs 四面体元素对比
| 特性 | 六面体 (Hexahedral) | 四面体 (Tetrahedral) |
|---|---|---|
| 节点数/单元 | 8个角点 | 4个角点 |
| 插值精度 | 二次及以上 | 线性为主 |
| 内存效率 | 较高 | 最优(约60%) |
| 界面拟合 | 阶梯近似 | 精确匹配 |
3.2 插值算法原理分析
构造属性场时,需要在离散数据点之间建立数学关系。不同插值方法在平滑度、计算成本和地质合理性上各有优劣。
Kriging(克里金)插值理论
Kriging基于空间自相关性假设,通过变异函数建模:
γ ( h ) = 1 2 n ∑ i = 1 n ∑ j = 1 n ( Z ( x i ) − Z ( x j ) ) 2 , ∣ x i − x j ∣ = h \gamma(h) = \frac{1}{2n}\sum_{i=1}^{n}\sum_{j=1}^{n}(Z(x_i) - Z(x_j))^2, \quad |x_i - x_j| = h γ(h)=2n1i=1∑nj=1∑n(Z(xi)−Z(xj))2,∣xi−xj∣=h
普通克里金(OK):假设均值恒定
import numpy as np
from scipy.spatial import distance_matrix
class OrdinaryKriging:
"""普通克里金插值实现"""
def __init__(self, nugget_ratio: float = 0.1):
self.nugget = nugget_ratio
def compute_sill(self, data: np.ndarray, lags: np.ndarray) -> tuple:
"""计算变异函数参数(简化版)"""
n = len(data)
h = np.median(lags[lags > 0])
# 经验公式估计基台值
sill_estimate = np.var(data) / (1 - self.nugget_ratio)
return {
'sill': sill_estimate,
'nugget': sill_estimate * self.nugget_ratio,
'range_h': h
}
def krige(self, known_points: np.ndarray, known_values: np.ndarray,
target_point: np.ndarray) -> float:
"""单点插值计算"""
# 简化演示:基于距离加权的近似克里金
distances = distance_matrix([target_point], known_points).flatten()
if np.all(distances == 0):
return known_values[0]
weights = 1.0 / (distances ** 2 + 0.01)
weights /= weights.sum()
prediction = np.dot(weights, known_values)
return prediction
# 数据示例
sample_locations = np.array([
[150.0, 320.0], [180.0, 290.0], [210.0, 340.0],
[160.0, 270.0], [200.0, 300.0]
])
sample_gr_values = np.array([85.2, 92.1, 78.5, 88.9, 90.3])
krige_model = OrdinaryKriging()
target_location = np.array([175.0, 315.0])
predicted_gr = krige_model.krige(sample_locations, sample_gr_values, target_location)
print(f"预测GR值:{predicted_gr:.2f} gAPI")
反距离加权(IDW)插值
基于距离衰减原理,无需假设空间自相关模式:
Z ( x 0 ) = ∑ i = 1 n ( 1 d i p ) z i ∑ i = 1 n ( 1 d i p ) Z(x_0) = \frac{\sum_{i=1}^{n}\left(\frac{1}{d_i^p}\right)z_i}{\sum_{i=1}^{n}\left(\frac{1}{d_i^p}\right)} Z(x0)=∑i=1n(dip1)∑i=1n(dip1)zi
其中 p p p 为幂次参数,通常取2。
import numpy as np
class InverseDistanceWeighting:
"""反距离加权插值器"""
def __init__(self, power: float = 2.0):
self.power = power
def interpolate(self, known_locs: np.ndarray,
known_values: np.ndarray,
target_loc: np.ndarray) -> float:
"""计算目标点插值"""
# 欧几里得距离
diffs = known_locs - target_loc
distances = np.sqrt(np.sum(diffs ** 2, axis=1))
# 避免除零错误
distances[distances == 0] = 1e-10
weights = 1.0 / (distances ** self.power)
prediction = np.dot(weights, known_values) / weights.sum()
return prediction
# 批量插值演示
idw_model = InverseDistanceWeighting(power=2.5)
grid_points = np.array([
[i*10 + j for i in range(5)]
for j in range(4, 7)
])
predictions = np.array([
idw_model.interpolate(sample_locations, sample_gr_values, loc)
for loc in grid_points
])
print(f"网格点插值结果范围:{predictions.min():.2f} - {predictions.max():.2f}")
径向基函数(RBF)插值
提供全局平滑解,适合数据稀疏区域:
S ( x ) = ∑ i = 1 n α i ϕ ( ∥ x − x i ∥ ) + β T x + c S(x) = \sum_{i=1}^{n}\alpha_i\phi(\|x-x_i\|) + \beta^T x + c S(x)=i=1∑nαiϕ(∥x−xi∥)+βTx+c
常用核函数包括高斯核 ϕ ( r ) = e − r 2 / σ 2 \phi(r) = e^{-r^2/\sigma^2} ϕ(r)=e−r2/σ2 和多二次核 ϕ ( r ) = ( c 2 + r 2 ) p / 2 \phi(r) = (c^2+r^2)^{p/2} ϕ(r)=(c2+r2)p/2。
3.3 地质界面追踪技术
Marching Cubes算法
将三维体积数据投影到等值面,通过预定义拓扑模板生成三角网格:
import numpy as np
from typing import List, Tuple
class MarchingCubes:
"""Marching Cubes实现类"""
# 15种配置表(简化版)
config_table = {
(0, 0): ((0, 2), (0, 3)),
(0, 1): ((0, 2), (0, 7)),
(0, 2): ((0, 4), (0, 5)),
# ... 完整配置表略
}
def __init__(self, voxel_size: float = 1.0):
self.voxel_size = voxel_size
def extract_surface(self, grid_data: np.ndarray,
threshold: float) -> Tuple[np.ndarray, np.ndarray]:
"""提取等值面"""
vertices = []
faces = []
# 遍历体素网格(简化循环结构)
for z in range(grid_data.shape[2] - 1):
for y in range(grid_data.shape[1] - 1):
for x in range(grid_data.shape[0] - 1):
cube = grid_data[x:y+2, y:z+2, z:z+2]
# 计算体素角点相对于阈值的位置
edge_bits = self._compute_edge_bits(cube, threshold)
if edge_bits != 0:
vertex_indices = self.config_table.get(edge_bits, ((),))
for idx in vertex_indices:
v_idx = x + y + z * 128 # 简化索引计算
vertices.append((v_idx, cube[idx]))
return np.array(vertices), np.array(faces)
def _compute_edge_bits(self, values: np.ndarray,
threshold: float) -> int:
"""计算边缘位图"""
edge_mask = 0
# 8个角点位置及其对应的边
corners_and_edges = [
(0, 1), (1, 3), (2, 5), (3, 7),
(4, 6), (5, 4), (6, 2), (7, 0)
]
for corner_idx, edge_idx in corners_and_edges:
if values[corner_idx] > threshold:
edge_mask |= 1 << edge_idx
return edge_mask
# 模拟地质数据生成
np.random.seed(42)
grid_shape = (80, 60, 50)
geological_interface_data = np.zeros(grid_shape)
# 创建倾斜界面
x, y, z = np.meshgrid(*[np.arange(s) for s in grid_shape])
interface_eq = x * 0.3 + y * 0.2 - z * 0.5 - 10.0
geological_interface_data = (interface_eq > -2.0).astype(float) * 100
# 提取界面
mc = MarchingCubes()
vertices, faces = mc.extract_surface(geological_interface_data, threshold=50.0)
print(f"提取的三角面数量:{len(faces)}")
print(f"顶点数量:{len(vertices)}")
隐式曲面表示法
使用距离函数 D ( x , y , z ) D(x,y,z) D(x,y,z) 描述界面,负值表示内部,正值表示外部:
$$D(x) = \begin{cases}
0 & x \in \text{Interface} \
< 0 & x \in \text{Interior} \
0 & x \in \text{Exterior}
\end{cases}$$
import numpy as np
class ImplicitSurface:
"""隐式曲面表示与操作"""
def __init__(self, initial_function: np.ndarray):
self.function = initial_function
self.gradient_cache = None
def compute_distance_transform(self) -> np.ndarray:
"""计算距离变换(简化版欧几里得)"""
# 实际应用中需要高效的最近邻搜索算法
distances = np.sqrt(np.sum((np.arange(len(self.function)) -
np.where(self.function > 0)[0])**2, axis=1))
return np.clip(distances, 0.0, None)
def level_set_operation(self, operation: str = 'erosion') -> np.ndarray:
"""水平集操作"""
if operation == 'erosion':
# 侵蚀:收缩界面
new_function = self.function - 1.5
return np.where(new_function < 0, 0,
np.where(self.function > 1.5, 1.0,
np.interp(
np.linspace(0, 1.5, len(new_function)),
[0, 1.5],
new_function)))
else:
return self.function
# 界面演化模拟
initial_interface = np.zeros((20, 20))
for i in range(len(initial_interface)):
initial_interface[i] = (i - 9) ** 2 / 100.0
implicit_surface = ImplicitSurface(initial_interface)
evolved_interface = implicit_surface.level_set_operation('erosion')
print(f"侵蚀后界面最大值:{evolved_interface.max():.3f}")
自适应网格细化(AMR)
在地质界面附近动态增加网格分辨率,平衡精度与计算成本:
import numpy as np
class AdaptiveMeshRefinement:
"""自适应网格细化模块"""
def __init__(self, base_resolution: int = 10):
self.base_res = base_resolution
def calculate_refinement_level(self,
gradient_magnitude: np.ndarray,
max_level: int = 4) -> np.ndarray:
"""基于梯度计算细化级别"""
# 归一化梯度
grad_norm = np.linalg.norm(gradient_magnitude, axis=-1)
# 确定最大细化级数
max_refinement = min(max(np.log2(grad_norm.max() / grad_norm.mean()),
max_level), max_level)
return max_refinement
def generate_adaptive_grid(self, domain_bounds: Tuple[float, float],
gradient_field: np.ndarray) -> dict:
"""生成自适应网格配置"""
base_cell = 1.0 / self.base_res
# 计算边界处的细化需求
x_range = domain_bounds[1] - domain_bounds[0]
refinement_config = {
'base_size': base_cell,
'max_refinement': max(2, self.calculate_refinement_level(gradient_field)),
'total_cells_estimate': int(x_range / (base_cell * 2 **
self.calculate_refinement_level(gradient_field)[0]))
}
return refinement_config
# 应用示例
amr = AdaptiveMeshRefinement()
refinement = amr.generate_adaptive_grid(
domain_bounds=(0.0, 1000.0),
gradient_field=np.random.exponential(1, (500, 500))
)
print(f"基础网格大小:{refinement['base_size']:.4f}")
print(f"最大细化级别:{refinement['max_refinement']}")
3.4 综合工作流示例
将上述模块整合为完整的数据处理流程:
from typing import List, Dict
import numpy as np
class GeologicalModelBuilder:
"""地质模型构建器"""
def __init__(self):
self.selector = GridStrategySelector()
self.idw_model = InverseDistanceWeighting(power=2.0)
def build_complete_workflow(
self,
geological_data: Dict[str, np.ndarray],
well_locations: List[Tuple[float, float]]
) -> dict:
"""构建完整建模工作流"""
# 1. 评估地质复杂度并选择网格类型
complexity = self.selector.evaluate_geological_complexity(
fault_count=len(np.where(geological_data['faults'] > 0)[0]),
pinch_points=geological_data.get('pinch_points', []),
heterogeneity_ratio=np.std(geological_data['porosity']) /
np.mean(geological_data['porosity'])
)
grid_recommendation = self.selector.recommend_grid_type(complexity)
# 2. 为每个数据点计算插值
interpolated_values = []
for point in geological_data.get('sample_points', well_locations):
pred_value = self.idw_model.interpolate(
np.array([well_locations]),
np.array([100.0, 95.0, 88.0, 92.0, 89.0]),
point
)
interpolated_values.append(pred_value)
return {
'grid_type': grid_recommendation['type'],
'complexity_index': complexity,
'interpolated_data': np.array(interpolated_values),
'workflow_status': 'completed'
}
# 最终测试
builder = GeologicalModelBuilder()
result = builder.build_complete_workflow(
geological_data={
'faults': np.random.randint(0, 2, (100,)),
'pinch_points': [(i*10, i*5) for i in range(5)],
'porosity': np.random.uniform(0.1, 0.35, (100,))
},
well_locations=[(100.0, 200.0), (150.0, 250.0)]
)
print(f"工作流完成状态:{result['workflow_status']}")
print(f"推荐网格类型:{result['grid_type']}")
【本章完】
4. 模型实现与质量管控
4.1 静态模型构建流程
在石油地质建模中,静态模型表征油藏某一时刻的地质状态,包括储层几何结构、岩性分布和物性参数。完整的静态模型构建遵循以下标准流程:
数据准备阶段
首先进行多源数据的融合与预处理:
from typing import Dict, List, Optional, Tuple
import numpy as np
from datetime import datetime
class StaticModelData:
"""静态模型数据容器"""
def __init__(self):
self.well_data = [] # 井数据
self.seismic_data = None # 地震数据
self.surface_samples = [] # 露头采样点
def load_and_preprocess(
self,
well_files: List[str],
seismic_file: str,
sample_locations: List[Tuple[float, float]]
) -> dict:
"""加载并预处理数据"""
preprocessing_log = {
'timestamp': datetime.now().isoformat(),
'data_sources': [],
'quality_metrics': {}
}
# 1. 井数据解析
well_records = []
for well_file in well_files:
try:
# 模拟读取DLIS/DFW格式数据
well_info = {
'well_id': well_file.split('_')[-1].replace('.dat', ''),
'depth_range': (1200.5, 3850.2),
'measurements': ['GR', 'RHOB', 'NPHI', 'SP']
}
well_records.append(well_info)
except Exception as e:
preprocessing_log['quality_metrics']['well_read_error'] = str(e)
self.well_data = well_records
# 2. 地震数据加载
seismic_volume = np.zeros((100, 100, 80))
# 模拟地震反演得到的层位属性
for z in range(seismic_volume.shape[2]):
seismic_volume[:, :, z] = np.sin(z * 0.05) + np.random.normal(0, 0.1, (100, 100))
self.seismic_data = seismic_volume
preprocessing_log['data_sources'].append({
'wells': len(well_records),
'seismic_stack_size': seismic_volume.shape
})
return preprocessing_log
# 数据质量验证函数
def validate_model_data(data: StaticModelData) -> Dict[str, float]:
"""验证模型数据的完整性和一致性"""
metrics = {}
# 井数据完整性检查
if data.well_data:
total_depth_range = sum(w['depth_range'][1] - w['depth_range'][0]
for w in data.well_data)
metrics['well_coverage'] = total_depth_range / (4000.0 * len(data.well_data))
# 地震数据质量评估
if data.seismic_data is not None:
# SNR估计(信噪比)
signal_power = np.mean(np.abs(data.seismic_data)) ** 2
noise_power = np.var(data.seismic_data.flatten())
snr_db = 10 * np.log10(signal_power / max(noise_power, 1e-15))
metrics['seismic_snr'] = float(snr_db)
return metrics
# 应用示例
model_data = StaticModelData()
preprocess_log = model_data.load_and_preprocess(
well_files=['well001.dat', 'well002.dat'],
seismic_file='seismic_stack.bin',
sample_locations=[(50.0, 50.0), (150.0, 150.0)]
)
quality_metrics = validate_model_data(model_data)
print(f"数据质量指标:{quality_metrics}")
网格生成与属性映射
完成数据预处理后,根据地质复杂度选择适当的网格类型(参考3.2节),并执行属性赋值:
class StaticModelBuilder:
"""静态模型构建器"""
def __init__(self, grid_config: dict):
self.grid = {
'type': grid_config.get('grid_type', 'structured'),
'dimensions': (100, 100, 40), # X, Y, Z尺寸
'cell_size': (15.0, 15.0, 2.5) # 米
}
def generate_grid_topology(self) -> np.ndarray:
"""生成网格拓扑结构"""
nx, ny, nz = self.grid['dimensions']
cell_offsets = {
'x': np.linspace(0, self.grid['cell_size'][0], nx+1),
'y': np.linspace(0, self.grid['cell_size'][1], ny+1),
'z': np.linspace(0, self.grid['cell_size'][2], nz+1)
}
# 创建单元连接表(简化版)
cell_connections = {}
for i in range(nx):
for j in range(ny):
for k in range(nz):
cell_id = (i, j, k)
neighbors = {
'E': (min(i+1, nx-1), j, k),
'W': (max(0, i-1), j, k),
'N': (i, min(j+1, ny-1), k),
'S': (i, max(0, j-1), k),
'U': (i, j, min(k+1, nz-1)),
'D': (i, j, max(0, k-1))
}
cell_connections[cell_id] = neighbors
return np.array(list(cell_connections.items()), dtype=object)
def map_attributes_to_grid(self,
attribute_type: str,
source_data: np.ndarray) -> np.ndarray:
"""将属性数据映射到网格单元"""
grid_volumes = {}
# 对于每个层位,计算属性平均值作为代表性值
for k in range(len(source_data)):
layer_mask = (self.grid['z'][k] <= source_data[:, :, k]) & \
(source_data[:, :, k] < self.grid['z'][k+1])
if np.any(layer_mask):
mean_value = np.mean(source_data[layer_mask, :, :])
grid_volumes[k] = mean_value
return np.array(grid_volumes)
# 构建静态模型实例
builder = StaticModelBuilder({
'grid_type': 'structured_hex',
'dimensions': (100, 100, 40),
'cell_size': (15.0, 15.0, 2.5)
})
# 生成网格拓扑
topology = builder.generate_grid_topology()
print(f"网格单元数:{len(topology)}")
4.2 动态参数赋值规则
动态参数描述油藏在开采过程中的变化,包括压力、饱和度、含水率等。这些参数的赋值需要基于物质平衡方程和流动方程求解:
数值模拟核心算法框架
from typing import Callable, Optional
import numpy as np
class ReservoirDynamicModel:
"""储层动态模型类"""
def __init__(self,
grid_properties: dict,
initial_conditions: dict):
self.grid = {**grid_properties}
self.initial_swp = initial_conditions.get('initial_sw', np.zeros((100, 100)))
self.porosity = initial_conditions.get('porosity')
self.permeability = initial_conditions.get('permeability')
def assemble_matrix_system(
self,
time_step: float,
current_state: dict
) -> tuple[np.ndarray, np.ndarray]:
"""组装矩阵系统"""
# 离散化算子:压力方程的离散形式
# A * P = B
n_cells = len(current_state.get('pressure', []))
# 计算扩散系数(简化)
diff_coeff = self.porosity / (self.grid['cell_size'][2] ** 2)
# 边界条件矩阵
bc_matrix = np.zeros((n_cells, n_cells + 1), dtype=np.float64)
return diff_coeff, bc_matrix
def solve_pressure_equation(self,
rhs_vector: np.ndarray) -> np.ndarray:
"""求解压力方程(简化版迭代)"""
# Gauss-Seidel迭代法(适用于大型稀疏系统)
max_iterations = 100
tolerance = 1e-6
n = len(rhs_vector)
pressure = rhs_vector.copy()
for iteration in range(max_iterations):
residual = np.linalg.norm(rhs_vector - pressure)
if residual < tolerance:
break
# Gauss-Seidel更新步骤
for i in range(n):
if n > 1:
sum_neighbors = np.dot(self.adjacency_matrix[i], pressure[:i])
pressure[i] = (rhs_vector[i] - sum_neighbors) / self.diag_coeff[i]
return pressure
def compute_saturation_update(
self,
pressure_change: float,
rock_properties: dict
) -> np.ndarray:
"""计算饱和度变化"""
# 毛细管压力-饱和度关系(Van Genuchten模型)
def capillary_pressure(sat):
if sat <= 0 or sat >= 1:
return 0.0
return rock_properties['pc_ow'] * (sat ** (-1/n))
saturation_update = np.zeros(len(pressure_change))
for i, dp in enumerate(pressure_change):
# 简化:假设线性关系
if abs(dp) < 1e-8:
saturation_update[i] = self.initial_swp[i]
else:
# 根据相渗曲线更新饱和度
kr_oil = rock_properties.get('kr_rel', 0.2) * (dp ** 0.5)
new_sat = saturation_update[i] + dp * kr_oil
saturation_update[i] = np.clip(new_sat, 0.1, 0.9)
return saturation_update
# 运行动态模拟示例
dynamic_model = ReservoirDynamicModel(
grid_properties={
'cell_size': (15.0, 15.0, 2.5),
'time_step': 365.0 # 天
},
initial_conditions={
'porosity': np.random.uniform(0.2, 0.25, (100,)),
'permeability': np.random.exponential(100.0, (100,))
}
)
# 执行单步计算
pressure_change = np.array([50.0] * 100)
saturation_update = dynamic_model.compute_saturation_update(
pressure_change[0],
{'pc_ow': 20.0, 'n': 2.0}
)
print(f"饱和度更新范围:{saturation_update.min():.3f} - {saturation_update.max():.3f}")
相渗曲线建模与参数化
class RelativePermeability:
"""相对渗透率模型"""
def __init__(self, model_type: str = 'van_genuchten'):
self.model = model_type
def van_genuchten(self, s: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""Van Genuchten相渗模型"""
if self.model == 'van_genuchten':
# 油相相对渗透率(简化形式)
kro = np.where(
(s >= 0.15) & (s <= 0.95),
s * (1 - (s ** (1/n)) ** n) ** m,
0.0
)
# 水相相对渗透率(简化形式)
krw = np.where(
(s >= 0.02) & (s <= 0.98),
s * (1 - (s ** (-1/n))) ** m,
0.0
)
return kro, krw
return s ** 2, s
def calculate_mobility(self,
kro: np.ndarray,
krw: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""计算相移动度"""
viscosity_oil = 0.85 # cP(简化假设)
viscosity_water = 1.0 # cP
mobility_oil = kro / viscosity_oil
mobility_water = krw / viscosity_water
return mobility_oil, mobility_water
# 参数化相渗曲线
rp_model = RelativePermeability()
saturation_curve = np.linspace(0.02, 0.98, 100)
kro_vals, krw_vals = rp_model.van_genuchten(saturation_curve)
print(f"油相相对渗透率范围:{kro_vals.min():.4f} - {kro_vals.max():.4f}")
print(f"水相相对渗透率范围:{krw_vals.min():.4f} - {krw_vals.max():.4f}")
4.3 模型验证与优化
为确保模型的可靠性,必须进行多层次的验证和校准工作:
静态模型验证流程
class ModelValidator:
"""模型验证器"""
def __init__(self, well_logs: List[dict],
seismic_data: np.ndarray):
self.well_logs = well_logs
self.seismic_data = seismic_data
def validate_geometric_consistency(self) -> dict:
"""验证几何一致性"""
validation_results = {
'well_correlation': {},
'seismic_match': {}
}
# 1. 井-模型位置匹配检查
well_positions = set()
for well in self.well_logs:
x, y = well['location']
well_positions.add((round(x, 1), round(y, 1)))
model_points = {
(round(p[0], 1), round(p[1], 1))
for p in [(i*15.0, j*15.0)
for i in range(100) for j in range(100)]
}
overlap_ratio = len(well_positions & model_points) / len(well_positions)
validation_results['well_correlation']['coverage'] = float(overlap_ratio)
# 2. 地震属性匹配度
seismic_profile = self.seismic_data[50:60, :, :] # 简化选取剖面
well_depths = [w.get('depth_range', (1200, 3850))
for w in self.well_logs[:2]]
seismic_avg = np.mean(np.abs(seismic_profile))
validation_results['seismic_match']['signal_strength'] = float(seismic_avg)
return validation_results
def validate_property_accuracy(self) -> dict:
"""验证物性准确性"""
results = {
'porosity': {},
'permeability': {}
}
for well in self.well_logs[:3]: # 使用前3口井
# 提取井数据中的孔隙度测量值
measured_porosity = well.get('measurements', {}).get('NPHI', [])
if measured_porosity:
avg_measured = np.mean(measured_porosity)
model_value = well.get('model_value') or avg_measured
results['porosity']['bias'] = float(abs(avg_measured - model_value))
return results
# 运行验证流程
validator = ModelValidator(
well_logs=[
{
'location': (50.0, 120.0),
'depth_range': (1200, 3850),
'measurements': {'NPHI': [0.22, 0.24, 0.21]}
},
{
'location': (150.0, 180.0),
'depth_range': (1100, 3900),
'measurements': {'NPHI': [0.23, 0.25]}
}
],
seismic_data=np.random.rand(100, 100, 80) * 0.5 + 0.2
)
geo_validation = validator.validate_geometric_consistency()
print(f"几何验证结果:{geo_validation}")
prop_validation = validator.validate_property_accuracy()
print(f"物性验证结果:{prop_validation}")
动态模型校准与优化
import numpy as np
from typing import List, Callable
class ModelCalibrator:
"""模型校准器"""
def __init__(self,
model_output: dict,
observed_data: dict):
self.model = {**model_output}
self.observed = observed_data
def calculate_misfit(self) -> float:
"""计算拟合误差(MSE)"""
if not self.model or not self.observed:
return np.inf
model_values = self.model.get('production', [])
observed_values = self.observed.get('production', [])
mse = np.mean((np.array(model_values) - np.array(observed_values)) ** 2)
rmse = np.sqrt(mse)
return {
'mse': float(mse),
'rmse': float(rmse)
}
def optimize_parameters(self,
parameter_space: dict,
max_iterations: int = 100) -> dict:
"""优化模型参数(简化版梯度下降)"""
best_misfit = np.inf
optimal_params = {}
# 初始参数猜测
current_params = {
'kr_max_oil': 1.0,
'permeability_factor': 1.0,
'porosity_factor': 1.0
}
learning_rate = 0.01
for iteration in range(max_iterations):
# 评估当前参数集
misfit_result = self.calculate_misfit()
if misfit_result['mse'] < best_misfit:
best_misfit = misfit_result['mse']
optimal_params = current_params.copy()
# 计算梯度(简化)
gradient = np.random.randn(3) * (best_misfit ** 0.5 + 1e-6)
# 参数更新
for key in current_params:
delta = -learning_rate * gradient[gradient.index(key)]
if key == 'kr_max_oil':
current_params[key] += delta * 0.1
elif key == 'permeability_factor':
current_params[key] *= (1 + delta)
else:
current_params[key] += delta
return {
'optimal_parameters': optimal_params,
'best_misfit': best_misfit,
'iterations': iteration
}
# 校准示例
calibrator = ModelCalibrator(
model_output={
'production': [1250, 1380, 1420, 1390, 1280] # 预测产量(桶/天)
},
observed_data={
'production': [1260, 1375, 1425, 1385, 1275] # 观测数据
}
)
misfit = calibrator.calculate_misfit()
print(f"初始拟合误差:RMSE = {misfit['rmse']:.2f}")
calibration_results = calibrator.optimize_parameters(
parameter_space={'kr_max_oil', 'permeability_factor'},
max_iterations=50
)
print(f"最优参数:{calibration_results['optimal_parameters']}")
print(f"校准后拟合误差:RMSE = {misfit['rmse']:.2f}")
综合质量报告生成
from datetime import datetime
class QualityReport:
"""质量控制报告生成器"""
def generate_report(self,
model: dict,
validation_results: List[dict],
calibration_results: Optional[dict] = None) -> str:
"""生成综合质量报告"""
report_sections = []
# 1. 模型基本信息
report_sections.append("=== 静态模型基础信息 ===")
report_sections.append(f"构建时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report_sections.append(f"网格类型:{model.get('grid_type', 'N/A')}")
report_sections.append(f"单元数量:{model.get('cell_count', 0)}")
# 2. 几何验证结果
if validation_results:
geo_validation = validation_results[0]
report_sections.append("\n=== 几何一致性验证 ===")
if 'well_correlation' in geo_validation:
coverage = geo_validation['well_correlation'].get('coverage', 0)
status = "通过" if coverage > 0.9 else "需要改进"
report_sections.append(f"井位置覆盖率:{coverage:.1%} [{status}]")
if 'seismic_match' in geo_validation:
snr = geo_validation['seismic_match'].get('signal_strength', 0)
status = "良好" if snr > 5.0 else "需关注"
report_sections.append(f"地震信号强度:{snr:.2f} [{status}]")
# 3. 物性验证结果
if validation_results:
prop_validation = validation_results[1]
report_sections.append("\n=== 物性准确性 ===")
for well_name, result in prop_validation.items():
bias = result.get('bias', float('inf'))
report_sections.append(f"井{well_name}孔隙度偏差:{bias:.4%}")
# 4. 校准结果(如果有)
if calibration_results:
report_sections.append("\n=== 动态模型校准 ===")
best_misfit = calibration_results.get('best_misfit', np.inf)
# 设定阈值判断
if best_misfit < 100.0:
status = "满足精度要求"
elif best_misfit < 500.0:
status = "可接受,需进一步观察"
else:
status = "需要重新建模或数据收集"
report_sections.append(f"最佳拟合误差:{best_misfit:.2f} [{status}]")
return "\n".join(report_sections)
# 生成完整报告
report_generator = QualityReport()
quality_report = report_generator.generate_report(
model={
'grid_type': 'structured_hex',
'cell_count': 400_000,
'time_step': 365.0
},
validation_results=[geo_validation, prop_validation],
calibration_results=calibration_results
)
print(report_generator.generate_report(
model={
'grid_type': 'structured_hex',
'cell_count': 400_000,
'time_step': 365.0
},
validation_results=[geo_validation, prop_validation],
calibration_results=calibration_results
))
# 输出报告内容
print(report_generator.generate_report(
model={
'grid_type': 'structured_hex',
'cell_count': 400_000,
'time_step': 365.0
},
validation_results=[geo_validation, prop_validation],
calibration_results=calibration_results
))
本章完
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)