mmWave 雷达生命体征数据集:完整采集方案与自动标注代码实现

前言

雷达生命体征检测面临核心挑战:缺乏高质量的公开数据集

Scientific Data 发布了两个关键数据集:

  1. Age-Balanced Dataset(2026):6-90 岁,年龄平衡
  2. Child Vital Sign Dataset(2021):50 名儿童,<13 岁

本文提供完整采集方案自动标注代码实现


一、硬件配置详解

1.1 雷达传感器

参数 TI IWR6843ISK-ODS TI IWR6843
频率 60-64 GHz 60-64 GHz
带宽 4 GHz 4 GHz
天线 3Tx / 4Rx MIMO 3Tx / 4Rx MIMO
距离分辨率 ~4 cm ~4 cm
速度分辨率 ~0.2 m/s ~0.2 m/s
封装 ODS(超短距) 标准

1.2 参考传感器

传感器 型号 测量参数 采样率
ECG Movesense Medical 心率 250 Hz
ECG Nihon-Kohden BSM6501K 心率 + 呼吸 125 Hz
呼吸带 Beurer 呼吸率 50 Hz
血压 OMRON M2 Basic 血压 单次测量

1.3 数据采集卡

组件 型号 功能
FPGA 模块 TI DCA1000 高速 ADC 数据采集
载板 TI MMWAVEICBOOST 雷达接口
主机 Intel Core i7 数据存储

二、采集协议详解

2.1 被试招募(Child Dataset 示例)

项目 规格
总人数 50 名儿童
年龄范围 < 13 岁(1-148 个月)
性别分布 男 24 / 女 26
伦理审批 IRB HYUH 2017-05-004
监护人同意 必须签署知情同意书

2.2 采集流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
完整采集流程:

1. 准备阶段(5 分钟)
├── 向监护人和儿童解释流程
├── 儿童佩戴 ECG 电极(红/黑/白三导联)
├── 佩戴呼吸带
├── 测量血压(可选)
└── 6 岁以下儿童使用安全座椅

2. 雷达校准(2 分钟)
├── 背景噪声测量(无被试)
└── 距离校准

3. 数据采集(5 分钟)
├── 静止状态:2 分钟
├── 深呼吸:1 分钟
└── 自然状态:2 分钟

4. 同步检查(1 分钟)
├── 检查雷达与参考传感器时间戳
└── 标记体动事件

5. 数据保存
├── 雷达 ADC 数据(.bin)
├── ECG 数据(.csv)
└── 被试信息(.csv)

2.3 雷达配置参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# TI IWR6843 雷达配置(用于生命体征检测)

radar_config = {
# 射频参数
'start_freq': 60e9, # 起始频率 60 GHz
'freq_slope': 60e12, # 斜率 60 MHz/μs
'idle_time': 100, # 空闲时间 (μs)
'adc_start_time': 6, # ADC 开始时间 (μs)

# Chirp 参数
'num_chirps_per_frame': 2, # 每帧 Chirp 数
'num_samples_per_chirp': 512, # 每 Chirp 采样点
'chirp_duration': 60, # Chirp 周期 (μs)

# 帧参数
'frame_rate': 20, # 帧率 20 FPS
'num_frames': 6000, # 300 秒 = 5 分钟

# 天线配置
'num_tx': 1, # 使用 1 个发射天线
'num_rx': 4, # 使用 4 个接收天线

# 数据维度
'data_shape': (512, 20, 4), # (samples, frames, rx) per second
}

三、数据格式详解

3.1 目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
mmwave_vital_signs_dataset/
├── metadata/
│ ├── subjects.csv # 被试信息
│ │ # 列:id,age_months,gender,height_cm,weight_kg,bmi,car_seat
│ ├── sessions.csv # 采集会话信息
│ └── radar_config.json # 雷达配置参数

├── radar/
│ ├── raw/
│ │ └── subject_{id:03d}_session_{sid}.bin # 原始 ADC 数据
│ └── processed/
│ ├── range_fft/
│ │ └── subject_{id:03d}_session_{sid}.npy
│ └── vital_signals/
│ └── subject_{id:03d}_session_{sid}.npz

