MediaPipe 系列 25:错误处理 Calculator——异常恢复机制完整指南

前言:为什么需要错误处理?

25.1 生产环境的挑战

IMS DMS 系统必须具备高可靠性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
┌─────────────────────────────────────────────────────────────────────────┐
│ IMS DMS 系统可靠性要求 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 挑战: │
│ ├── 摄像头断连、遮挡、低光 │
│ ├── 模型推理失败、内存不足、GPU 异常 │
│ ├── 网络中断、数据延迟 │
│ └── 外部依赖不可用(车辆信号、ADAS 接口) │
│ │
│ 后果: │
│ ├── 疲劳检测失效 → 安全风险 │
│ ├── 系统崩溃 → 用户体验极差 │
│ └── 误报/漏报 → 用户信任度下降 │
│ │
│ 解决方案:错误处理 + 异常恢复 │
│ ├── Calculator 级别:单节点容错 │
│ ├── Graph 级别:流水线降级 │
│ └── 系统级别:多级备份 │
│ │
└─────────────────────────────────────────────────────────────────────────┘

25.2 错误处理设计原则

原则 说明
Fail-Safe 失败时进入安全状态,不做危险操作
Graceful Degradation 降级而非崩溃,保证核心功能
快速恢复 短暂故障后快速恢复正常
日志完整 记录错误信息,便于排查

二十六、Calculator 级别错误处理

26.1 错误类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
┌─────────────────────────────────────────────────────────────┐
│ MediaPipe 错误类型 │
├─────────────────────────────────────────────────────────────┤
│ │
1. 输入错误 (Input Errors) │
│ ├── 输入流为空 │
│ ├── 输入类型不匹配 │
│ └── 输入数据损坏 │
│ │
2. 处理错误 (Processing Errors) │
│ ├── 模型推理失败 │
│ ├── 内存不足 │
│ └── 计算超时 │
│ │
3. 外部依赖错误 (External Errors) │
│ ├── 文件不存在 │
│ ├── 网络请求失败 │
│ └── 硬件不可用 │
│ │
4. 配置错误 (Configuration Errors) │
│ ├── 参数无效 │
│ ├── 资源路径错误 │
│ └── 版本不兼容 │
│ │
└─────────────────────────────────────────────────────────────┘

26.2 错误处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// error_handling_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_CORE_ERROR_HANDLING_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_CORE_ERROR_HANDLING_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/status.h"

namespace mediapipe {

// ========== 错误处理策略 ==========
enum class ErrorHandlingStrategy {
// 传递错误(默认)- 停止 Graph
PROPAGATE_ERROR,

// 忽略错误 - 跳过当前帧
IGNORE_ERROR,

// 输出默认值 - 使用预设值
OUTPUT_DEFAULT,

// 重试 - 重试处理
RETRY,

// 降级 - 使用备用方案
FALLBACK,
};

// ========== 错误处理 Calculator ==========
class ErrorHandlingCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 主输入/输出
cc->Inputs().Tag("INPUT").Set<AnyType>();
cc->Outputs().Tag("OUTPUT").Set<AnyType>();

// 错误输出(可选)
if (cc->Outputs().HasTag("ERROR")) {
cc->Outputs().Tag("ERROR").Set<absl::Status>();
}

// 配置
cc->Options<ErrorHandlingOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<ErrorHandlingOptions>();

strategy_ = options.strategy();
max_retries_ = options.max_retries();
retry_delay_ms_ = options.retry_delay_ms();

// 设置默认值
if (options.has_default_value()) {
default_value_ = options.default_value();
}

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
auto status = ProcessInternal(cc);

if (status.ok()) {
return absl::OkStatus();
}

// ========== 根据策略处理错误 ==========
switch (strategy_) {
case ErrorHandlingStrategy::PROPAGATE_ERROR:
// 直接返回错误,Graph 会停止
return status;

case ErrorHandlingStrategy::IGNORE_ERROR:
// 忽略错误,跳过此帧
LOG(WARNING) << "Ignoring error: " << status;
return absl::OkStatus();

case ErrorHandlingStrategy::OUTPUT_DEFAULT:
// 输出默认值
OutputDefault(cc);
LOG(WARNING) << "Outputting default due to error: " << status;
return absl::OkStatus();

case ErrorHandlingStrategy::RETRY:
// 重试处理
return RetryProcess(cc, status);

case ErrorHandlingStrategy::FALLBACK:
// 降级处理
return FallbackProcess(cc, status);

default:
return status;
}
}

