MediaPipe 系列 50:IMS 数据融合——多传感器协同流水线

一、多传感器融合业务背景

1.1 为什么需要多传感器融合?

单传感器的局限性:

传感器 优势 局限性
IR 摄像头 成本低、信息丰富 受光照影响、无法穿透遮挡
毫米波雷达 穿透性强、测距准 分辨率低、无法分类
超声波 成本极低、测距准 距离短、分辨率低
热成像 全天候工作 成本高、分辨率低

融合的收益:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
┌─────────────────────────────────────────────────────────────┐
│ 多传感器融合收益 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 检测精度提升 │
│ ├── 单传感器准确率: 85% │
│ └── 多传感器融合: 95%+ │
│ │
│ 场景覆盖扩展 │
│ ├── 低光照: IR + 热成像 │
│ ├── 遮挡: 摄像头 + 雷达 │
│ └── 远距离: 摄像头 + 长焦 │
│ │
│ 鲁棒性增强 │
│ ├── 单传感器故障: 其他传感器接管 │
│ └── 传感器噪声: 多源交叉验证 │
│ │
│ Euro NCAP 2025+ 要求 │
│ ├── CPD: 推荐雷达 + 摄像头融合 │
│ └── OOP: 需要多角度覆盖 │
│ │
└─────────────────────────────────────────────────────────────┘

1.2 IMS 传感器配置

典型 IMS 传感器架构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
┌─────────────────────────────────────────────────────────────────────────┐
│ IMS 传感器配置 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ DMS (驾驶员监控系统) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ IR Camera 1 (近红外) │ │
│ │ ├── 位置: 方向盘后、仪表台上方 │ │
│ │ ├── 视角: 50° 水平,35° 垂直 │ │
│ │ ├── 分辨率: 640×480 或 1280×720 │ │
│ │ └── 用途: 疲劳、分心、眼动检测 │ │
│ │ │ │
│ │ IR Camera 2 (可选) │ │
│ │ ├── 位置: A柱 │ │
│ │ ├── 视角: 侧面视角 │ │
│ │ └── 用途: 头部姿态侧面验证 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ OMS (乘员监控系统) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ RGB/IR Camera (舱内广角) │ │
│ │ ├── 位置: 车顶中央或后视镜背面 │ │
│ │ ├── 视角: 180° 鱼眼 │ │
│ │ ├── 分辨率: 1920×1080 │ │
│ │ └── 用途: 乘员检测、CPD、遗留物检测 │ │
│ │ │ │
│ │ 毫米波雷达 (CPD) │ │
│ │ ├── 位置: 车顶或座椅下方 │ │
│ │ ├── 频段: 60GHz 或 77GHz │ │
│ │ └── 用途: 微动检测、呼吸心跳、穿透遮挡 │ │
│ │ │ │
│ │ 超声波传感器 (可选) │ │
│ │ ├── 位置: 车门、座椅 │ │
│ │ └── 用途: 近距离乘员检测 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

二、融合层次架构

2.1 三层融合架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
┌─────────────────────────────────────────────────────────────────────────┐
│ 融合层次架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Level 1: 数据级融合 (Data-level Fusion) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 时间同步: 统一所有传感器到同一时间基准 │ │
│ │ 空间对齐: 将所有数据映射到同一坐标系 │ │
│ │ 格式统一: 标准化数据格式 │ │
│ │ │ │
│ │ 输入: 原始传感器数据 │ │
│ │ 输出: 同步、对齐后的数据 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Level 2: 特征级融合 (Feature-level Fusion) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 特征提取: 各传感器独立提取特征 │ │
│ │ 特征对齐: 将特征映射到统一空间 │ │
│ │ 特征融合: 拼接、加权、注意力机制 │ │
│ │ │ │
│ │ 输入: 对齐后的数据 │ │
│ │ 输出: 融合特征向量 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Level 3: 决策级融合 (Decision-level Fusion) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 独立决策: 各传感器/算法独立输出结果 │ │
│ │ 置信度评估: 评估各结果的可靠性 │ │
│ │ 结果融合: 投票、加权平均、贝叶斯融合 │ │
│ │ │ │
│ │ 输入: 各传感器的检测结果 │ │
│ │ 输出: 最终融合决策 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