├── reference/
│ ├── ecg/
│ │ └── subject_{id:03d}_session_{sid}.csv
│ │ # 列:timestamp_ms, ecg_mv, hr_bpm
│ ├── respiration/
│ │ └── subject_{id:03d}_session_{sid}.csv
│ │ # 列:timestamp_ms, resp_signal, rr_bpm
│ └── blood_pressure/
│ └── subject_{id:03d}_session_{sid}.csv

└── labels/
└── subject_{id:03d}_session_{sid}.json
# 自动标注结果

3.2 原始数据格式

ADC 原始数据(.bin)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
"""
ADC 原始数据格式说明

数据维度:[num_frames, num_chirps, num_samples, num_rx]
示例:[6000, 2, 512, 4] = 5分钟 @ 20 FPS

数据类型:int16(复数:实部 + 虚部)
字节序:little-endian
文件大小:6000 × 2 × 512 × 4 × 2 × 2 = 98.3 MB

读取方法:
"""

import numpy as np

def read_adc_bin(filepath, num_frames=6000, num_chirps=2,
num_samples=512, num_rx=4):
"""
读取 TI DCA1000 输出的 ADC 原始数据

Returns:
adc_data: shape (num_frames, num_chirps, num_samples, num_rx)
complex64
"""
# 读取原始字节
raw_data = np.fromfile(filepath, dtype=np.int16)

# 重塑为复数(实部 + 虚部交错)
num_complex = len(raw_data) // 2
complex_data = raw_data[:num_complex*2].astype(np.float32)
complex_data = complex_data.reshape(-1, 2)
complex_data = complex_data[:, 0] + 1j * complex_data[:, 1]

# 重塑为雷达数据立方体
expected_size = num_frames * num_chirps * num_samples * num_rx
if len(complex_data) >= expected_size:
adc_data = complex_data[:expected_size].reshape(
num_frames, num_chirps, num_samples, num_rx
)
else:
raise ValueError(f"数据不足:期望 {expected_size},实际 {len(complex_data)}")

return adc_data.astype(np.complex64)

参考传感器数据(.csv)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ECG 数据格式示例
# subject_001_session_01.csv

"""
timestamp_ms,ecg_mv,hr_bpm
0,0.125,98
8,0.130,98
16,0.128,98
24,0.132,98
...
"""

# 呼吸数据格式示例
"""
timestamp_ms,resp_signal,rr_bpm
0,0.45,22
20,0.47,22
40,0.52,22
60,0.55,23
...
"""

3.3 被试信息格式

1
2
3
4
5
6
7
8
9
# subjects.csv 示例

"""
id,age_months,gender,height_cm,weight_kg,bmi,car_seat,hr_range_min,hr_range_max,rr_range_min,rr_range_max
001,38,F,97.5,16.3,17.15,Yes,83,117,16,37
002,44,M,101.1,16.2,15.85,Yes,96,125,19,44
003,15,F,77.5,8.7,14.48,Yes,117,150,26,56
...
"""

四、自动标注代码实现

4.1 时间同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""
雷达与参考传感器时间同步

由于雷达和参考传感器独立采集,需要时间同步。
本代码使用相关系数最大化进行自动对齐。
"""

import numpy as np
from scipy import signal

class TimeSynchronizer:
"""时间同步器"""

def __init__(self, radar_fs: float = 20.0, ref_fs: float = 125.0):
"""
Args:
radar_fs: 雷达采样率 (Hz)
ref_fs: 参考传感器采样率 (Hz)
"""
self.radar_fs = radar_fs
self.ref_fs = ref_fs

def sync_by_correlation(self, radar_signal: np.ndarray,
ref_signal: np.ndarray) -> tuple:
"""
通过互相关找到最佳时间偏移

Args:
radar_signal: 雷达提取的生命体征信号
ref_signal: 参考传感器信号

Returns:
(time_offset_samples, correlation_score)
"""
# 上采样雷达信号到参考传感器采样率
upsample_factor = int(self.ref_fs / self.radar_fs)
radar_upsampled = np.repeat(radar_signal, upsample_factor)

# 计算互相关
correlation = np.correlate(
radar_upsampled - np.mean(radar_upsampled),
ref_signal - np.mean(ref_signal),
mode='full'
)

# 找到最大相关位置
max_idx = np.argmax(np.abs(correlation))
time_offset = max_idx - len(ref_signal) + 1

# 归一化相关系数
correlation_score = correlation[max_idx] / (
np.std(radar_upsampled) * np.std(ref_signal) * len(ref_signal)
)

return time_offset, correlation_score

def align_signals(self, radar_signal: np.ndarray,
ref_signal: np.ndarray,
offset: int) -> tuple:
"""
根据偏移量对齐信号