private:
ErrorHandlingStrategy strategy_;
int max_retries_ = 3;
int retry_delay_ms_ = 100;
std::optional<AnyType> default_value_;
int retry_count_ = 0;

absl::Status ProcessInternal(CalculatorContext* cc) {
// 实际处理逻辑(由子类实现)
return absl::OkStatus();
}

void OutputDefault(CalculatorContext* cc) {
if (default_value_.has_value()) {
cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<AnyType>(default_value_.value()).At(cc->InputTimestamp()));
}
}

absl::Status RetryProcess(CalculatorContext* cc, const absl::Status& error) {
if (retry_count_ < max_retries_) {
retry_count_++;
LOG(WARNING) << "Retry " << retry_count_ << "/" << max_retries_
<< " after error: " << error;

// 延迟重试
std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms_));

return ProcessInternal(cc);
}

// 重试次数用尽,返回错误
retry_count_ = 0;
LOG(ERROR) << "Max retries exceeded";
return error;
}

absl::Status FallbackProcess(CalculatorContext* cc, const absl::Status& error) {
LOG(WARNING) << "Fallback processing due to error: " << error;
// 降级逻辑(由子类实现)
return absl::OkStatus();
}
};

REGISTER_CALCULATOR(ErrorHandlingCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_CORE_ERROR_HANDLING_CALCULATOR_H_

二十七、Graph 级别容错

27.1 使用 Gate Calculator 实现降级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────────────────────────────────────────────────────────────────────┐
Graph 级别降级策略 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 场景:人脸检测 + 关键点检测 + 疲劳判断 │
│ │
│ 正常流程: │
│ ┌─────────┐ ┌───────────┐ ┌─────────────┐ │
│ │ Camera │ ──→ │ Face Mesh │ ──→ │ Fatigue │ ──→ 结果 │
│ │ Input │ │ (468) │ │ Detection │ │
│ └─────────┘ └───────────┘ └─────────────┘ │
│ │
│ 降级流程(Face Mesh 失败): │
│ ┌─────────┐ ┌───────────┐ ┌─────────────┐ │
│ │ Camera │ ──→ │ Face Det │ ──→ │ Simplified │ ──→ 结果 │
│ │ Input │ │ (仅检测) │ │ Analysis │ │
│ └─────────┘ └───────────┘ └─────────────┘ │
│ │
│ 使用 Gate Calculator 选择路径: │
│ ├── 正常:Gate 选择 Face Mesh 路径 │
│ └── 降级:Gate 选择 Face Detection 路径 │
│ │
└─────────────────────────────────────────────────────────────────────────┘

27.2 降级 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# ========== 降级 Graph 配置 ==========

input_stream: "IMAGE:image"
output_stream: "RESULT:result"

# ========== 1. 人脸检测(必需)==========
node {
calculator: "FaceDetectionShortRangeCpu"
input_stream: "IMAGE:image"
output_stream: "DETECTIONS:face_detections"
}

# ========== 2. 尝试 Face Mesh ==========
node {
calculator: "FaceMeshCpu"
input_stream: "IMAGE:image"
input_stream: "DETECTIONS:face_detections"
output_stream: "LANDMARKS:face_landmarks"
output_stream: "FACE_RECTS:face_rects"
node_options: {
[type.googleapis.com/mediapipe.FaceMeshOptions]: {
max_num_faces: 1
}
}
}

# ========== 3. 错误检测 ==========
node {
calculator: "PacketPresenceCalculator"
input_stream: "PACKET:face_landmarks"
output_stream: "PRESENCE:mesh_available"
}

# ========== 4. 疲劳检测(完整)==========
node {
calculator: "FatigueDetectionCalculator"
input_stream: "LANDMARKS:face_landmarks"
output_stream: "RESULT:full_result"
}

# ========== 5. 疲劳检测(简化)==========
node {
calculator: "SimplifiedFatigueCalculator"
input_stream: "DETECTIONS:face_detections"
output_stream: "RESULT:simplified_result"
}

# ========== 6. Gate 选择 ==========
node {
calculator: "GateCalculator"
input_stream: "full_result"
input_stream: "simplified_result"
input_stream: "ENABLE:mesh_available"
output_stream: "OUTPUT:result"
node_options: {
[type.googleapis.com/mediapipe.GateCalculatorOptions]: {
allow: true
}
}
}

# ========== 7. 默认值兜底 ==========
node {
calculator: "DefaultIfEmptyCalculator"
input_stream: "INPUT:result"
output_stream: "OUTPUT:final_result"
node_options: {
[type.googleapis.com/mediapipe.DefaultIfEmptyCalculatorOptions]: {
default_value: {
fatigue_score: 0.0
fatigue_level: 0
confidence: 0.0
}
}
}
}

27.3 DefaultIfEmpty Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// default_if_empty_calculator.cc

#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

class DefaultIfEmptyCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("INPUT").Set<AnyType>();
cc->Outputs().Tag("OUTPUT").Set<AnyType>();
cc->Options<DefaultIfEmptyOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<DefaultIfEmptyOptions>();
default_value_ = options.default_value();
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Tag("INPUT").IsEmpty()) {
// 输入为空,输出默认值
cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<AnyType>(default_value_).At(cc->InputTimestamp()));
VLOG(1) << "Outputting default value";
} else {
// 正常传递
cc->Outputs().Tag("OUTPUT").AddPacket(
cc->Inputs().Tag("INPUT").Value());
}
return absl::OkStatus();
}

