LMDB 数据库模块
高性能的LMDB数据库操作模块,支持大规模数据存储和高效读取
📋 目录
✨ 功能特性
- 🚀 高性能: 基于LMDB的高性能数据库操作
- 📦 数据序列化: 自动处理numpy数组和复杂数据结构
- 🔄 多进程支持: 支持多进程并发读写
- 💾 内存优化: 智能内存管理和大小控制
- 🔧 工具丰富: 提供数据库合并、分割、修复等工具
- 🛡️ 数据安全: 事务性操作,确保数据一致性
🚀 快速开始
基本使用
import sindre.lmdb as lmdb
import numpy as np
# 创建数据库,支持目录,也支持文件
writer = lmdb.Writer('./data.db', map_size_limit=1024*100) # map_size_limit单位为MB
#writer = lmdb.Writer('./data', map_size_limit=1024*100) # 会创建data目录
# 写入数据
data = {
'points': np.random.rand(100, 3),
'labels': np.random.randint(0, 10, 100),
'version': '1.0',
}
writer.put_samples(data)
writer.close()
# 读取数据
reader = lmdb.Reader('./data.db')
sample = reader[0]
print(f"读取到 {len(sample['points'])} 个点")
reader.close()
PyTorch 数据集集成
import torch
from sindre.lmdb import Reader
class LMDBDataset(torch.utils.data.Dataset):
def __init__(self, db_path):
self.db = Reader(db_path, multiprocessing=False)
def __len__(self):
return len(self.db)
def __getitem__(self, idx):
data = self.db[idx]
# 转换为torch张量
return {k: torch.from_numpy(v) for k, v in data.items()}
# 使用
dataset = LMDBDataset('./data')
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32)
🔧 核心类
Writer - 数据库写入器
class Writer:
"""LMDB数据库写入器"""
def __init__(self, dirpath: str, map_size_limit: int, multiprocessing: bool = False):
"""
初始化写入器
Args:
dirpath: 数据库目录路径
map_size_limit: 数据库大小限制(MB)
multiprocessing: 是否启用多进程支持
"""
def put_samples(self, samples: dict):
"""批量写入样本数据"""
def change_value(self, num_id: int, samples: dict):
"""修改指定ID的数据"""
def change_db_value(self, key: int, value: dict, safe_model: bool = True):
"""安全修改数据库值,带确认提示"""
def check_sample_size(self, samples: dict):
"""检查样本大小(GB)"""
def close(self):
"""关闭数据库连接"""
Reader - 数据库读取器
class Reader:
"""LMDB数据库读取器"""
def __init__(self, dirpath: str, multiprocessing: bool = False):
"""
初始化读取器
Args:
dirpath: 数据库目录路径
multiprocessing: 是否启用多进程支持
"""
def __getitem__(self, idx: int):
"""通过索引获取数据"""
def get_sample(self, idx: int):
"""获取单个样本"""
def get_samples(self, start_idx: int, size: int):
"""批量获取样本"""
def get_data_keys(self, i: int = 0):
"""获取第i个样本的所有键"""
def get_data_value(self, i: int, key: str):
"""获取第i个样本的指定键值"""
def get_data_specification(self, i: int):
"""获取第i个样本的数据规范"""
def get_meta_str(self, key):
"""获取元数据字符串"""
def __len__(self):
"""获取数据库大小"""
ReaderList - 多数据库读取器
class ReaderList:
"""多个LMDB数据库的统一读取器"""
def __init__(self, db_path_list: list, multiprocessing: bool = True):
"""
初始化多数据库读取器
Args:
db_path_list: 数据库路径列表
multiprocessing: 是否启用多进程支持
"""
ReaderSSD - SSD优化读取器
class ReaderSSD:
"""针对SSD优化的读取器"""
def __init__(self, db_path: str, multiprocessing: bool = False):
"""
初始化SSD读取器
Args:
db_path: 数据库路径
multiprocessing: 是否启用多进程支持
"""
def get_batch(self, indices: list):
"""批量获取数据"""
📖 使用指南
1. 数据写入
基本写入
import sindre.lmdb as lmdb
import numpy as np
# 创建写入器
writer = lmdb.Writer('./dataset', map_size_limit=1024*100) # 100GB
# 写入单个样本
data = {
'points': np.random.rand(1000, 3),
'labels': np.random.randint(0, 10, 1000),
'features': np.random.rand(1000, 128)
}
writer.put_samples({0: data})
# 批量写入
for i in range(1000):
data = {
'points': np.random.rand(100, 3),
'labels': np.random.randint(0, 10, 100),
'id': i
}
writer.put_samples({i: data})
writer.close()
设置元数据
# 设置数据库元数据
writer.set_meta_str("description", "点云数据集")
writer.set_meta_str("version", "1.0")
writer.set_meta_str("created_by", "sindre")
数据修改
# 修改现有数据
new_data = {
'points': np.random.rand(200, 3),
'labels': np.random.randint(0, 10, 200),
'updated': True
}
writer.change_value(0, new_data)
# 安全修改(带确认提示)
writer.change_db_value(0, new_data, safe_model=True)
内存大小检查
# 检查数据大小
data = {
'points': np.random.rand(10000, 3),
'labels': np.random.randint(0, 10, 10000)
}
gb_required = writer.check_sample_size(data)
print(f"数据大小: {gb_required:.2f} GB")
2. 数据读取
基本读取
# 创建读取器
reader = lmdb.Reader('./dataset')
# 获取数据库大小
print(f"数据库包含 {len(reader)} 个样本")
# 读取单个样本
sample = reader[0]
print(f"样本键: {list(sample.keys())}")
# 读取指定样本
sample = reader.get_sample(0)
print(f"点云数量: {len(sample['points'])}")
批量读取
# 批量读取
samples = reader.get_samples(0, 10)
print(f"读取了 {len(samples)} 个样本")
# 使用ReaderList读取多个数据库
reader_list = lmdb.ReaderList(['./db1', './db2', './db3'])
print(f"总样本数: {len(reader_list)}")
元数据查询
# 获取元数据
description = reader.get_meta_str("description")
version = reader.get_meta_str("version")
print(f"描述: {description}, 版本: {version}")
# 获取数据键信息
data_keys = reader.get_data_keys(0)
print(f"数据键: {data_keys}")
# 获取数据规范
spec = reader.get_data_specification(0)
for key, info in spec.items():
print(f"{key}: shape={info['shape']}, dtype={info['dtype']}")
3. 多进程支持
# 启用多进程写入
writer = lmdb.Writer('./dataset', map_size_limit=1024*100, multiprocessing=True)
# 启用多进程读取
reader = lmdb.Reader('./dataset', multiprocessing=True)
🔧 高级功能
数据库工具函数
import sindre.lmdb as lmdb
# 合并数据库
lmdb.MergeLmdb(
target_dir='./merged_db',
source_dirs=['./db1', './db2', './db3'],
map_size_limit=1024*100,
multiprocessing=True
)
# 分割数据库
lmdb.SplitLmdb(
source_dir='./large_db',
target_dirs=['./part1', './part2', './part3'],
map_size_limit=1024*50,
multiprocessing=True
)
# 修复Windows大小问题
lmdb.fix_lmdb_windows_size('./database')
# 并行写入
def process_function(file_path):
# 处理单个文件的函数
return {'processed_data': np.random.rand(100, 3)}
lmdb.parallel_write(
output_dir='./processed_db',
file_list=['file1.txt', 'file2.txt', 'file3.txt'],
process=process_function,
map_size_limit=1024*100,
num_processes=4,
multiprocessing=True
)
SSD优化读取
# 使用SSD优化读取器
reader_ssd = lmdb.ReaderSSD('./dataset', multiprocessing=False)
# 批量读取
indices = [0, 1, 2, 3, 4]
batch_data = reader_ssd.get_batch(indices)
print(f"批量读取了 {len(batch_data)} 个样本")
# 多数据库SSD读取
reader_ssd_list = lmdb.ReaderSSDList(['./db1', './db2'], multiprocessing=False)
⚡ 性能优化
1. 内存管理
# 合理设置map_size_limit
# 建议设置为预期数据大小的1.5-2倍
expected_size_gb = 50
map_size_limit_mb = int(expected_size_gb * 1.5 * 1024)
writer = lmdb.Writer('./dataset', map_size_limit=map_size_limit_mb)
2. 多进程优化
# 写入时使用多进程
writer = lmdb.Writer('./dataset', map_size_limit=1024*100, multiprocessing=True)
# 读取时根据数据大小决定是否使用多进程
if len(reader) > 10000:
reader = lmdb.Reader('./dataset', multiprocessing=True)
else:
reader = lmdb.Reader('./dataset', multiprocessing=False)
3. 批量操作
# 批量写入而不是逐个写入
batch_data = {}
for i in range(1000):
batch_data[i] = {
'points': np.random.rand(100, 3),
'labels': np.random.randint(0, 10, 100)
}
writer.put_samples(batch_data)
❓ 常见问题
Q1: map_size_limit 设置多大合适?
A: 建议设置为预期数据大小的1.5-2倍。例如,如果数据大约50GB,可以设置为:
map_size_limit = int(50 * 1.5 * 1024) # 75GB in MB
Q2: 多进程模式什么时候使用?
A: - 写入时: 数据量大(>1GB)时建议使用 - 读取时: 数据库样本数多(>10000)时建议使用
Q3: 如何处理数据库损坏?
A: 使用修复工具:
lmdb.fix_lmdb_windows_size('./database')
Q4: 如何检查数据库状态?
A:
writer.check_db_stats() # 检查数据库统计信息
Q5: 支持哪些数据类型?
A: 主要支持numpy数组,其他类型会自动转换:
# 支持的数据
data = {
'points': np.random.rand(100, 3), # numpy数组
'labels': np.random.randint(0, 10, 100), # numpy数组
'metadata': 'test' # 字符串(会被序列化)
}
Q6: 如何高效地修改现有数据?
A: 使用安全修改模式:
# 带确认提示的安全修改
writer.change_db_value(0, new_data, safe_model=True)
# 直接修改(无确认)
writer.change_value(0, new_data)
📊 性能基准
操作 | 数据大小 | 时间 | 内存使用 |
---|---|---|---|
写入 | 1GB | ~30s | ~2GB |
读取 | 1GB | ~5s | ~1GB |
批量读取 | 1GB | ~2s | ~1.5GB |
随机访问 | 1GB | ~10s | ~1GB |