2.2 各层适用的场景

融合层次 适用场景 优点 缺点
数据级 同类型传感器 信息损失最小 计算量大、对齐困难
特征级 异构传感器 平衡精度与效率 需要特征对齐
决策级 算法级融合 实现简单、模块化 信息损失较大

三、时间同步

3.1 时间戳对齐

传感器时间戳同步是融合的基础:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
┌─────────────────────────────────────────────────────────────┐
│ 时间同步问题 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 时间线 │
│ ────────────────────────────────────────────────▶ │
│ │
│ 传感器 1 (IR Camera): │
│ │ │ │ │ │ │
t=0 t=33 t=66 t=99 t=132 (ms, 30fps) │
│ │
│ 传感器 2 (Radar): │
│ │ │ │ │ │ │ │
t=0 t=20 t=40 t=60 t=80 t=100 (ms, 50fps) │
│ │
│ 问题: │
│ - 采样率不同: 30fps vs 50fps │
│ - 相位偏差: 两传感器不同时启动 │
│ - 时钟漂移: 各传感器独立时钟源 │
│ │
│ 解决方案: │
│ 1. 硬件同步: 使用外部触发信号 │
│ 2. 软件同步: 时间戳插值与校正 │
│ │
└─────────────────────────────────────────────────────────────┘

3.2 时间同步 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// mediapipe/calculators/ims/time_sync_calculator.h

#ifndef MEDIAPIPE_CALCULATORS_IMS_TIME_SYNC_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_IMS_TIME_SYNC_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <deque>
#include <map>

namespace mediapipe {

// 时间同步的数据包
template<typename T>
struct SyncedPacket {
T data;
int64_t timestamp;
float confidence;
};

class TimeSyncCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc);

absl::Status Open(CalculatorContext* cc) override;
absl::Status Process(CalculatorContext* cc) override;

private:
// 获取最近的数据包
template<typename T>
SyncedPacket<T> GetNearestPacket(
const std::deque<SyncedPacket<T>>& buffer,
int64_t target_timestamp);

// 线性插值
template<typename T>
SyncedPacket<T> InterpolatePacket(
const SyncedPacket<T>& p1,
const SyncpPacket<T>& p2,
int64_t target_timestamp);

// 数据缓冲
std::map<std::string, std::deque<SyncedPacket<cv::Mat>>> image_buffers_;
std::map<std::string, std::deque<SyncedPacket<RadarData>>> radar_buffers_;

// 同步策略配置
int64_t max_buffer_size_ms_ = 100; // 最大缓冲时间
int64_t sync_tolerance_ms_ = 5; // 同步容忍度
};

} // namespace mediapipe

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// mediapipe/calculators/ims/time_sync_calculator.cc

#include "mediapipe/calculators/ims/time_sync_calculator.h"