private:
AnyType default_value_;
};

REGISTER_CALCULATOR(DefaultIfEmptyCalculator);

} // namespace mediapipe

二十八、重试与超时机制

28.1 带超时的推理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// timeout_inference_calculator.cc

#include "mediapipe/framework/calculator_framework.h"
#include <future>

namespace mediapipe {

class TimeoutInferenceCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("TENSORS").Set<std::vector<TfLiteTensor>>();
cc->Outputs().Tag("TENSORS").Set<std::vector<TfLiteTensor>>();
cc->Options<TimeoutInferenceOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<TimeoutInferenceOptions>();
timeout_ms_ = options.timeout_ms();

// 加载模型
model_ = tflite::FlatBufferModel::BuildFromFile(options.model_path().c_str());
interpreter_ = std::make_unique<tflite::Interpreter>();
model_->error_reporter();
tflite::InterpreterBuilder(*model_)(*interpreter_);
interpreter_->AllocateTensors();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
const auto& input_tensors =
cc->Inputs().Tag("TENSORS").Get<std::vector<TfLiteTensor>>();

// ========== 异步推理 ==========
auto future = std::async(std::launch::async, [this, &input_tensors]() {
// 复制输入数据
for (int i = 0; i < input_tensors.size(); ++i) {
memcpy(interpreter_->input_tensor(i)->data.raw,
input_tensors[i].data.raw,
input_tensors[i].bytes);
}

// 执行推理
return interpreter_->Invoke();
});

// ========== 等待结果(带超时)==========
auto status = future.wait_for(std::chrono::milliseconds(timeout_ms_));

if (status == std::future_status::timeout) {
LOG(WARNING) << "Inference timeout after " << timeout_ms_ << "ms";
return absl::DeadlineExceededError("Inference timeout");
}

if (status == std::future_status::ready) {
auto result = future.get();
if (result != kTfLiteOk) {
return absl::InternalError("Inference failed");
}

// 复制输出
auto output_tensors = std::make_unique<std::vector<TfLiteTensor>>();
for (int i = 0; i < interpreter_->outputs().size(); ++i) {
output_tensors->push_back(*interpreter_->output_tensor(i));
}

cc->Outputs().Tag("TENSORS").Add(output_tensors.release(),
cc->InputTimestamp());
}

return absl::OkStatus();
}