Returns:
(aligned_radar, aligned_ref)
"""
upsample_factor = int(self.ref_fs / self.radar_fs)
radar_upsampled = np.repeat(radar_signal, upsample_factor)

if offset >= 0:
aligned_radar = radar_upsampled[offset:]
aligned_ref = ref_signal[:len(aligned_radar)]
else:
aligned_ref = ref_signal[-offset:]
aligned_radar = radar_upsampled[:len(aligned_ref)]

return aligned_radar, aligned_ref

4.2 生命体征自动标注

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""
自动标注器:从雷达信号自动生成心率和呼吸率标签

依赖:numpy, scipy, heartpy(心率检测库)
"""

import numpy as np
from scipy import signal
from scipy.fft import fft, fftfreq
from dataclasses import dataclass
from typing import List, Dict, Optional
import json

@dataclass
class VitalSignsLabel:
"""生命体征标签"""
timestamp_ms: float
heart_rate_bpm: float
respiration_rate_bpm: float
heart_rate_confidence: float
respiration_confidence: float
body_movement: bool

class VitalSignsAutoLabeler:
"""生命体征自动标注器"""

def __init__(self, radar_fs: float = 20.0, window_sec: float = 10.0):
"""
Args:
radar_fs: 雷达帧率 (Hz)
window_sec: 分析窗口长度 (秒)
"""
self.radar_fs = radar_fs
self.window_sec = window_sec
self.window_samples = int(window_sec * radar_fs)

# 呼吸频段:0.1-0.5 Hz(6-30 次/分钟)
self.breath_band = (0.1, 0.5)
# 心跳频段:0.8-2.5 Hz(48-150 次/分钟)
self.heart_band = (0.8, 2.5)

def process_adc_data(self, adc_data: np.ndarray) -> np.ndarray:
"""
处理 ADC 数据,提取距离-时间信号

Args:
adc_data: shape (num_frames, num_chirps, num_samples, num_rx)

Returns:
vital_signal: shape (num_frames,) 时间域信号
"""
num_frames = adc_data.shape[0]

# 1. Range FFT
range_fft = np.fft.fft(adc_data[:, 0, :, :], axis=1)
range_profile = np.abs(range_fft)

# 2. 找到人体位置(方差最大的距离bin)
variance = np.var(range_profile, axis=0)
target_range_bin = np.argmax(np.var(range_profile, axis=0))

# 3. 提取该距离bin的相位
phase_signal = np.angle(range_fft[:, target_range_bin, :])

# 4. 相位解缠绕
phase_unwrapped = np.unwrap(phase_signal, axis=0)

# 5. 取平均(多接收天线)
vital_signal = np.mean(phase_unwrapped, axis=1)

# 6. 去除直流分量
vital_signal = vital_signal - np.mean(vital_signal)

return vital_signal

def extract_vital_signs(self, signal_data: np.ndarray) -> Dict:
"""
从时域信号提取心率和呼吸率

Args:
signal_data: 时间域信号

Returns:
dict: heart_rate, resp_rate, confidence
"""
n = len(signal_data)
nyq = self.radar_fs / 2

# 带通滤波
b_breath, a_breath = signal.butter(
4, [self.breath_band[0]/nyq, self.breath_band[1]/nyq],
btype='band'
)
b_heart, a_heart = signal.butter(
4, [self.heart_band[0]/nyq, self.heart_band[1]/nyq],
btype='band'
)

breath_signal = signal.filtfilt(b_breath, a_breath, signal_data)
heart_signal = signal.filtfilt(b_heart, a_heart, signal_data)