namespace mediapipe {

absl::Status TimeSyncCalculator::GetContract(CalculatorContract* cc) {
// 多传感器输入
cc->Inputs().Tag("IR_IMAGE").Set<cv::Mat>();
cc->Inputs().Tag("OMS_IMAGE").Set<cv::Mat>();
cc->Inputs().Tag("RADAR_DATA").Set<RadarData>();
cc->Inputs().Tag("TIMESTAMP").Set<int64_t>();

// 同步后的输出
cc->Outputs().Tag("SYNCED_IR_IMAGE").Set<cv::Mat>();
cc->Outputs().Tag("SYNCED_OMS_IMAGE").Set<cv::Mat>();
cc->Outputs().Tag("SYNCED_RADAR_DATA").Set<RadarData>();

return absl::OkStatus();
}

absl::Status TimeSyncCalculator::Process(CalculatorContext* cc) {
int64_t target_timestamp = cc->InputTimestamp().Value();

// 1. 将新数据放入缓冲
if (!cc->Inputs().Tag("IR_IMAGE").IsEmpty()) {
const auto& ir_image = cc->Inputs().Tag("IR_IMAGE").Get<cv::Mat>();
image_buffers_["IR"].push_back({ir_image, target_timestamp, 1.0f});
}

if (!cc->Inputs().Tag("OMS_IMAGE").IsEmpty()) {
const auto& oms_image = cc->Inputs().Tag("OMS_IMAGE").Get<cv::Mat>();
image_buffers_["OMS"].push_back({oms_image, target_timestamp, 1.0f});
}

if (!cc->Inputs().Tag("RADAR_DATA").IsEmpty()) {
const auto& radar = cc->Inputs().Tag("RADAR_DATA").Get<RadarData>();
radar_buffers_["RADAR"].push_back({radar, target_timestamp, 1.0f});
}

// 2. 清理过期数据
for (auto& [name, buffer] : image_buffers_) {
while (!buffer.empty() &&
target_timestamp - buffer.front().timestamp > max_buffer_size_ms_ * 1000) {
buffer.pop_front();
}
}

// 3. 获取同步数据
auto synced_ir = GetNearestPacket(image_buffers_["IR"], target_timestamp);
auto synced_oms = GetNearestPacket(image_buffers_["OMS"], target_timestamp);
auto synced_radar = GetNearestPacket(radar_buffers_["RADAR"], target_timestamp);

// 4. 检查同步质量
bool ir_valid = std::abs(synced_ir.timestamp - target_timestamp) < sync_tolerance_ms_ * 1000;
bool oms_valid = std::abs(synced_oms.timestamp - target_timestamp) < sync_tolerance_ms_ * 1000;
bool radar_valid = std::abs(synced_radar.timestamp - target_timestamp) < sync_tolerance_ms_ * 1000;

// 5. 输出
if (ir_valid) {
cc->Outputs().Tag("SYNCED_IR_IMAGE").AddPacket(
MakePacket<cv::Mat>(synced_ir.data).At(cc->InputTimestamp()));
}

if (oms_valid) {
cc->Outputs().Tag("SYNCED_OMS_IMAGE").AddPacket(
MakePacket<cv::Mat>(synced_oms.data).At(cc->InputTimestamp()));
}

if (radar_valid) {
cc->Outputs().Tag("SYNCED_RADAR_DATA").AddPacket(
MakePacket<RadarData>(synced_radar.data).At(cc->InputTimestamp()));
}

return absl::OkStatus();
}

template<typename T>
SyncedPacket<T> TimeSyncCalculator::GetNearestPacket(
const std::deque<SyncedPacket<T>>& buffer,
int64_t target_timestamp) {

if (buffer.empty()) {
return {T(), 0, 0.0f};
}

// 找到最近的两个点
auto it = std::lower_bound(buffer.begin(), buffer.end(), target_timestamp,
[](const SyncedPacket<T>& p, int64_t ts) {
return p.timestamp < ts;
});

if (it == buffer.end()) {
return buffer.back();
}

if (it == buffer.begin()) {
return *it;
}

auto prev = it - 1;

// 线性插值
return InterpolatePacket(*prev, *it, target_timestamp);
}

REGISTER_CALCULATOR(TimeSyncCalculator);

} // namespace mediapipe

四、空间标定

4.1 外参标定

