#
地理学 2026-01-07

las点云生成DEM、等高线

By liuyang 164 Views 47 MIN READ 0 Comments

项目介绍

  • 本项目使用python3(anaconda3)开发
  • 本项目完全开源且免费
  • 切勿用于商业环境(不稳定)
  • 如您支持本项目,可赞赏
  • gitee开源仓库

项目环境配置

  1. Anaconda3官方链接
  2. python ≥ 3.9(建议)
  3. 项目分为3个程序

    • lasCSF.py 手动绘制房屋范围前提下,利用抹布算法和顶帽变换算法剔除las点云中的树木。
    • lastoDEM.py 利用剔除建筑和树木的las生成DEM。
    • TiftoCL.py 利用DEM生成等高线。

    程序还有一些插值算法,感兴趣可自行研究

源代码

lasCSF.py

import numpy as np
import laspy
import ezdxf
import cv2
from scipy.interpolate import griddata
from scipy.ndimage import label, median_filter, grey_erosion, binary_closing
import os
import time

# ================= 参数配置 =================
# 文件路径为 Windows 示例
LAS_FILE = "C:\\Users\\XX\\jexioc\\cloudR.las"    # las源文件
DXF_FILE = "C:\\Users\\XX\\jexioc\\building.dxf"    # 手动标记的建筑区域(建筑需闭合)
OUT_FILE = "C:\\Users\\XX\\jexioc\\ground.las"    # 处理后的las文件
GRID_RESOLUTION = 0.5   # 网格分辨率
TREE_MAX_DIAMETER = 12  # 树冠直径
TREE_HEIGHT_DIFF = 4.0    # 树高阈值

# 边界处理
BOUNDARY_GAP_FILL = 20.0   # 边界间隙填充距离
EDGE_EROSION_METERS = 25.0   # 边缘侵蚀距离

# CSF 参数
CLOTH_RIGIDNESS = 3     # 布料硬度
ITERATIONS = 1000      # 模拟迭代次数
TIME_STEP = 0.6        # 时间步长
# ===============================================

def fill_grid_min(grid, xs, ys, zs):
    for x, y, z in zip(xs, ys, zs):
        if z < grid[y, x]:
            grid[y, x] = z
    return grid

def read_dxf_polygons(dxf_path):
    if not os.path.exists(dxf_path): return []
    print(f"读取 DXF 建筑范围: {dxf_path}")
    try:
        doc = ezdxf.readfile(dxf_path)
        msp = doc.modelspace()
        polys = []
        for entity in msp.query('LWPOLYLINE POLYLINE'):
            points = []
            if entity.dxftype() == 'LWPOLYLINE':
                points = [(p[0], p[1]) for p in entity.get_points()]
            else:
                points = [(p.dxf.x, p.dxf.y) for p in entity.points()]
            if len(points) > 2: polys.append(np.array(points, dtype=np.float32))
        return polys
    except: return []

def generate_strict_boundary(points, resolution, min_x, min_y, width, height):
    print("生成严格边界...")
    mask_grid = np.zeros((height, width), dtype=np.uint8)
    idx_x = np.clip(((points[:, 0] - min_x) / resolution).astype(int), 0, width-1)
    idx_y = np.clip(((points[:, 1] - min_y) / resolution).astype(int), 0, height-1)
    mask_grid[idx_y, idx_x] = 1
    
    gap_pixels = int(BOUNDARY_GAP_FILL / resolution)
    kernel_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (gap_pixels, gap_pixels))
    closed_mask = cv2.morphologyEx(mask_grid, cv2.MORPH_CLOSE, kernel_close)
    
    erosion_pixels = int(EDGE_EROSION_METERS / resolution)
    if erosion_pixels < 1: erosion_pixels = 1
    kernel_erode = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (erosion_pixels, erosion_pixels))
    final_mask = cv2.erode(closed_mask, kernel_erode)
    return final_mask.astype(bool)