private:
std::unique_ptr<tflite::FlatBufferModel> model_;
std::unique_ptr<tflite::Interpreter> interpreter_;
int timeout_ms_ = 100;
};

REGISTER_CALCULATOR(TimeoutInferenceCalculator);

} // namespace mediapipe

二十九、IMS DMS 实战:完整错误处理

29.1 DMS Graph 错误处理架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
┌─────────────────────────────────────────────────────────────────────────┐
│ DMS 完整错误处理架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ 第一层:输入验证 │ │
│ │ ┌────────────┐ ┌─────────────┐ ┌──────────────┐ │ │
│ │ │ IR Camera │ ──→ │ Image │ ──→ │ Quality │ │ │
│ │ │ │ │ Validator │ │ Check │ │ │
│ │ └────────────┘ └─────────────┘ └──────────────┘ │ │
│ │ │ │ │ │
│ │ ↓ ↓ │ │
│ │ 格式/尺寸验证 低光/模糊检测 │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ 第二层:人脸检测 │ │
│ │ ┌────────────┐ ┌─────────────┐ ┌──────────────┐ │ │
│ │ │ Face │ ──→ │ Error │ ──→ │ Fallback │ │ │
│ │ │ Detection │ │ Handler │ │ Detector │ │ │
│ │ └────────────┘ └─────────────┘ └──────────────┘ │ │
│ │ │ │ │ │
│ │ ↓ ↓ │ │
│ │ BlazeFace Haar/ROI方法 │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ 第三层:关键点检测 │ │
│ │ ┌────────────┐ ┌─────────────┐ ┌──────────────┐ │ │
│ │ │ Face Mesh │ ──→ │ Quality │ ──→ │ Keypoint │ │ │
│ │ │ │ │ Filter │ │ Validator │ │ │
│ │ └────────────┘ └─────────────┘ └──────────────┘ │ │
│ │ │ │ │ │
│ │ ↓ ↓ │ │
│ │ 置信度过滤 关键点有效性检查 │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ 第四层:疲劳判断 │ │
│ │ ┌────────────┐ ┌─────────────┐ ┌──────────────┐ │ │
│ │ │ Fatigue │ ──→ │ Result │ ──→ │ Alert │ │ │
│ │ │ Detection │ | Validator │ │ Manager │ │ │
│ │ └────────────┘ └─────────────┘ └──────────────┘ │ │
│ │ │ │ │ │
│ │ ↓ ↓ │ │
│ │ 连续帧确认 多级告警策略 │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

29.2 图像质量检查 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// image_quality_calculator.cc

#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/formats/image_frame.h"