将各传感器数据映射到统一坐标系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
┌─────────────────────────────────────────────────────────────┐
│ 坐标系定义 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 车辆坐标系 (Vehicle Frame) │
│ ├── 原点: 车辆后轴中心 │
│ ├── X 轴: 指向车辆前方 │
│ ├── Y 轴: 指向车辆左侧 │
│ └── Z 轴: 垂直向上 │
│ │
│ 相机坐标系 (Camera Frame) │
│ ├── 原点: 相机光心 │
│ ├── X 轴: 图像右方向 │
│ ├── Y 轴: 图像下方向 │
│ └── Z 轴: 光轴方向 │
│ │
│ 变换关系:
│ P_vehicle = R_cam_to_vehicle * P_cam + T_cam_to_vehicle │
│ │
│ 其中:
│ - R: 旋转矩阵 (3x3) │
│ - T: 平移向量 (3x1) │
│ │
└─────────────────────────────────────────────────────────────┘

4.2 标定 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// mediapipe/calculators/ims/spatial_calibration_calculator.cc

absl::Status SpatialCalibrationCalculator::Process(CalculatorContext* cc) {
// 获取各传感器的检测结果
const auto& dms_detection = cc->Inputs().Tag("DMS_DETECTION").Get<FaceDetection>();
const auto& oms_detection = cc->Inputs().Tag("OMS_DETECTION").Get<OccupantDetection>();

// 加载标定参数
// 格式: [R|T], 3x4 矩阵
Eigen::Matrix4f T_dms_to_vehicle = LoadCalibration("dms_to_vehicle.yaml");
Eigen::Matrix4f T_oms_to_vehicle = LoadCalibration("oms_to_vehicle.yaml");

// 将 DMS 检测结果转换到车辆坐标系
Eigen::Vector3f dms_position_cam(
dms_detection.center_x,
dms_detection.center_y,
dms_detection.center_z
);

Eigen::Vector4f dms_pos_homogeneous;
dms_pos_homogeneous << dms_position_cam, 1.0f;
Eigen::Vector4f dms_position_vehicle = T_dms_to_vehicle * dms_pos_homogeneous;

// 将 OMS 检测结果转换到车辆坐标系
std::vector<Eigen::Vector3f> oms_positions_vehicle;
for (const auto& occupant : oms_detection.occupants) {
Eigen::Vector3f pos_cam(occupant.center_x, occupant.center_y, occupant.center_z);
Eigen::Vector4f pos_homogeneous;
pos_homogeneous << pos_cam, 1.0f;
Eigen::Vector4f pos_vehicle = T_oms_to_vehicle * pos_homogeneous;
oms_positions_vehicle.push_back(pos_vehicle.head<3>());
}

// 输出转换后的结果
FusionResult result;
result.dms_position_vehicle = dms_position_vehicle.head<3>();
result.oms_positions_vehicle = oms_positions_vehicle;

cc->Outputs().Tag("FUSION_RESULT").AddPacket(
MakePacket<FusionResult>(result).At(cc->InputTimestamp()));

return absl::OkStatus();
}

五、决策级融合

5.1 多源决策融合策略

IMS 中常用的融合策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
┌─────────────────────────────────────────────────────────────┐
│ 决策融合策略 │
├─────────────────────────────────────────────────────────────┤
│ │
1. 加权投票 (Weighted Voting) │
│ ├── 每个传感器投票 │
│ ├── 权重根据传感器可靠性动态调整 │
│ └── 最终结果 = Σ(weight_i * vote_i) / Σ(weight_i) │
│ │
2. 贝叶斯融合 (Bayesian Fusion) │
│ ├── 将各传感器结果视为观测 │
│ ├── 使用贝叶斯定理更新后验概率 │
│ └── P(state|observations) ∝ P(observations|state) │
│ │
3. Dempster-Shafer 证据理论 │
│ ├── 处理不确定性和冲突 │
│ ├── 支持多假设 │
│ └── 适用于传感器冲突场景 │
│ │
4. 卡尔曼滤波 (Kalman Filter) │
│ ├── 状态估计与预测 │
│ ├── 融合多传感器测量 │
│ └── 适用于目标跟踪 │
│ │
└─────────────────────────────────────────────────────────────┘

