MediaPipe 系列 02:Graph 与 Calculator——核心抽象详解完整指南

前言:为什么需要理解 Graph 和 Calculator?

2.1 MediaPipe 的核心抽象

MediaPipe 的本质:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
┌─────────────────────────────────────────────────────────────┐
│ MediaPipe 核心抽象 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Graph:感知流水线架构 │ │
│ │ │ │
│ │ 定义数据如何流动、如何处理、如何输出 │ │
│ │ │ │
│ │ ┌───────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ node { │ │ │
│ │ │ calculator: "FaceMesh" │ │ │
│ │ │ input_stream: "video" │ │ │
│ │ │ output_stream: "landmarks" │ │ │
│ │ │ } │ │ │
│ │ │ │ │ │
│ │ └───────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌───────────────────────────────────┐ │ │
│ │ │ node { │ │ │
│ │ │ calculator: "GazeEst" │ │ │
│ │ │ input_stream: "landmarks" │ │ │
│ │ │ output_stream: "gaze" │ │ │
│ │ │ } │ │ │
│ │ └───────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Calculator:计算单元 │ │
│ │ │ │
│ │ • 输入:Packet 时间序列 │ │
│ │ • 处理:模型推理、数据转换 │ │
│ │ • 输出:Packet 时间序列 │ │
│ │ • 生命周期:Open → Process → Close │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Stream:数据流通道 │ │
│ │ │ │
│ │ • Packet 的有序序列 │ │
│ │ • 自动时间同步 │ │
│ │ • 支持背压(Backpressure) │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Packet:数据包 │ │
│ │ │ │
│ │ • 携带时间戳 │ │
│ │ • 任意类型(图像、矩阵、结构体) │ │
│ │ • 零拷贝传递 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

2.2 Graph 与 Calculator 的关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
┌─────────────────────────────────────────────────────────────┐
Graph 与 Calculator 的关系 │
├─────────────────────────────────────────────────────────────┤
│ │
Graph = 配置文件(pbtxt)+ 执行引擎 │
│ Calculator = C++ 代码实现 │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Graph(配置层) │ │
│ │ │ │
│ │ input_stream: "video" │ │
│ │ node { │ │
│ │ calculator: "FaceMesh" │ │
│ │ input_stream: "video" │ │
│ │ output_stream: "landmarks" │ │
│ │ } │ │
│ │ output_stream: "landmarks" │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Calculator(实现层) │ │
│ │ │ │
│ │ class FaceMeshCalculator { │ │
│ │ GetContract() // 定义端口 │ │
│ │ Open() // 初始化 │ │
│ │ Process() // 处理数据 │ │
│ │ Close() // 清理 │ │
│ │ }; │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 编译时:Graph 配置 → Calculator 实现 │
│ 运行时:Calculator 实例 → 处理数据 │
│ │
└─────────────────────────────────────────────────────────────┘

三、Graph:感知流水线配置

3.1 Graph 概念

Graph 是 MediaPipe 的核心架构概念:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
┌─────────────────────────────────────────────────────────────┐
Graph:有向无环图 │
├─────────────────────────────────────────────────────────────┤
│ │
Input ──▶ Calculator A ──▶ Calculator B ──▶ Output
│ │ │
│ └────▶ Calculator C ──▶ Output
│ │ │
│ └────▶ Calculator D ──▶ Calculator E ──▶ Output
│ │
│ 特征: │
1. 有向:数据单向流动 │
2. 无环:不会循环引用 │
3. 模块化:每个 Calculator 是独立单元 │
4. 可组合:任意复杂度 │
│ │
└─────────────────────────────────────────────────────────────┘

3.2 pbtxt 语法详解

Graph 配置文件使用 Protobuf 文本格式(.pbtxt):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# ========== 基本语法 ==========
# 注释:# 或 // 开头
# 空行:忽略
# 缩进:2 空格或 4 空格

# ========== 输入流定义 ==========
input_stream: "input_video"
input_stream: "input_audio"
input_stream: "input_sensor"

# ========== 输出流定义 ==========
output_stream: "output_video"
output_stream: "output_detections"
output_stream: "output_audio"