namespace mediapipe {

// ========== 图像质量消息 ==========
message ImageQuality {
float brightness = 1; // 亮度 [0, 1]
float contrast = 2; // 对比度 [0, 1]
float sharpness = 3; // 清晰度 [0, 1]
float occlusion = 4; // 遮挡程度 [0, 1]
bool is_valid = 5; // 是否有效
std::string reason = 6; // 无效原因
}

class ImageQualityCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();
cc->Outputs().Tag("IMAGE").Set<ImageFrame>();
cc->Outputs().Tag("QUALITY").Set<ImageQuality>();
cc->Options<ImageQualityOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<ImageQualityOptions>();
min_brightness_ = options.min_brightness();
max_brightness_ = options.max_brightness();
min_contrast_ = options.min_contrast();
min_sharpness_ = options.min_sharpness();
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
const auto& image = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();

ImageQuality quality;

// ========== 计算亮度 ==========
quality.set_brightness(CalculateBrightness(image));

// ========== 计算对比度 ==========
quality.set_contrast(CalculateContrast(image));

// ========== 计算清晰度(拉普拉斯方差)==========
quality.set_sharpness(CalculateSharpness(image));

// ========== 验证质量 ==========
bool valid = true;
std::string reason;

if (quality.brightness() < min_brightness_) {
valid = false;
reason = "Too dark: " + std::to_string(quality.brightness());
} else if (quality.brightness() > max_brightness_) {
valid = false;
reason = "Too bright: " + std::to_string(quality.brightness());
} else if (quality.contrast() < min_contrast_) {
valid = false;
reason = "Low contrast: " + std::to_string(quality.contrast());
} else if (quality.sharpness() < min_sharpness_) {
valid = false;
reason = "Too blurry: " + std::to_string(quality.sharpness());
}

quality.set_is_valid(valid);
quality.set_reason(reason);

// ========== 输出 ==========
cc->Outputs().Tag("IMAGE").AddPacket(
cc->Inputs().Tag("IMAGE").Value());
cc->Outputs().Tag("QUALITY").AddPacket(
MakePacket<ImageQuality>(quality).At(cc->InputTimestamp()));

if (!valid) {
VLOG(1) << "Image quality check failed: " << reason;
}

return absl::OkStatus();
}

private:
float min_brightness_ = 0.1f;
float max_brightness_ = 0.9f;
float min_contrast_ = 0.05f;
float min_sharpness_ = 10.0f;

float CalculateBrightness(const ImageFrame& image) {
// 计算平均亮度
float sum = 0.0f;
int count = 0;

for (int y = 0; y < image.Height(); ++y) {
const uint8_t* row = image.PixelData() + y * image.WidthStep();
for (int x = 0; x < image.Width(); ++x) {
sum += row[x];
count++;
}
}

return (sum / count) / 255.0f;
}

float CalculateContrast(const ImageFrame& image) {
// 计算标准差作为对比度
float mean = 0.0f;
int count = image.Width() * image.Height();

// 计算均值
for (int y = 0; y < image.Height(); ++y) {
const uint8_t* row = image.PixelData() + y * image.WidthStep();
for (int x = 0; x < image.Width(); ++x) {
mean += row[x];
}
}
mean /= count;

// 计算方差
float variance = 0.0f;
for (int y = 0; y < image.Height(); ++y) {
const uint8_t* row = image.PixelData() + y * image.WidthStep();
for (int x = 0; x < image.Width(); ++x) {
variance += (row[x] - mean) * (row[x] - mean);
}
}
variance /= count;

return std::sqrt(variance) / 255.0f;
}

float CalculateSharpness(const ImageFrame& image) {
// 拉普拉斯方差法
// 简化实现:计算相邻像素差异
float sum = 0.0f;
int count = 0;

for (int y = 1; y < image.Height() - 1; ++y) {
const uint8_t* row_prev = image.PixelData() + (y - 1) * image.WidthStep();
const uint8_t* row = image.PixelData() + y * image.WidthStep();
const uint8_t* row_next = image.PixelData() + (y + 1) * image.WidthStep();

for (int x = 1; x < image.Width() - 1; ++x) {
// 拉普拉斯算子
int laplacian = -4 * row[x] + row_prev[x] + row_next[x] +
row[x - 1] + row[x + 1];
sum += laplacian * laplacian;
count++;
}
}

return std::sqrt(sum / count);
}
};

REGISTER_CALCULATOR(ImageQualityCalculator);

} // namespace mediapipe

29.3 告警管理 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// alert_manager_calculator.cc

#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// ========== 告警消息 ==========
message Alert {
enum Level {
NONE = 0;
INFO = 1;
WARNING = 2;
CRITICAL = 3;
}

Level level = 1;
std::string type = 2;
std::string message = 3;
uint64 timestamp_ms = 4;
}