def detect_trees_tophat_padded(grid, resolution):
    """ 形态学去树 """
    print("识别树木...")
    valid_mask = np.isfinite(grid)
    if np.sum(valid_mask) == 0: return np.zeros_like(grid, dtype=bool)
    
    min_val = np.nanmin(grid)
    temp_grid = grid.copy()
    temp_grid[np.isnan(temp_grid)] = min_val
    
    pad_size = int(TREE_MAX_DIAMETER / resolution) + 5
    padded_grid = cv2.copyMakeBorder(temp_grid, pad_size, pad_size, pad_size, pad_size, cv2.BORDER_REPLICATE)
    
    kernel_size = int(TREE_MAX_DIAMETER / resolution)
    if kernel_size % 2 == 0: kernel_size += 1
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
    
    tophat_diff = cv2.morphologyEx(padded_grid.astype(np.float32), cv2.MORPH_TOPHAT, kernel)
    
    h, w = grid.shape
    result_diff = tophat_diff[pad_size:pad_size+h, pad_size:pad_size+w]
    
    tree_mask = (result_diff > TREE_HEIGHT_DIFF) & valid_mask
    return tree_mask

def clean_edge_spikes(grid, boundary_mask):
    """ 边缘去刺 """
    print("边缘清洗...")
    ref_grid = median_filter(grid, size=5)
    diff = np.abs(grid - ref_grid)
    spike_mask = (diff > 5.0) & boundary_mask
    cleaned_grid = grid.copy()
    cleaned_grid[spike_mask] = ref_grid[spike_mask]
    return cleaned_grid

def inpainting_flat_mode_optimized(grid, bad_mask, resolution, target_dist=0.5):
    """ 建筑众数插值 """
    rows, cols = grid.shape
    structure = np.ones((3,3), dtype=int)
    labeled_array, num_features = label(bad_mask, structure=structure)
    
    if num_features == 0: return grid
    print(f"平整化 {num_features} 处建筑...")
    
    filled_grid = grid.copy()
    dilation_iter = int(np.round(target_dist / resolution))
    if dilation_iter < 1: dilation_iter = 1
    kernel = np.ones((3, 3), np.uint8)

    for i in range(1, num_features + 1):
        current_house_mask = (labeled_array == i)
        mask_uint8 = current_house_mask.astype(np.uint8)
        dilated_uint8 = cv2.dilate(mask_uint8, kernel, iterations=dilation_iter)
        dilated_mask = dilated_uint8.astype(bool)
        ring_mask = dilated_mask & (~current_house_mask) & np.isfinite(grid)
        boundary_vals = grid[ring_mask]
        
        if len(boundary_vals) == 0: continue
            
        rounded_vals = np.round(boundary_vals, 1)
        unique_vals, counts = np.unique(rounded_vals, return_counts=True)
        if len(counts) > 0:
            target_height = unique_vals[np.argmax(counts)]
            filled_grid[current_house_mask] = target_height
    return filled_grid

def simple_inpainting_linear(grid, mask_to_fill):
    """ 线性插值 """
    rows, cols = grid.shape
    valid_mask = ~mask_to_fill & np.isfinite(grid)
    target_mask = mask_to_fill
    if np.sum(target_mask) == 0: return grid
    
    grid_y, grid_x = np.meshgrid(np.arange(rows), np.arange(cols), indexing='ij')
    src_pts = np.column_stack((grid_y[valid_mask], grid_x[valid_mask]))
    src_vals = grid[valid_mask]
    target_pts = np.column_stack((grid_y[target_mask], grid_x[target_mask]))
    
    if len(src_pts) > 10000: 
        idx = np.random.choice(len(src_pts), 10000, replace=False)
        src_pts, src_vals = src_pts[idx], src_vals[idx]

    filled_vals = griddata(src_pts, src_vals, target_pts, method='linear')
    nan_mask = np.isnan(filled_vals)
    if np.any(nan_mask):
        filled_vals[nan_mask] = griddata(src_pts, src_vals, target_pts[nan_mask], method='nearest')
        
    out_grid = grid.copy()
    out_grid[target_mask] = filled_vals
    return out_grid

def simul_cloth_engine_locked(cloth_grid, terrain_grid, locked_mask, rigidness, time_step, iterations):

    rows, cols = cloth_grid.shape
    movable = np.ones((rows, cols), dtype=bool)
    cloth_grid[locked_mask] = terrain_grid[locked_mask]
    movable[locked_mask] = False
    for it in range(iterations):
        next_z = cloth_grid - time_step
        collision = next_z <= terrain_grid
        cloth_grid[collision] = terrain_grid[collision]
        cloth_grid[~collision] = next_z[~collision]
        movable[collision] = False
        cloth_grid[locked_mask] = terrain_grid[locked_mask]
        kernel = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]], dtype=np.float32) / 4.0
        smoothed = cv2.filter2D(cloth_grid, -1, kernel, borderType=cv2.BORDER_REPLICATE)
        update_mask = movable & (~locked_mask)
        cloth_grid[update_mask] = smoothed[update_mask]
        
    return cloth_grid