# ========== 静态输入(Side Packet)==========
input_side_packet: "model_path"
input_side_packet: "config_file"
input_side_packet: "lookup_table"

# ========== Calculator 节点 ==========
node {
calculator: "FlowLimiterCalculator"
input_stream: "input_video"
input_stream: "detections"
output_stream: "throttled_video"
options {
[mediapipe.FlowLimiterCalculatorOptions.ext] {
max_in_flight: 1
max_in_queue: 1
}
}
}

# ========== 线程池配置(可选)==========
executor {
name: "gpu_executor"
type: "ThreadPool"
num_threads: 4
}

# ========== 延迟配置(可选)==========
delay_input_packets {
seconds: 0.1
}

3.3 pbtxt 语法示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# ========== 输入输出 ==========
input_stream: "IMAGE:input_frame" # 格式:input_stream: "TAG:name"
input_stream: "DETECTIONS:faces"
output_stream: "IMAGE:output_frame"
output_stream: "DETECTIONS:faces"

# ========== Calculator 节点 ==========
node {
calculator: "FaceDetectionCalculator"
input_stream: "IMAGE:input_frame"
input_stream: "DETECTIONS:faces"
output_stream: "IMAGE:output_frame"
output_stream: "DETECTIONS:faces"
options {
[mediapipe.FaceDetectionOptions.ext] {
model_path: "/models/face_detection.tflite"
confidence_threshold: 0.7
max_num_detections: 10
}
}
}

# ========== Stream 标签 ==========
# Tag 用于区分多个输入输出
node {
calculator: "MultiInputCalculator"
input_stream: "IMAGE:video" # Tag: IMAGE
input_stream: "AUDIO:audio" # Tag: AUDIO
input_stream: "SENSOR:accel" # Tag: SENSOR
output_stream: "OUTPUT:combined" # Tag: OUTPUT
}

# ========== Options 扩展 ==========
# Calculator 可以扩展 CalculatorOptions
node {
calculator: "MyCalculator"
input_stream: "input"
output_stream: "output"
options {
[mediapipe.MyCalculatorOptions.ext] {
param1: "value1"
param2: 123
param3: true
repeated_param: "a"
repeated_param: "b"
}
}
}

3.4 常见 Graph 结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# ========== 简单流水线 ==========
input_stream: "input"
output_stream: "output"

node {
calculator: "PassThroughCalculator"
input_stream: "input"
output_stream: "output"
}

# ========== 条件分支 ==========
input_stream: "input"
output_stream: "output"

node {
calculator: "GateCalculator"
input_stream: "input"
output_stream: "output"
options {
[mediapipe.GateCalculatorOptions.ext] {
condition: "input.size() > 10"
}
}
}

# ========== 多路聚合 ==========
input_stream: "input_a"
input_stream: "input_b"
input_stream: "input_c"
output_stream: "output"

node {
calculator: "MergeCalculator"
input_stream: "input_a"
input_stream: "input_b"
input_stream: "input_c"
output_stream: "output"
}

# ========== 时序处理 ==========
input_stream: "input"
output_stream: "output"

node {
calculator: "SlidingWindowCalculator"
input_stream: "input"
output_stream: "window_output"
options {
[mediapipe.SlidingWindowOptions.ext] {
window_size: 30 # 30 帧窗口
step_size: 1 # 每帧步进
}
}
}

四、Calculator:计算节点详解

4.1 Calculator 生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────────────────────────┐
│ Calculator 生命周期 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Graph 启动 │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ GetContract │ ← 只调用一次(静态验证) │
│ │ │ 定义输入输出端口类型、数量 │
│ │ 返回值: │ - cc->Inputs().Tag("NAME").Set<T>() │
│ │ absl::OkStatus│ - cc->Outputs().Tag("NAME").Set<T>() │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Open │ ← 每个实例调用一次(初始化) │
│ │ │ - 读取 Options │
│ │ 返回值: │ - 分配内存、加载模型 │
│ │ absl::OkStatus│ - 初始化资源 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Process │ ← 每个输入 Packet 调用一次(处理) │
│ │ │ - 检查输入是否可用 │
│ │ 返回值: │ - 获取输入数据 │
│ │ absl::OkStatus│ - 执行计算逻辑 │
│ │ │ - 输出结果 │
│ │ 调用次数: │ - cc->Inputs().Tag(...).Get<T>() │
│ │ N 次 │ - cc->Outputs().Tag(...).AddPacket(...) │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Close │ ← Graph 关闭时调用一次(清理) │
│ │ │ - 释放模型、内存 │
│ │ 返回值: │ - 关闭文件、连接 │
│ │ absl::OkStatus│ - 清理资源 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ Graph 关闭 │
│ │
└─────────────────────────────────────────────────────────────┘