# FFT 分析
freq = fftfreq(n, 1/self.radar_fs)[:n//2]

breath_spectrum = np.abs(fft(breath_signal))[:n//2]
heart_spectrum = np.abs(fft(heart_signal))[:n//2]

# 找主频
breath_idx = np.argmax(breath_spectrum)
heart_idx = np.argmax(heart_spectrum)

breath_freq = freq[breath_idx]
heart_freq = freq[heart_idx]

# 转换为 BPM
resp_rate_bpm = breath_freq * 60
heart_rate_bpm = heart_freq * 60

# 置信度(峰值与平均比)
breath_conf = breath_spectrum[breath_idx] / np.mean(breath_spectrum)
heart_conf = heart_spectrum[heart_idx] / np.mean(heart_spectrum)

return {
'heart_rate_bpm': heart_rate_bpm,
'resp_rate_bpm': resp_rate_bpm,
'heart_confidence': heart_conf,
'resp_confidence': breath_conf
}

def detect_body_movement(self, signal_data: np.ndarray,
threshold: float = 3.0) -> bool:
"""
检测体动

Args:
signal_data: 信号数据
threshold: 能量阈值

Returns:
True if body movement detected
"""
energy = np.std(signal_data)
baseline = np.std(signal_data[:int(len(signal_data)*0.1)])

return energy > baseline * threshold

def auto_label(self, adc_data: np.ndarray,
reference_data: Optional[Dict] = None) -> List[VitalSignsLabel]:
"""
自动标注完整采集数据

Args:
adc_data: ADC 原始数据
reference_data: 参考传感器数据(可选,用于验证)

Returns:
标签列表
"""
# 提取时间域信号
vital_signal = self.process_adc_data(adc_data)

labels = []
num_frames = len(vital_signal)

# 滑动窗口标注
for start in range(0, num_frames - self.window_samples,
self.window_samples // 2):
end = start + self.window_samples
window_signal = vital_signal[start:end]

# 提取生命体征
vitals = self.extract_vital_signs(window_signal)

# 检测体动
body_movement = self.detect_body_movement(window_signal)

# 如果有参考数据,验证准确性
if reference_data:
ref_hr = reference_data['heart_rate'][start:end]
ref_rr = reference_data['respiration_rate'][start:end]

# 计算误差
hr_error = abs(vitals['heart_rate_bpm'] - np.mean(ref_hr))
rr_error = abs(vitals['resp_rate_bpm'] - np.mean(ref_rr))

# 如果误差太大,降低置信度
if hr_error > 10:
vitals['heart_confidence'] *= 0.5
if rr_error > 5:
vitals['resp_confidence'] *= 0.5

label = VitalSignsLabel(
timestamp_ms=start * 1000 / self.radar_fs,
heart_rate_bpm=vitals['heart_rate_bpm'],
respiration_rate_bpm=vitals['resp_rate_bpm'],
heart_rate_confidence=vitals['heart_confidence'],
respiration_confidence=vitals['resp_confidence'],
body_movement=body_movement
)
labels.append(label)

return labels

def save_labels(self, labels: List[VitalSignsLabel],
filepath: str):
"""保存标签到 JSON 文件"""
data = {
'labels': [
{
'timestamp_ms': l.timestamp_ms,
'heart_rate_bpm': round(l.heart_rate_bpm, 1),
'respiration_rate_bpm': round(l.respiration_rate_bpm, 1),
'heart_rate_confidence': round(l.heart_rate_confidence, 2),
'respiration_confidence': round(l.respiration_confidence, 2),
'body_movement': l.body_movement
}
for l in labels
],
'metadata': {
'num_labels': len(labels),
'window_sec': self.window_sec,
'frame_rate': self.radar_fs
}
}

with open(filepath, 'w') as f:
json.dump(data, f, indent=2)


# 使用示例
if __name__ == "__main__":
# 初始化标注器
labeler = VitalSignsAutoLabeler(radar_fs=20.0, window_sec=10.0)

# 读取 ADC 数据
adc_data = read_adc_bin("radar/raw/subject_001_session_01.bin")

# 自动标注
labels = labeler.auto_label(adc_data)

# 保存标签
labeler.save_labels(labels, "labels/subject_001_session_01.json")

# 打印统计
valid_labels = [l for l in labels if not l.body_movement]
print(f"总标签数: {len(labels)}")
print(f"有效标签(无体动): {len(valid_labels)}")

if valid_labels:
avg_hr = np.mean([l.heart_rate_bpm for l in valid_labels])
avg_rr = np.mean([l.respiration_rate_bpm for l in valid_labels])
print(f"平均心率: {avg_hr:.1f} BPM")
print(f"平均呼吸率: {avg_rr:.1f} BPM")

4.3 批量标注脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""
批量处理脚本:处理整个数据集
"""

import os
import glob
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm

def process_subject(subject_dir: str, output_dir: str):
"""处理单个被试的所有数据"""

# 初始化
labeler = VitalSignsAutoLabeler(radar_fs=20.0, window_sec=10.0)
sync = TimeSynchronizer(radar_fs=20.0, ref_fs=125.0)

# 读取雷达数据
radar_files = glob.glob(f"{subject_dir}/radar/raw/*.bin")

for radar_file in radar_files:
# 提取被试 ID 和会话 ID
basename = os.path.basename(radar_file)
subject_id = basename.split('_')[1]
session_id = basename.split('_')[3].replace('.bin', '')

# 读取数据
adc_data = read_adc_bin(radar_file)
ecg_file = f"{subject_dir}/reference/ecg/subject_{subject_id}_session_{session_id}.csv"

# 自动标注
labels = labeler.auto_label(adc_data)

# 保存标签
output_file = f"{output_dir}/subject_{subject_id}_session_{session_id}.json"
labeler.save_labels(labels, output_file)

return len(radar_files)

def batch_process(dataset_dir: str, output_dir: str, num_workers: int = 4):
"""批量处理整个数据集"""

subject_dirs = glob.glob(f"{dataset_dir}/subject_*")

os.makedirs(output_dir, exist_ok=True)

with ProcessPoolExecutor(max_workers=num_workers) as executor:
results = list(tqdm(
executor.map(
lambda d: process_subject(d, output_dir),
subject_dirs
),
total=len(subject_dirs),
desc="Processing subjects"
))

print(f"处理完成:{sum(results)} 个文件")

if __name__ == "__main__":
batch_process(
dataset_dir="mmwave_vital_signs_dataset",
output_dir="mmwave_vital_signs_dataset/labels",
num_workers=4
)

五、数据集验证

5.1 相关性验证结果

指标 心率 (HR) 呼吸率 (RR)
ICC(组内相关系数) 0.875 0.905
偏置(BPM) 1.8 -0.73
LoA 下限(BPM) -10 -4.7
LoA 上限(BPM) 14 3.3
p 值 0.003 0.001

5.2 年龄分组准确性(GoogLeNet 分类器)

分组方式 准确率
二分类(<6岁 vs ≥6岁) 96.25%
三分类 ~85%
四分类 ~75%

六、自主采集指南

6.1 硬件采购清单

组件 型号 用途 预估价格
mmWave 雷达 TI IWR6843ISK-ODS 主传感器 $300
数据采集卡 TI DCA1000 ADC 数据采集 $200
载板 TI MMWAVEICBOOST 雷达接口 $150
ECG 传感器 Movesense Medical 心率参考 $150
呼吸带 Beurer 呼吸率参考 $50
总计 ~$850

6.2 软件环境

1
2
3
4
5
6
7
8
# TI mmWave SDK
wget https://dr-download.ti.com/software-development/software-development-kit-sdk/MD-QeFjAbMuu2/04.03.00.02/mmwave_sdk_04_03_00_02-Linux.x86_64.bin

# Python 依赖
pip install numpy scipy h5py tqdm

# TI DCA1000 数据采集软件
# 从 TI 官网下载 mmWaveStudio

6.3 采集注意事项

  1. 伦理审批:必须通过 IRB 审批
  2. 知情同意:监护人必须签署同意书
  3. 儿童安全:6 岁以下使用安全座椅
  4. 体动检测:实时监测并标记体动事件
  5. 数据质量:低质量数据需重新采集

七、总结

数据集核心价值

维度 价值
年龄平衡 6-90 岁,覆盖全年龄段
同步参考 ECG + 呼吸带,ICC > 0.87
开放数据 Zenodo/FigShare 免费下载
验证充分 临床级传感器验证

开发建议

  1. 算法验证:使用本数据集验证心率/呼吸率检测精度
  2. 年龄分类:可用于儿童年龄分组模型训练
  3. 自主采集:参考本文采集方案和代码实现

参考来源:

发布日期: 2026-04-11
关键词: mmWave雷达, 生命体征检测, 数据集, TI IWR6843, 自动标注


mmWave 雷达生命体征数据集:完整采集方案与自动标注代码实现
https://dapalm.com/2026/04/11/2026-04-11-mmWave-Radar-Vital-Signs-Dataset-Age-Balanced/
作者
Mars
发布于
2026年4月11日
许可协议