def process_pipeline(las_path, dxf_path, output_path):
    # 1. 读取
    print(f"step1-读取点云: {las_path}")
    las = laspy.read(las_path)
    points = np.vstack((las.x, las.y, las.z)).transpose()

    min_x, max_x = np.min(points[:, 0]), np.max(points[:, 0])
    min_y, max_y = np.min(points[:, 1]), np.max(points[:, 1])
    width = int(np.ceil((max_x - min_x) / GRID_RESOLUTION)) + 1
    height = int(np.ceil((max_y - min_y) / GRID_RESOLUTION)) + 1
    
    boundary_mask = generate_strict_boundary(points, GRID_RESOLUTION, min_x, min_y, width, height)

    # 2. 栅格化
    print("step2-栅格化 (Min Pooling)...")
    grid_z = np.full((height, width), np.inf)
    idx_x = np.clip(((points[:, 0] - min_x) / GRID_RESOLUTION).astype(int), 0, width-1)
    idx_y = np.clip(((points[:, 1] - min_y) / GRID_RESOLUTION).astype(int), 0, height-1)
    grid_z = fill_grid_min(grid_z, idx_x, idx_y, points[:, 2])
    grid_z[np.isinf(grid_z)] = np.nan

    # 处理 DXF 建筑
    print("step3: 处理 DXF 建筑...")
    building_mask = np.zeros((height, width), dtype=bool)

    dxf_polys = read_dxf_polygons(dxf_path)
    if dxf_polys:
        grid_polys = []
        for poly in dxf_polys:
            gx = ((poly[:, 0] - min_x) / GRID_RESOLUTION).astype(np.int32)
            gy = ((poly[:, 1] - min_y) / GRID_RESOLUTION).astype(np.int32)
            grid_polys.append(np.column_stack((gx, gy)))
        temp_mask = np.zeros((height, width), dtype=np.uint8)
        cv2.drawContours(temp_mask, grid_polys, -1, 1, thickness=cv2.FILLED)
        building_mask = temp_mask.astype(bool)
    # 对建筑进行众数插值
    grid_z = inpainting_flat_mode_optimized(grid_z, building_mask, GRID_RESOLUTION)
    LOCKED_MASK = building_mask.copy()

    # 去树
    print("Step 4: 形态学去树 (可选避开建筑)...")
    
    # 检测树木 (TopHat)
    tree_mask = detect_trees_tophat_padded(grid_z, GRID_RESOLUTION)
    
    # 关闭注释为避开建筑
    #tree_mask = tree_mask & (~LOCKED_MASK)
    
    # 去树插值(Linear插值)
    grid_z = simple_inpainting_linear(grid_z, tree_mask)

    # 局部平滑 (避开建筑)
    print("Step 5: 局部物理优化 (锁定建筑)...")
    terrain_final = -grid_z
    terrain_final[~boundary_mask] = -np.inf
    terrain_final[np.isnan(terrain_final)] = -np.inf
    
    cloth_final = np.full((height, width), np.nanmax(terrain_final[np.isfinite(terrain_final)]) + 1.0)
    
    # CSF
    surf_final = simul_cloth_engine_locked(
        cloth_final, 
        terrain_final, 
        LOCKED_MASK,
        CLOTH_RIGIDNESS, 
        TIME_STEP, 
        ITERATIONS
    )
    ground_surf = -surf_final
    
    # 6. 导出
    print("step6-边缘清洗与导出...")
    ground_surf = clean_edge_spikes(ground_surf, boundary_mask)
    ground_surf[~boundary_mask] = np.nan
    
    grid_x, grid_y = np.meshgrid(np.arange(width), np.arange(height))
    real_x = min_x + grid_x * GRID_RESOLUTION
    real_y = min_y + grid_y * GRID_RESOLUTION
    
    valid_out = np.isfinite(ground_surf)
    if np.sum(valid_out) > 0:
        new_header = laspy.LasHeader(point_format=3, version="1.2")
        new_header.scales = las.header.scales
        new_header.offsets = las.header.offsets
        out_las = laspy.LasData(new_header)
        out_las.x = real_x[valid_out].flatten()
        out_las.y = real_y[valid_out].flatten()
        out_las.z = ground_surf[valid_out].flatten()
        out_las.write(output_path)
        print(f"完成!输出las文件至 {output_path}")