4.2 Calculator 接口详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// mediapipe/framework/calculator_framework.h
class CalculatorBase {
public:
virtual ~CalculatorBase() = default;

// ========== GetContract:定义接口 ==========
// 在 Graph 初始化时调用,验证配置正确性
// 只调用一次,每个 Calculator 实例调用一次
static absl::Status GetContract(CalculatorContract* cc);

// ========== Open:初始化 ==========
// 在 Graph 启动时调用一次
// 返回值:absl::OkStatus() 成功,其他值失败
// 常用操作:
// - 读取 Options:cc->Options<MyOptions>()
// - 分配内存:new T()
// - 加载模型:TfLiteModel::Load()
// - 初始化资源:cv::Mat(), std::vector<>()
virtual absl::Status Open(CalculatorContext* cc);

// ========== Process:处理数据 ==========
// 每个输入 Packet 调用一次
// 返回值:absl::OkStatus() 成功,其他值失败
// 常用操作:
// - 检查输入:cc->Inputs().Tag("NAME").IsEmpty()
// - 获取输入:cc->Inputs().Tag("NAME").Get<T>()
// - 输出结果:cc->Outputs().Tag("NAME").AddPacket(...)
virtual absl::Status Process(CalculatorContext* cc) = 0;

// ========== Close:清理 ==========
// 在 Graph 关闭时调用一次
// 返回值:absl::OkStatus()
// 常用操作:
// - 释放模型:model_->Close()
// - 释放内存:delete ptr
// - 关闭文件:file_.close()
virtual absl::Status Close(CalculatorContext* cc);
};

4.3 Calculator 开发完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// my_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_MY_MY_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_MY_MY_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/calculators/my/my_calculator_options.pb.h"

namespace mediapipe {

// 自定义 Calculator
class MyCalculator : public CalculatorBase {
public:
// ========== GetContract:定义输入输出 ==========
static absl::Status GetContract(CalculatorContract* cc) {
// 定义输入端口
// Tag("NAME").Set<T>():输入名为 "NAME",类型为 T
cc->Inputs().Tag("INPUT").Set<cv::Mat>();
cc->Inputs().Tag("CONFIG").Set<std::string>();

// 定义输出端口
cc->Outputs().Tag("OUTPUT").Set<std::vector<Detection>>();
cc->Outputs().Tag("METRICS").Set<MyMetrics>();

// 声明使用 Options
cc->Options<MyCalculatorOptions>();

// 定义 Side Packet(可选)
// cc->InputSidePackets().Tag("MODEL").Set<TfLiteModelPtr>();

return absl::OkStatus();
}

// ========== Open:初始化 ==========
absl::Status Open(CalculatorContext* cc) override {
// 读取 Options
const auto& options = cc->Options<MyCalculatorOptions>();

threshold_ = options.threshold();
max_detections_ = options.max_detections();
enable_logging_ = options.enable_logging();

// 从 Side Packet 读取模型
// if (!cc->InputSidePackets().Tag("MODEL").IsEmpty()) {
// model_ = cc->InputSidePackets().Tag("MODEL").Get<TfLiteModelPtr>();
// }

// 分配内存
detections_buffer_.resize(max_detections_);

// 初始化其他资源
cv::namedWindow("Debug Window", cv::WINDOW_NORMAL);

LOG(INFO) << "MyCalculator initialized: threshold=" << threshold_
<< ", max_detections=" << max_detections_;

return absl::OkStatus();
}

// ========== Process:处理数据 ==========
absl::Status Process(CalculatorContext* cc) override {
// ========== 1. 检查输入是否可用 ==========
if (cc->Inputs().Tag("INPUT").IsEmpty()) {
return absl::OkStatus(); // 输入为空,跳过
}

// ========== 2. 获取输入数据 ==========
const cv::Mat& input = cc->Inputs().Tag("INPUT").Get<cv::Mat>();

// 检查输入有效性
if (input.empty()) {
LOG(WARNING) << "Empty input frame received";
return absl::OkStatus();
}

// ========== 3. 执行计算 ==========
auto start_time = std::chrono::high_resolution_clock::now();

std::vector<Detection> detections = DetectObjects(input);

auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time).count();

// ========== 4. 输出结果 ==========
// 输出检测结果
cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<std::vector<Detection>>(detections)
.At(cc->InputTimestamp()));

// 输出性能指标
MyMetrics metrics;
metrics.process_time_ms = duration;
metrics.detection_count = detections.size();
metrics.input_size = input.size();

cc->Outputs().Tag("METRICS").AddPacket(
MakePacket<MyMetrics>(metrics).At(cc->InputTimestamp()));

// ========== 5. 日志(可选)==========
if (enable_logging_) {
LOG(INFO) << "Processed frame at timestamp=" << cc->InputTimestamp()
<< ", detections=" << detections.size()
<< ", time=" << duration << "ms";
}

// 更新统计
process_count_++;

return absl::OkStatus();
}