5.2 疲劳检测融合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// 疲劳检测多传感器融合

struct FatigueEvidence {
// 来自 DMS 摄像头
float eye_closure_score; // 眼睛闭合程度 [0, 1]
float yawn_score; // 打哈欠分数 [0, 1]
float blink_rate; // 眨眼频率
float gaze_deviation; // 视线偏离

// 来自车辆信号
float steering_entropy; // 方向盘熵(不稳定性)
float lane_deviation; // 车道偏离
float speed_variation; // 速度变化

// 来自时间统计
float driving_duration_hours; // 连续驾驶时长
float time_of_day; // 一天中的时间 (影响生物钟)
};

class FatigueFusionEngine {
public:
FatigueResult Fuse(const FatigueEvidence& evidence) {
// 1. 计算各指标权重
CalculateWeights(evidence);

// 2. 多指标加权融合
float fatigue_score = 0.0f;

// 眼睛相关指标(权重最高)
fatigue_score += weights_.eye_closure * evidence.eye_closure_score;
fatigue_score += weights_.yawn * evidence.yawn_score;
fatigue_score += weights_.blink_rate * NormalizeBlinkRate(evidence.blink_rate);

// 行为相关指标
fatigue_score += weights_.steering * evidence.steering_entropy;
fatigue_score += weights_.lane * evidence.lane_deviation;

// 时间相关因素
fatigue_score *= GetTimeFactor(evidence.driving_duration_hours, evidence.time_of_day);

// 3. 确定疲劳等级
FatigueLevel level = DetermineLevel(fatigue_score);

// 4. 置信度评估
float confidence = CalculateConfidence(evidence);

return {fatigue_score, level, confidence};
}

private:
void CalculateWeights(const FatigueEvidence& evidence) {
// 根据传感器可用性动态调整权重
float total_weight = 0.0f;

// 眼睛指标权重(始终最重要)
weights_.eye_closure = 0.35f;
weights_.yawn = 0.15f;
weights_.blink_rate = 0.10f;

// 行为指标权重(根据车速调整)
weights_.steering = 0.20f;
weights_.lane = 0.15f;

// 归一化
float sum = weights_.eye_closure + weights_.yawn + weights_.blink_rate +
weights_.steering + weights_.lane;
weights_.eye_closure /= sum;
weights_.yawn /= sum;
weights_.blink_rate /= sum;
weights_.steering /= sum;
weights_.lane /= sum;
}

float GetTimeFactor(float driving_hours, float time_of_day) {
float factor = 1.0f;

// 长时间驾驶增加疲劳风险
if (driving_hours > 2.0f) {
factor += (driving_hours - 2.0f) * 0.1f;
}

// 深夜/凌晨时段风险更高
if (time_of_day >= 0 && time_of_day < 6) {
factor *= 1.5f;
} else if (time_of_day >= 13 && time_of_day < 15) {
factor *= 1.2f; // 午后困倦
}

return std::min(factor, 2.0f);
}
};

六、完整融合 Graph

6.1 IMS 融合 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# mediapipe/graphs/ims/ims_fusion_graph.pbtxt

# 输入
input_stream: "IR_IMAGE:ir_image"
input_stream: "OMS_IMAGE:oms_image"
input_stream: "RADAR_DATA:radar_data"
input_stream: "VEHICLE_STATE:vehicle_state"
input_stream: "TIMESTAMP:timestamp"

# 输出
output_stream: "DMS_RESULT:dms_result"
output_stream: "OMS_RESULT:oms_result"
output_stream: "FUSION_RESULT:fusion_result"