if __name__ == "__main__":
    if os.path.exists(LAS_FILE):
        process_pipeline(LAS_FILE, DXF_FILE, OUT_FILE)

lastoDEM.py

import whitebox
import os

# 初始化工具
wbt = whitebox.WhiteboxTools()
wbt.verbose = False  # 是否显示详细日志

# 设置工作目录
work_dir = "C:\\Users\\XX\\jexioc"
wbt.set_working_dir(work_dir)

# 输入和输出文件名
input_las = "ground.las"
output_dem = "output_dem.tif"

# 2. 插值生成栅格 (IDW方法 或 TIN方法)
# resolution: 输出栅格的分辨率(例如 1.0 代表 1米/像素)
# radius: 搜索半径
print("正在生成 DEM...")
wbt.lidar_idw_interpolation(
    i=input_las,
    output=output_dem,
    parameter="elevation",
    returns="last",  # 使用最后回波(通常更接近地面)
    resolution=0.5,
    weight=1,
    radius=1
)

print(f"完成!文件已保存为: {output_dem}")

TiftoCL.py

import os
import subprocess
import glob
import shutil

# ====== 设置 ======
INPUT_DIR = r"C:\\Users\\XX\\jexioc\\output_dem.tif"    # DEM文件所在的文件夹
OUTPUT_DIR = r"C:\\Users\\XX\\jexioc\\dxf"    # 输出等高线的文件夹
TEMP_DIR = r"C:\\Users\\XX\\jexioc\\dxf\tmp"    # 临时文件夹,存放中间过程的shp
INTERVAL = 1    # 等高距,单位与dem一致,一般都为米
# =================

os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
dem_files = glob.glob(os.path.join(INPUT_DIR, "*.tif"))

for dem_path in dem_files:
    file_name = os.path.basename(dem_path)
    base_name = file_name.replace(".tif", "")
    # 临时 SHP 路径
    temp_shp_path = os.path.join(TEMP_DIR, f"{base_name}.shp")
    # 最终 DXF 路径
    final_dxf_path = os.path.join(OUTPUT_DIR, f"{base_name}.dxf")
    print(f"正在处理: {file_name} ...")
    
    #  DEM -> Shapefile (SHP)
    #  直接生成dxf会导致等高线缺少高程信息
    cmd_1 = [
        "gdal_contour",
        "-a", "ELEV",        # 将高度写入 ELEV 字段
        "-i", str(INTERVAL),
        dem_path,
        temp_shp_path
    ]
    
    # ---------------------------------------------------------
    # 第二步: SHP -> DXF
    # 关键参数: -zfield ELEV
    # ---------------------------------------------------------
    cmd_2 = [
        "ogr2ogr",
        "-f", "DXF",
        "-zfield", "ELEV",   # 把字段值转为几何高度
        final_dxf_path,
        temp_shp_path
    ]
    
    try:
        # 执行第一步
        subprocess.run(cmd_1, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # 执行第二步
        subprocess.run(cmd_2, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("成功生成带高度的 DXF")
        
    except subprocess.CalledProcessError as e:
        print(f"失败!: {e.stderr.decode()}")

# 清理临时文件 (需要保留shp格式等高线的话可以注释掉)
try:
    shutil.rmtree(TEMP_DIR)
    print("临时shp文件已清理")
except:
    pass

print("完成!")

所需python库及anaconda库

#requirements.txt
laspy[lazrs]>=2.0.0
whitebox
ezdxf
opencv-python
numpy
scipy

#shell
conda install -c conda-forge gdal

其他说明

这些程序免费吗?

免费

程序为什么不做成一个而分为单个?

主要还是满足不同需求,不需要做一个高集成度的软件,需要哪个用哪个
主要是懒

自己不会配置需要协助怎么办?

因为是免费软件,且程序比较简陋,上手难度可能较高,可以赞赏一下(其他金额可备注邮箱),备注邮箱给你提供远程协助

本文由 liuyang 原创

采用 CC BY-NC-SA 4.0 协议进行许可

转载请注明出处:https://jexioc.com/index.php/archives/3/

TAGS: Geography

相关推荐

  • 暂无相关推荐,看看别的吧。

0 评论

发表评论