// ========== Close:清理 ==========
absl::Status Close(CalculatorContext* cc) override {
// 释放资源
cv::destroyWindow("Debug Window");

LOG(INFO) << "MyCalculator closed after processing " << process_count_
<< " frames";

return absl::OkStatus();
}

private:
// ========== 配置参数 ==========
float threshold_ = 0.5f;
int max_detections_ = 10;
bool enable_logging_ = false;

// ========== 运行时状态 ==========
int process_count_ = 0;
std::vector<Detection> detections_buffer_;

// ========== 模型指针 ==========
// TfLiteModelPtr model_;

// ========== 检测函数 ==========
std::vector<Detection> DetectObjects(const cv::Mat& frame) {
// TODO: 实现检测逻辑
// 1. 预处理
// 2. 模型推理
// 3. 后处理
// 4. 返回检测结果

std::vector<Detection> detections;
// detections.push_back({...});
return detections;
}
};

// ========== 注册 Calculator ==========
// 必须在 .cc 文件末尾注册
REGISTER_CALCULATOR(MyCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_MY_MY_CALCULATOR_H_

五、Stream:数据流机制

5.1 Stream 概念

Stream 是连接 Calculator 的数据通道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
┌─────────────────────────────────────────────────────────────┐
Stream:数据流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Calculator A ──┬── Stream ──▶ Calculator B │
│ │ │
│ ├── Stream ──▶ Calculator C │
│ │ │
│ └── Stream ──▶ Calculator D │
│ │
│ 特征: │
1. 有序:Packet 按时间戳顺序排列 │
2. 同步:自动时间戳对齐 │
3. 背压:处理速度慢时自动阻塞输入 │
4. 零拷贝:共享数据所有权 │
│ │
└─────────────────────────────────────────────────────────────┘

5.2 Stream 标签(Tag)详解

使用 Tag 区分多个输入输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ========== 多输入示例 ==========
node {
calculator: "MultiInputCalculator"
input_stream: "IMAGE:video" # Tag: IMAGE
input_stream: "AUDIO:audio" # Tag: AUDIO
input_stream: "SENSOR:accel" # Tag: SENSOR
input_stream: "SENSOR:gyro" # Tag: GYRO
output_stream: "OUTPUT:combined" # Tag: OUTPUT
output_stream: "OUTPUT:separate" # Tag: SEPARATE
}

# ========== 多输出示例 ==========
node {
calculator: "MultiOutputCalculator"
input_stream: "input"
output_stream: "DETECTIONS:faces" # Tag: DETECTIONS
output_stream: "LANDMARKS:landmarks" # Tag: LANDMARKS
output_stream: "POSE:pose" # Tag: POSE
}

5.3 C++ 中访问 Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ========== 获取输入 ==========
// 方法 1:使用 Tag
const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();

// 方法 2:使用 Index
const cv::Mat& image = cc->Inputs().Index(0).Get<cv::Mat>();

// 检查输入是否为空
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus();
}