// ========== 告警管理 Calculator ==========
class AlertManagerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("FATIGUE_RESULT").Set<FatigueResult>();
cc->Inputs().Tag("IMAGE_QUALITY").Set<ImageQuality>();
cc->Outputs().Tag("ALERT").Set<Alert>();
cc->Options<AlertManagerOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<AlertManagerOptions>();

// 告警阈值
warning_threshold_ = options.warning_threshold();
critical_threshold_ = options.critical_threshold();

// 连续帧确认
confirmation_frames_ = options.confirmation_frames();

// 冷却时间(避免频繁告警)
cooldown_ms_ = options.cooldown_ms();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
uint64 current_time = cc->InputTimestamp().Value() / 1000;

// ========== 检查图像质量 ==========
if (!cc->Inputs().Tag("IMAGE_QUALITY").IsEmpty()) {
const auto& quality = cc->Inputs().Tag("IMAGE_QUALITY").Get<ImageQuality>();

if (!quality.is_valid()) {
// 图像质量差,跳过检测
consecutive_fatigue_frames_ = 0;
return absl::OkStatus();
}
}

// ========== 检查疲劳结果 ==========
if (cc->Inputs().Tag("FATIGUE_RESULT").IsEmpty()) {
return absl::OkStatus();
}

const auto& result = cc->Inputs().Tag("FATIGUE_RESULT").Get<FatigueResult>();

// ========== 连续帧确认 ==========
if (result.fatigue_level() >= 2) {
consecutive_fatigue_frames_++;
} else {
consecutive_fatigue_frames_ = 0;
}

// ========== 告警判断 ==========
Alert alert;
alert.set_timestamp_ms(current_time);

if (consecutive_fatigue_frames_ >= confirmation_frames_) {
// 检查冷却时间
if (current_time - last_alert_time_ >= cooldown_ms_) {
if (result.fatigue_score() >= critical_threshold_) {
alert.set_level(Alert::CRITICAL);
alert.set_type("FATIGUE_CRITICAL");
alert.set_message("严重疲劳!请立即停车休息!");
} else if (result.fatigue_score() >= warning_threshold_) {
alert.set_level(Alert::WARNING);
alert.set_type("FATIGUE_WARNING");
alert.set_message("检测到疲劳,请注意休息");
}

last_alert_time_ = current_time;

LOG(INFO) << "Alert triggered: " << alert.type() << " - " << alert.message();
}
}

cc->Outputs().Tag("ALERT").AddPacket(
MakePacket<Alert>(alert).At(cc->InputTimestamp()));

return absl::OkStatus();
}

private:
float warning_threshold_ = 0.5f;
float critical_threshold_ = 0.8f;
int confirmation_frames_ = 5;
int cooldown_ms_ = 30000; // 30秒

int consecutive_fatigue_frames_ = 0;
uint64 last_alert_time_ = 0;
};

REGISTER_CALCULATOR(AlertManagerCalculator);

} // namespace mediapipe

三十、总结

要点 说明
Calculator 级别 单节点错误处理,重试/忽略/默认值
Graph 级别 降级策略,Gate 选择备用路径
超时机制 异步执行 + 超时控制
IMS 实战 多层验证 + 连续帧确认 + 告警管理

下篇预告

MediaPipe 系列 26:Face Detection——BlazeFace 架构解析

深入讲解 BlazeFace 轻量级人脸检测模型的设计原理、Anchor 机制、IMS DMS 集成实战。


参考资料

  1. MediaPipe. Error Handling
  2. Google AI Edge. Calculator Development

系列进度: 25/55
更新时间: 2026-03-12


MediaPipe 系列 25:错误处理 Calculator——异常恢复机制完整指南
https://dapalm.com/2026/03/13/MediaPipe系列25-错误处理Calculator:异常恢复/
作者
Mars
发布于
2026年3月13日
许可协议