# ============== 1. 时间同步 ==============
node {
calculator: "TimeSyncCalculator"
input_stream: "IR_IMAGE:ir_image"
input_stream: "OMS_IMAGE:oms_image"
input_stream: "RADAR_DATA:radar_data"
input_stream: "TIMESTAMP:timestamp"
output_stream: "SYNCED_IR_IMAGE:synced_ir"
output_stream: "SYNCED_OMS_IMAGE:synced_oms"
output_stream: "SYNCED_RADAR_DATA:synced_radar"
}

# ============== 2. DMS 处理 ==============
node {
calculator: "DMSGraph"
input_stream: "IR_IMAGE:synced_ir"
output_stream: "DMS_RESULT:dms_raw"
}

# ============== 3. OMS 处理 ==============
node {
calculator: "OMSGraph"
input_stream: "IMAGE:synced_oms"
input_stream: "RADAR_DATA:synced_radar"
output_stream: "OMS_RESULT:oms_raw"
}

# ============== 4. 空间标定 ==============
node {
calculator: "SpatialCalibrationCalculator"
input_stream: "DMS_DETECTION:dms_raw"
input_stream: "OMS_DETECTION:oms_raw"
output_stream: "CALIBRATED_DATA:calibrated_data"
}

# ============== 5. 决策融合 ==============
node {
calculator: "DecisionFusionCalculator"
input_stream: "DMS_RESULT:dms_raw"
input_stream: "OMS_RESULT:oms_raw"
input_stream: "VEHICLE_STATE:vehicle_state"
input_stream: "CALIBRATED_DATA:calibrated_data"
output_stream: "FUSION_RESULT:fusion_result"
}

# ============== 6. 输出标准化 ==============
node {
calculator: "ResultFormatterCalculator"
input_stream: "DMS_RESULT:dms_raw"
input_stream: "OMS_RESULT:oms_raw"
input_stream: "FUSION_RESULT:fusion_result"
output_stream: "DMS_RESULT:dms_result"
output_stream: "OMS_RESULT:oms_result"
output_stream: "FUSION_RESULT:fusion_result"
}

七、性能优化

7.1 并行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 使用 Executor 实现并行处理

# DMS 和 OMS 并行执行
node {
calculator: "DMSGraph"
input_stream: "IR_IMAGE:synced_ir"
output_stream: "DMS_RESULT:dms_raw"
executor: "dms_executor"
}

node {
calculator: "OMSGraph"
input_stream: "IMAGE:synced_oms"
input_stream: "RADAR_DATA:synced_radar"
output_stream: "OMS_RESULT:oms_raw"
executor: "oms_executor"
}

# 配置 Executor
executor {
name: "dms_executor"
type: "ThreadPool"
options {
[mediapipe.ThreadPoolExecutorOptions.ext] {
num_threads: 2
}
}
}

executor {
name: "oms_executor"
type: "ThreadPool"
options {
[mediapipe.ThreadPoolExecutorOptions.ext] {
num_threads: 2
}
}
}

7.2 内存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 使用零拷贝设计减少内存拷贝

// 方式 1: 使用共享指针
auto shared_image = std::make_shared<cv::Mat>(image);
cc->Outputs().Tag("IMAGE").AddPacket(
MakePacket<std::shared_ptr<cv::Mat>>(shared_image).At(cc->InputTimestamp()));

// 方式 2: 使用 GpuBuffer(GPU 端零拷贝)
node {
calculator: "ImageTransformationGpu"
input_stream: "IMAGE_GPU:input_image"
output_stream: "IMAGE_GPU:output_image"
# GPU 端无内存拷贝
}

八、总结

要点 说明
时间同步 统一时间基准、插值补偿
空间标定 外参标定、坐标转换
特征融合 特征拼接、加权融合
决策融合 加权投票、贝叶斯融合

系列进度: 50/55
更新时间: 2026-03-12


MediaPipe 系列 50:IMS 数据融合——多传感器协同流水线
https://dapalm.com/2026/03/12/MediaPipe系列50-IMS数据融合:多传感器协同/
作者
Mars
发布于
2026年3月12日
许可协议