// ========== 输出结果 ==========
// 方法 1:使用 Tag
cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<std::vector<Detection>>(detections)
.At(cc->InputTimestamp()));

// 方法 2:使用 Index
cc->Outputs().Index(0).AddPacket(
MakePacket<std::vector<Detection>>(detections)
.At(cc->InputTimestamp()));

// ========== 添加多个输出 ==========
cc->Outputs().Tag("DETECTIONS").AddPacket(detection_packet);
cc->Outputs().Tag("CONFIDENCES").AddPacket(confidence_packet);
cc->Outputs().Tag("BBOXES").AddPacket(bbox_packet);

5.4 Stream 传递示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// ========== 多输入 Calculator ==========
class MultiInputCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 多个输入
cc->Inputs().Tag("IMAGE").Set<cv::Mat>();
cc->Inputs().Tag("AUDIO").Set<std::vector<float>>();
cc->Inputs().Tag("SENSOR").Set<SensorData>();

// 多个输出
cc->Outputs().Tag("OUTPUT_A").Set<std::string>();
cc->Outputs().Tag("OUTPUT_B").Set<int>();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 获取所有输入
if (!cc->Inputs().Tag("IMAGE").IsEmpty()) {
const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
// 处理图像
}

if (!cc->Inputs().Tag("AUDIO").IsEmpty()) {
const auto& audio = cc->Inputs().Tag("AUDIO").Get<std::vector<float>>();
// 处理音频
}

// 输出所有结果
cc->Outputs().Tag("OUTPUT_A").AddPacket(
MakePacket<std::string>("result").At(cc->InputTimestamp()));

cc->Outputs().Tag("OUTPUT_B").AddPacket(
MakePacket<int>(42).At(cc->InputTimestamp()));

return absl::OkStatus();
}
};

REGISTER_CALCULATOR(MultiInputCalculator);

六、Packet:数据包详解

6.1 Packet 概念

Packet 是 Stream 传输的数据单元:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
┌─────────────────────────────────────────────────────────────┐
│ Packet:数据包 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Packet 结构 │ │
│ │ │ │
│ │ ┌───────────────────────────────────┐ │ │
│ │ │ Payload (数据负载) │ │ │
│ │ │ • 任意 C++ 类型 │ │ │
│ │ │ • std::shared_ptr 共享所有权 │ │ │
│ │ │ • 零拷贝传递 │ │ │
│ │ └───────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌───────────────────────────────────┐ │ │
│ │ │ Timestamp (时间戳) │ │ │
│ │ │ • 微秒精度(int64_t) │ │ │
│ │ │ • 单调递增 │ │ │
│ │ │ • 用于时间同步 │ │ │
│ │ └───────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 示例: │
│ ┌─────────────────────────────────────────────┐ │
│ │ Packet<std::string> │ │
│ │ • payload: "hello" │ │
│ │ • timestamp: 1000000 (1 秒) │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

6.2 Packet 创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// ========== 创建带时间戳的 Packet ==========
auto packet1 = MakePacket<std::string>("hello").At(Timestamp(1000000));
// • payload: "hello"
// • timestamp: 1000000 (1 秒)

// ========== 从已有值创建 ==========
std::string value = "world";
auto packet2 = MakePacket<std::string>(value).At(cc->InputTimestamp());

// ========== 使用 Adopt(转移所有权)==========
auto packet3 = Adopt(new MyData()).At(Timestamp(0));
// • payload: new MyData()(所有权转移)
// • timestamp: 0

// ========== 创建空 Packet ==========
auto empty_packet = MakePacket<int>().At(Timestamp(0));
// • payload: nullptr

6.3 Packet 访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ========== 获取数据 ==========
// 方法 1:使用 Get() 模板方法
const std::string& data = packet.Get<std::string>();

// 方法 2:使用 ValidateAsType() 检查类型
if (packet.ValidateAsType<std::string>().ok()) {
const std::string& data = packet.Get<std::string>();
}

// ========== 检查数据是否存在 ==========
if (!packet.IsEmpty()) {
// Packet 有数据
}

// ========== 获取时间戳 ==========
Timestamp ts = packet.Timestamp();
int64_t us = ts.Value(); // 微秒

// ========== 比较时间戳 ==========
Timestamp ts1 = Timestamp(1000000);
Timestamp ts2 = Timestamp(2000000);

if (ts1.IsEarlierThan(ts2)) {
LOG(INFO) << "ts1 < ts2";
}

// ========== 时间戳加减 ==========
Timestamp ts3 = ts1.Plus(500000); // +0.5 秒
Timestamp ts4 = ts2.Minus(1000000); // -1 秒

6.4 Packet 常见操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ========== 输出 Packet ==========
// 输出检测框
cc->Outputs().Tag("DETECTIONS").AddPacket(
MakePacket<std::vector<BoundingBox>>(bbox_list)
.At(cc->InputTimestamp()));

// 输出图像
cc->Outputs().Tag("IMAGE").AddPacket(
MakePacket<cv::Mat>(image).At(cc->InputTimestamp()));

// 输出元数据
cc->Outputs().Tag("METRICS").AddPacket(
MakePacket<PerformanceMetrics>(metrics)
.At(cc->InputTimestamp()));

// ========== 多个 Packet 输出 ==========
cc->Outputs().Tag("DETECTIONS").AddPacket(detection_packet);
cc->Outputs().Tag("CONFIDENCES").AddPacket(confidence_packet);
cc->Outputs().Tag("BBOXES").AddPacket(bbox_packet);

// ========== 输出多个实例 ==========
for (const auto& detection : detections) {
cc->Outputs().Tag("DETECTIONS").AddPacket(
MakePacket<Detection>(detection).At(cc->InputTimestamp()));
}

// ========== 输出到多个 Stream ==========
cc->Outputs().Tag("OUTPUT_A").AddPacket(packet1);
cc->Outputs().Tag("OUTPUT_B").AddPacket(packet2);

七、Graph 执行流程

7.1 完整执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
┌─────────────────────────────────────────────────────────────┐
│ Graph 执行流程 │
├─────────────────────────────────────────────────────────────┤
│ │
1. Graph 初始化 │
│ └─ 解析 pbtxt 配置文件 │
│ └─ 创建 Calculator 实例 │
│ └─ 调用 GetContract() 验证端口 │
│ │
2. Graph 启动 │
│ └─ 调用 Open() 初始化每个 Calculator │
│ └─ 启动 Executor 线程池 │
│ └─ 准备接收输入 Packet │
│ │
3. 接收输入 Packet │
│ └─ AddPacketToInputStream("input", packet) │
│ └─ MediaPipe 自动路由到对应 Calculator │
│ │
4. Calculator 处理 │
│ └─ 调用 Process() │
│ └─ 获取输入 Packet │
│ └─ 执行计算逻辑 │
│ └─ 输出结果 Packet │
│ └─ MediaPipe 自动路由到下游 Calculator │
│ └─ 重复步骤 3-4
│ │
5. 关闭输入 Stream │
│ └─ CloseInputStream("input") │
│ └─ 等待所有 Calculator 处理完成 │
│ │
6. 关闭 Graph │
│ └─ 调用 Close() 清理资源 │
│ └─ 释放所有 Calculator 实例 │
│ └─ 停止 Executor 线程池 │
│ │
└─────────────────────────────────────────────────────────────┘

7.2 C++ 运行 Graph 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// graph_runner.cpp
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/parse_text_proto.h"
#include "mediapipe/framework/port/status.h"
#include <opencv2/opencv.hpp>
#include <iostream>

// ========== 定义 Options ==========
message MyCalculatorOptions {
extend mediapipe.CalculatorOptions {
optional MyCalculatorOptions ext = 1000000;
}
optional float threshold = 1 [default = 0.5];
}

// ========== 定义输入输出 ==========
input_stream: "input"
output_stream: "output"

node {
calculator: "PassThroughCalculator"
input_stream: "input"
output_stream: "output"
}

// ========== 主函数 ==========
int main() {
// 1. 解析 Graph 配置
mediapipe::CalculatorGraphConfig config =
mediapipe::ParseTextProtoOrDie<mediapipe::CalculatorGraphConfig>(R"(
input_stream: "input"
output_stream: "output"
node {
calculator: "PassThroughCalculator"
input_stream: "input"
output_stream: "output"
}
)");

// 2. 初始化 Graph
mediapipe::CalculatorGraph graph;
MP_RETURN_IF_ERROR(graph.Initialize(config));

// 3. 添加输出回调
MP_RETURN_IF_ERROR(graph.ObserveOutputStream(
"output",
[](const mediapipe::Packet& packet) {
// 收到输出 Packet
// • packet.Timestamp(): 时间戳
// • packet.Get<T>(): 获取数据
LOG(INFO) << "Received output at timestamp: " << packet.Timestamp();
return absl::OkStatus();
}));

// 4. 启动 Graph
MP_RETURN_IF_ERROR(graph.StartRun({}));

// 5. 发送输入数据
for (int i = 0; i < 10; ++i) {
// 创建输入 Packet
auto packet = MakePacket<int>(i * 10).At(mediapipe::Timestamp(i * 1000000));

// 添加到输入 Stream
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
packet));

// 等待处理完成(可选)
// std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

// 6. 关闭输入 Stream
MP_RETURN_IF_ERROR(graph.CloseInputStream("input"));

// 7. 等待 Graph 完成
MP_RETURN_IF_ERROR(graph.WaitUntilDone());

// 8. 清理
LOG(INFO) << "Graph execution completed successfully";

return 0;
}

八、常见错误与调试

8.1 输入为空错误

1
2
3
4
5
6
7
8
9
// ❌ 错误:直接访问可能为空的输入
const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();

// ✅ 正确:先检查
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
LOG(WARNING) << "Input is empty, skipping";
return absl::OkStatus();
}
const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();

8.2 时间戳错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// ❌ 错误:时间戳不递增
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
MakePacket<int>(1).At(Timestamp(1000000)))); // t=1s

MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
MakePacket<int>(2).At(Timestamp(500000)))); // t=0.5s ❌

// ✅ 正确:时间戳必须递增
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
MakePacket<int>(1).At(Timestamp(1000000)))); // t=1s

MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
MakePacket<int>(2).At(Timestamp(2000000)))); // t=2s ✅

8.3 忘记注册 Calculator

1
2
3
4
5
6
7
8
9
10
11
// ❌ 错误:没有注册
class MyCalculator : public CalculatorBase {
// ...
};

// ✅ 正确:在 .cc 文件末尾注册
class MyCalculator : public CalculatorBase {
// ...
};

REGISTER_CALCULATOR(MyCalculator); // 必须在 .cc 文件中

8.4 Options 类型错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// pbtxt 中配置错误
node {
calculator: "MyCalculator"
input_stream: "input"
output_stream: "output"
options {
[mediapipe.MyCalculatorOptions.ext] {
threshold: 0.5
// 缺少 required 字段
}
}
}

// C++ 中定义 Options
message MyCalculatorOptions {
extend CalculatorOptions {
optional MyCalculatorOptions ext = 1000000;
}
required float threshold = 1; // required 字段
optional string name = 2;
}

8.5 调试技巧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ========== 添加日志 ==========
LOG(INFO) << "Processing frame at timestamp: " << cc->InputTimestamp();
LOG(WARNING) << "Empty input detected";
LOG(ERROR) << "Calculation failed";

// ========== 调试输出 ==========
cv::imshow("Debug", image);
cv::waitKey(1);

// ========== 统计信息 ==========
static int process_count = 0;
process_count++;
LOG(INFO) << "Processed " << process_count << " frames";

// ========== 性能分析 ==========
auto start = std::chrono::high_resolution_clock::now();
// ... 计算逻辑 ...
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end - start).count();
LOG(INFO) << "Process time: " << duration << "us";

九、实战:完整 Graph 示例

9.1 pbtxt 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# dms_graph.pbtxt
# IMS DMS 完整流水线配置

# ========== 输入输出 ==========
input_stream: "IR_IMAGE:ir_frame"
input_stream: "VEHICLE_SPEED:speed"
input_stream: "TIMESTAMP:timestamp"
output_stream: "DMS_RESULT:dms_output"
output_stream: "ALERT:alert"

# ========== 1. 人脸检测 ==========
node {
calculator: "FaceDetectionCalculator"
input_stream: "IMAGE:ir_frame"
output_stream: "FACE_DETECTIONS:faces"
options {
[mediapipe.FaceDetectionOptions.ext] {
model_path: "/models/face_detection.tflite"
confidence_threshold: 0.7
max_num_detections: 10
}
}
}

# ========== 2. 人脸关键点 ==========
node {
calculator: "FaceMeshCalculator"
input_stream: "IMAGE:ir_frame"
input_stream: "FACE_DETECTIONS:faces"
output_stream: "FACE_LANDMARKS:landmarks"
options {
[mediapipe.FaceMeshOptions.ext] {
model_path: "/models/face_landmark.tflite"
num_faces: 1
enable_iris: true
}
}
}

# ========== 3. EAR 计算 ==========
node {
calculator: "ARCalculator"
input_stream: "FACE_LANDMARKS:landmarks"
output_stream: "EAR:left_ear"
output_stream: "EAR:right_ear"
output_stream: "EAR:avg_ear"
options {
[mediapipe.ARCalculatorOptions.ext] {
ear_threshold: 0.2
}
}
}

# ========== 4. PERCLOS 计算 ==========
node {
calculator: "PERCLOSCalculator"
input_stream: "EAR:avg_ear"
input_stream: "TIMESTAMP:timestamp"
output_stream: "PERCLOS:perclos"
output_stream: "PERCLOS_WINDOW:window_stats"
options {
[mediapipe.PERCLOSOptions.ext] {
window_seconds: 30
ear_threshold: 0.2
}
}
}

# ========== 5. 头部姿态估计 ==========
node {
calculator: "HeadPoseCalculator"
input_stream: "FACE_LANDMARKS:landmarks"
output_stream: "HEAD_POSE:head_pose"
options {
[mediapipe.HeadPoseOptions.ext] {
model_path: "/models/head_pose.tflite"
}
}
}

# ========== 6. 疲劳评分计算 ==========
node {
calculator: "FatigueScoreCalculator"
input_stream: "EAR:avg_ear"
input_stream: "PERCLOS:perclos"
input_stream: "HEAD_POSE:head_pose"
input_stream: "VEHICLE_SPEED:speed"
output_stream: "FATIGUE_SCORE:fatigue_score"
output_stream: "FATIGUE_LEVEL:fatigue_level"
options {
[mediapipe.FatigueOptions.ext] {
perclos_weight: 0.5
ear_weight: 0.3
head_pose_weight: 0.2
perclos_low: 15.0
perclos_high: 30.0
ear_low: 0.2
ear_high: 0.25
speed_factor_enabled: true
speed_threshold: 80.0
}
}
}

# ========== 7. 告警触发 ==========
node {
calculator: "FatigueAlertCalculator"
input_stream: "FATIGUE_LEVEL:fatigue_level"
input_stream: "TIMESTAMP:timestamp"
output_stream: "ALERT:alert"
options {
[mediapipe.FatigueAlertOptions.ext] {
level_1_threshold: 1
level_2_threshold: 2
level_3_threshold: 3
alert_cooldown_seconds: 30
}
}
}

# ========== 线程池配置 ==========
executor {
name: "cpu_executor"
type: "ThreadPool"
num_threads: 4
}

十、总结

概念 说明 关键点
Graph 感知流水线 pbtxt 配置、DAG 结构
Calculator 计算节点 Open/Process/Close 生命周期
Stream 数据流 Packet 序列、时间戳同步
Packet 数据包 Timestamp + Payload、零拷贝

下篇预告

MediaPipe 系列 03:Packet 与 Stream——时间序列数据的流动

深入讲解 Packet 传递机制、时间戳对齐、背压控制。


参考资料

  1. Google AI Edge. MediaPipe Framework Documentation
  2. Lugaresi et al. (2019). MediaPipe: A Framework for Building Perception Pipelines. arXiv:1906.08172
  3. LearnOpenCV. MediaPipe: The Ultimate Guide to Video Processing

系列进度: 2/55
更新时间: 2026-03-12


MediaPipe 系列 02:Graph 与 Calculator——核心抽象详解完整指南
https://dapalm.com/2026/03/12/MediaPipe系列02-Graph与Calculator:核心抽象详解/
作者
Mars
发布于
2026年3月12日
许可协议