MediaPipe 系列 18:滑动窗口 Calculator——时序数据处理完整指南

前言:为什么需要滑动窗口?

18.1 时序数据处理的重要性

疲劳检测、行为识别的核心是时序数据分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
┌─────────────────────────────────────────────────────────────────────────┐
│ 时序数据处理的重要性 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题:单帧数据无法判断疲劳或行为? │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ IMS DMS 场景: │ │
│ │ │ │
│ │ 单帧信息: │ │
│ │ • 眼睛闭合(1 帧)→ 无法判断是眨眼还是疲劳 │ │
│ │ • 张嘴(1 帧)→ 无法判断是说话还是打哈欠 │ │
│ │ • 头部倾斜(1 帧)→ 无法判断是看路边还是疲劳 │ │
│ │ │ │
│ │ 需要历史数据: │ │
│ │ • 30 帧内眼睛闭合 15 次 → 疲劳 │ │
│ │ • 2 秒内持续张嘴 → 打哈欠 │ │
│ │ • 50 帧内频繁晃头 → 分心 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 解决方案:滑动窗口 Calculator │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ • 缓存历史数据(1-2 秒) │ │
│ │ • 计算时序特征 │ │
│ │ • 检测时序模式 │ │
│ │ • 判断疲劳/分心状态 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 滑动窗口示例: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 输入流: [1][2][3][4][5][6][7][8][9][10] → │ │
│ │ │ │
│ │ 窗口大小 = 3: │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ [1] │ │ │
│ │ │ [1][2] │ │ │
│ │ │ [1][2][3] → 输出 │ │ │
│ │ │ [2][3][4] → 输出 │ │ │
│ │ │ [3][4][5] → 输出 │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

18.2 滑动窗口应用场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
┌─────────────────────────────────────────────────────────────┐
│ 滑动窗口应用场景 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 疲劳检测 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 窗口大小:30 帧(1 秒) │ │
│ │ 特征:PERCLOS、眨眼频率 │ │
│ │ 输出:疲劳等级(0-3) │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 2. 打哈欠检测 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 窗口大小:60 帧(2 秒) │ │
│ │ 特征:张嘴持续帧数 │ │
│ │ 输出:是否打哈欠 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 3. 头部晃动检测 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 窗口大小:50 帧(1.67 秒) │ │
│ │ 特征:yaw/pitch 变化频率 │ │
│ │ 输出:是否分心 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 4. 视线转移检测 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 窗口大小:20 帧(0.67 秒) │ │
│ │ 特征:视线落点分布 │ │
│ │ 输出:是否分心 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 5. 平滑滤波 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 窗口大小:5-10 帧 │ │
│ │ 特征:加权平均 │ │
│ │ 输出:平滑后的值 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

十九、滑动窗口分类

19.1 窗口类型对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────────────────────────┐
│ 滑动窗口类型对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 固定大小窗口 │
│ ┌─────────────────────────────────────────────┐ │
│ │ • 保持 N 个元素 │ │
│ │ • 适合固定帧率 │ │
│ │ • 实现:std::deque │ │
│ │ │ │
│ │ 示例:30 帧窗口 │ │
│ │ [1][2]...[30][2][3]...[31] │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 2. 时间窗口 │
│ ┌─────────────────────────────────────────────┐ │
│ │ • 保持 T 时间内的数据 │ │
│ │ • 适合可变帧率 │ │
│ │ • 实现基于时间戳过滤 │ │
│ │ │ │
│ │ 示例:1 秒窗口 │ │
│ │ [t=0][t=0.03]...[t=1.0] │ │
│ │ [t=0.03]...[t=1.03] │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 3. 环形缓冲区窗口 │
│ ┌─────────────────────────────────────────────┐ │
│ │ • 高性能实现 │ │
│ │ • 避免内存分配 │ │
│ │ • 适合高频数据处理 │ │
│ │ │ │
│ │ 示例:固定大小 30 │ │
│ │ 0→1→2...→29→0→1→... │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 4. 自适应窗口 │
│ ┌─────────────────────────────────────────────┐ │
│ │ • 窗口大小动态调整 │ │
│ │ • 根据事件变化 │ │
│ │ • 复杂度高 │ │
│ │ │ │
│ │ 示例:检测到疲劳时扩大窗口 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

二十、固定大小滑动窗口

20.1 基本实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// fixed_window_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_TEMPORAL_FIXED_WINDOW_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_TEMPORAL_FIXED_WINDOW_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <deque>

namespace mediapipe {

// ========== Proto Options ==========
/*
syntax = "proto3";
package mediapipe;

message FixedWindowOptions {
optional int32 window_size = 1 [default = 10];
optional bool emit_only_if_full = 2 [default = false];
optional bool output_as_vector = 3 [default = false];
}
*/

// ========== 固定大小滑动窗口 Calculator ==========
template <typename T>
class FixedWindowCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).Set<T>();
cc->Outputs().Index(0).Set<std::deque<T>>();
cc->Options<FixedWindowOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<FixedWindowOptions>();

window_size_ = options.window_size();
emit_only_if_full_ = options.emit_only_if_full();
output_as_vector_ = options.output_as_vector();

// 预分配内存
buffer_.reserve(window_size_);

LOG(INFO) << "FixedWindowCalculator initialized: "
<< "window_size=" << window_size_
<< ", emit_only_if_full=" << emit_only_if_full_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Index(0).IsEmpty()) {
return absl::OkStatus();
}

// ========== 1. 获取输入数据 ==========
const T& data = cc->Inputs().Index(0).Get<T>();

// ========== 2. 添加到窗口 ==========
buffer_.push_back(data);

// ========== 3. 保持窗口大小 ==========
while (buffer_.size() > window_size_) {
buffer_.pop_front();
}

// ========== 4. 是否输出 ==========
if (emit_only_if_full_ && buffer_.size() < window_size_) {
return absl::OkStatus();
}

// ========== 5. 输出窗口数据 ==========
cc->Outputs().Index(0).AddPacket(
MakePacket<std::deque<T>>(buffer_).At(cc->InputTimestamp()));

return absl::OkStatus();
}

private:
std::deque<T> buffer_;
int window_size_ = 10;
bool emit_only_if_full_ = false;
bool output_as_vector_ = false;
};

REGISTER_CALCULATOR(FixedWindowCalculator);

// ========== 模板实例化 ==========
extern template class FixedWindowCalculator<float>;
extern template class FixedWindowCalculator<int>;
extern template class FixedWindowCalculator<cv::Mat>;
extern template class FixedWindowCalculator<EyeState>;

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_TEMPORAL_FIXED_WINDOW_CALCULATOR_H_

20.2 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 固定窗口 Graph 配置

# PERCLOS 计算
node {
calculator: "FixedWindowCalculator<EyeState>"
input_stream: "eye_state"
output_stream: "eye_state_window"
options {
[mediapipe.FixedWindowOptions.ext] {
window_size: 30
emit_only_if_full: true
}
}
}

# 平滑滤波
node {
calculator: "FixedWindowCalculator<float>"
input_stream: "fatigue_score"
output_stream: "smoothed_score"
options {
[mediapipe.FixedWindowOptions.ext] {
window_size: 5
emit_only_if_full: false
}
}
}

二十一、时间窗口

21.1 基于时间戳的窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// time_window_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_TEMPORAL_TIME_WINDOW_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_TEMPORAL_TIME_WINDOW_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/timestamp.h"
#include <deque>

namespace mediapipe {

// ========== Proto Options ==========
/*
syntax = "proto3";
package mediapipe;

message TimeWindowOptions {
optional int32 window_duration_ms = 1 [default = 1000];
optional int32 min_window_size = 2 [default = 1];
}
*/

// ========== 时间窗口 Calculator ==========
template <typename T>
class TimeWindowCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).Set<T>();
cc->Outputs().Index(0).Set<std::vector<TimedData<T>>>();
cc->Options<TimeWindowOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<TimeWindowOptions>();

window_duration_ms_ = options.window_duration_ms();
min_window_size_ = options.min_window_size();

LOG(INFO) << "TimeWindowCalculator initialized: "
<< "window_duration=" << window_duration_ms_ << "ms"
<< ", min_size=" << min_window_size_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Index(0).IsEmpty()) {
return absl::OkStatus();
}

// ========== 1. 获取输入数据和时间戳 ==========
const T& data = cc->Inputs().Index(0).Get<T>();
Timestamp current_ts = cc->InputTimestamp();

// ========== 2. 添加到窗口 ==========
TimedData<T> timed_data;
timed_data.data = data;
timed_data.timestamp = current_ts;

buffer_.push_back(timed_data);

// ========== 3. 移除过期数据 ==========
int64_t current_time = current_ts.Value();
int64_t cutoff_time = current_time - window_duration_ms_ * 1000;

while (!buffer_.empty() &&
buffer_.front().timestamp.Value() < cutoff_time) {
buffer_.pop_front();
}

// ========== 4. 检查最小窗口大小 ==========
if (buffer_.size() < min_window_size_) {
return absl::OkStatus();
}

// ========== 5. 转换为 vector 输出 ==========
std::vector<TimedData<T>> output;
output.reserve(buffer_.size());

for (const auto& item : buffer_) {
output.push_back(item);
}

// ========== 6. 输出 ==========
cc->Outputs().Index(0).AddPacket(
MakePacket<std::vector<TimedData<T>>>(output).At(current_ts));

return absl::OkStatus();
}

private:
// 带时间戳的数据结构
struct TimedData {
T data;
Timestamp timestamp;
};

std::deque<TimedData> buffer_;
int window_duration_ms_ = 1000; // 1 秒窗口
int min_window_size_ = 1;
};

REGISTER_CALCULATOR(TimeWindowCalculator);

// ========== 模板实例化 ==========
extern template class TimeWindowCalculator<float>;
extern template class TimeWindowCalculator<int>;
extern template class TimeWindowCalculator<EyeState>;

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_TEMPORAL_TIME_WINDOW_CALCULATOR_H_

二十二、环形缓冲区

22.1 高性能实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// ring_buffer_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_TEMPORAL_RING_BUFFER_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_TEMPORAL_RING_BUFFER_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <vector>

namespace mediapipe {

// ========== Proto Options ==========
/*
syntax = "proto3";
package mediapipe;

message RingBufferOptions {
optional int32 capacity = 1 [default = 10];
}
*/

// ========== 环形缓冲区类 ==========
template <typename T>
class RingBuffer {
public:
explicit RingBuffer(size_t capacity)
: capacity_(capacity), buffer_(capacity) {}

void Push(const T& data) {
buffer_[write_index_] = data;
write_index_ = (write_index_ + 1) % capacity_;
size_ = std::min(size_ + 1, capacity_);
}

const T& operator[](size_t i) const {
size_t index = (write_index_ - size_ + i) % capacity_;
return buffer_[index];
}

size_t Size() const { return size_; }
size_t Capacity() const { return capacity_; }

bool IsFull() const { return size_ == capacity_; }
bool IsEmpty() const { return size_ == 0; }

void Clear() {
write_index_ = 0;
size_ = 0;
}

private:
size_t capacity_;
std::vector<T> buffer_;
size_t write_index_ = 0;
size_t size_ = 0;
};

// ========== 环形缓冲区 Calculator ==========
template <typename T>
class RingBufferCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).Set<T>();
cc->Outputs().Index(0).Set<std::vector<T>>();
cc->Options<RingBufferOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<RingBufferOptions>();

capacity_ = options.capacity();
ring_buffer_ = std::make_unique<RingBuffer<T>>(capacity_);

LOG(INFO) << "RingBufferCalculator initialized: capacity=" << capacity_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Index(0).IsEmpty()) {
return absl::OkStatus();
}

const T& data = cc->Inputs().Index(0).Get<T>();

// 添加到环形缓冲区
ring_buffer_->Push(data);

// 转换为 vector
std::vector<T> output;
output.reserve(ring_buffer_->Size());

for (size_t i = 0; i < ring_buffer_->Size(); ++i) {
output.push_back((*ring_buffer_)[i]);
}

cc->Outputs().Index(0).AddPacket(
MakePacket<std::vector<T>>(output).At(cc->InputTimestamp()));

return absl::OkStatus();
}

private:
std::unique_ptr<RingBuffer<T>> ring_buffer_;
size_t capacity_ = 10;
};

REGISTER_CALCULATOR(RingBufferCalculator);

// ========== 模板实例化 ==========
extern template class RingBufferCalculator<float>;
extern template class RingBufferCalculator<EyeState>;

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_TEMPORAL_RING_BUFFER_CALCULATOR_H_

二十三、时序特征统计

23.1 特征计算 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// temporal_stats_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_TEMPORAL_TEMPORAL_STATS_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_TEMPORAL_TEMPORAL_STATS_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <numeric>
#include <cmath>
#include <algorithm>

namespace mediapipe {

// ========== 时序统计消息 ==========
message TemporalStats {
bool valid = 1;
int32 count = 2;
float mean = 3;
float stddev = 4;
float min = 5;
float max = 6;
float range = 7;
float variance = 8;
float trend = 9;
float frequency = 10; // 频率(过零率)
float zero_crossings = 11;
}

// ========== 时序统计 Calculator ==========
template <typename T>
class TemporalStatsCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).Set<std::vector<T>>();
cc->Outputs().Index(0).Set<TemporalStats>();
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Index(0).IsEmpty()) {
TemporalStats stats;
stats.set_valid(false);
cc->Outputs().Index(0).AddPacket(
MakePacket<TemporalStats>(stats).At(cc->InputTimestamp()));
return absl::OkStatus();
}

const auto& data = cc->Inputs().Index(0).Get<std::vector<T>>();

TemporalStats stats;

if (data.empty()) {
stats.set_valid(false);
} else {
// ========== 1. 均值 ==========
float sum = std::accumulate(data.begin(), data.end(), 0.0f);
float mean = sum / data.size();
stats.set_mean(mean);

// ========== 2. 标准差 ==========
float sq_sum = 0.0f;
for (const auto& v : data) {
sq_sum += (v - mean) * (v - mean);
}
float variance = sq_sum / data.size();
float stddev = std::sqrt(variance);
stats.set_variance(variance);
stats.set_stddev(stddev);

// ========== 3. 最大最小值 ==========
if (std::is_arithmetic_v<T>) {
T min_val = *std::min_element(data.begin(), data.end());
T max_val = *std::max_element(data.begin(), data.end());
stats.set_min(static_cast<float>(min_val));
stats.set_max(static_cast<float>(max_val));
stats.set_range(static_cast<float>(max_val - min_val));
}

// ========== 4. 变化趋势 ==========
if (data.size() >= 2) {
float first = static_cast<float>(data.front());
float last = static_cast<float>(data.back());
stats.set_trend(last - first);
}

// ========== 5. 过零率(频率)==========
if (data.size() >= 2 && std::is_arithmetic_v<T>) {
int zero_crossings = 0;
for (size_t i = 1; i < data.size(); ++i) {
float prev = static_cast<float>(data[i - 1]) - mean;
float curr = static_cast<float>(data[i]) - mean;
if (prev * curr < 0) {
zero_crossings++;
}
}
stats.set_zero_crossings(static_cast<float>(zero_crossings));
stats.set_frequency(zero_crossings / 2.0f / data.size());
}

stats.set_valid(true);
stats.set_count(data.size());
}

cc->Outputs().Index(0).AddPacket(
MakePacket<TemporalStats>(stats).At(cc->InputTimestamp()));

return absl::OkStatus();
}
};

REGISTER_CALCULATOR(TemporalStatsCalculator);

// ========== 模板实例化 ==========
extern template class TemporalStatsCalculator<float>;
extern template class TemporalStatsCalculator<int>;

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_TEMPORAL_TEMPORAL_STATS_CALCULATOR_H_

二十四、IMS 实战:PERCLOS 计算

24.1 PERCLOS Calculator 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// perclos_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_IMS_PERCLOS_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_IMS_PERCLOS_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <deque>

namespace mediapipe {

// ========== Proto Options ==========
/*
syntax = "proto3";
package mediapipe;

message PERCLOSOptions {
optional int32 window_frames = 1 [default = 30];
optional float closed_threshold = 2 [default = 0.3];
optional bool use_both_eyes = 3 [default = true];

// 疲劳等级阈值
optional float mild_threshold = 10 [default = 0.3];
optional float moderate_threshold = 11 [default = 0.5];
optional float severe_threshold = 12 [default = 0.8];
}
*/

// ========== PERCLOS Calculator ==========
class PERCLOSCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("EYE_STATE").Set<EyeState>();

cc->Outputs().Tag("PERCLOS").Set<float>();
cc->Outputs().Tag("FATIGUE_LEVEL").Set<int>();
cc->Outputs().Tag("DETAILS").Set<PERCLOSDetails>();

cc->Options<PERCLOSOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<PERCLOSOptions>();

window_frames_ = options.window_frames();
closed_threshold_ = options.closed_threshold();
use_both_eyes_ = options.use_both_eyes();
mild_threshold_ = options.mild_threshold();
moderate_threshold_ = options.moderate_threshold();
severe_threshold_ = options.severe_threshold();

// 初始化缓冲区
closure_buffer_.resize(window_frames_, 0.0f);
buffer_index_ = 0;
buffer_count_ = 0;

LOG(INFO) << "PERCLOSCalculator initialized: "
<< "window=" << window_frames_
<< ", threshold=" << closed_threshold_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Tag("EYE_STATE").IsEmpty()) {
return absl::OkStatus();
}

const EyeState& eye = cc->Inputs().Tag("EYE_STATE").Get<EyeState>();

// ========== 1. 计算眼睛闭合程度 ==========
float closure = ComputeEyeClosure(eye);

// ========== 2. 添加到环形缓冲区 ==========
closure_buffer_[buffer_index_] = closure;
buffer_index_ = (buffer_index_ + 1) % window_frames_;
buffer_count_ = std::min(buffer_count_ + 1, window_frames_);

// ========== 3. 如果窗口未满,不输出 ==========
if (buffer_count_ < window_frames_) {
return absl::OkStatus();
}

// ========== 4. 计算 PERCLOS ==========
float perclos = ComputePERCLOS();

// ========== 5. 判断疲劳等级 ==========
int fatigue_level = ComputeFatigueLevel(perclos);

// ========== 6. 创建详细信息 ==========
PERCLOSDetails details;
details.set_perclos(perclos);
details.set_fatigue_level(fatigue_level);
details.set_window_frames(buffer_count_);

// 计算闭眼次数
int closed_frames = 0;
for (size_t i = 0; i < buffer_count_; ++i) {
if (closure_buffer_[i] >= closed_threshold_) {
closed_frames++;
}
}
details.set_closed_frames(closed_frames);
details.set_open_frames(buffer_count_ - closed_frames);

// ========== 7. 输出 ==========
cc->Outputs().Tag("PERCLOS").AddPacket(
MakePacket<float>(perclos).At(cc->InputTimestamp()));

cc->Outputs().Tag("FATIGUE_LEVEL").AddPacket(
MakePacket<int>(fatigue_level).At(cc->InputTimestamp()));

cc->Outputs().Tag("DETAILS").AddPacket(
MakePacket<PERCLOSDetails>(details).At(cc->InputTimestamp()));

return absl::OkStatus();
}

private:
// ========== 配置 ==========
int window_frames_ = 30;
float closed_threshold_ = 0.3f;
bool use_both_eyes_ = true;

float mild_threshold_ = 0.3f;
float moderate_threshold_ = 0.5f;
float severe_threshold_ = 0.8f;

// ========== 环形缓冲区 ==========
std::vector<float> closure_buffer_;
int buffer_index_ = 0;
int buffer_count_ = 0;

// ========== 方法 ==========
float ComputeEyeClosure(const EyeState& eye) {
float left_closure = 1.0f - eye.left_eye_open();
float right_closure = 1.0f - eye.right_eye_open();

if (use_both_eyes_) {
return (left_closure + right_closure) / 2.0f;
} else {
return std::max(left_closure, right_closure);
}
}

float ComputePERCLOS() {
int closed_count = 0;

for (size_t i = 0; i < buffer_count_; ++i) {
if (closure_buffer_[i] >= closed_threshold_) {
closed_count++;
}
}

return static_cast<float>(closed_count) / buffer_count_ * 100.0f;
}

int ComputeFatigueLevel(float perclos) {
if (perclos >= severe_threshold_) {
return 3; // 极度疲劳
} else if (perclos >= moderate_threshold_) {
return 2; // 明显疲劳
} else if (perclos >= mild_threshold_) {
return 1; // 轻度疲劳
} else {
return 0; // 正常
}
}
};

REGISTER_CALCULATOR(PERCLOSCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_IMS_PERCLOS_CALCULATOR_H_

24.2 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# ims_perclos_graph.pbtxt

input_stream: "IR_IMAGE:ir_image"
output_stream: "PERCLOS:perclos"
output_stream: "FATIGUE_LEVEL:fatigue_level"

# 眼睛状态检测
node {
calculator: "EyeStateCalculator"
input_stream: "IMAGE:ir_image"
output_stream: "EYE_STATE:eye_state"
}

# PERCLOS 计算
node {
calculator: "PERCLOSCalculator"
input_stream: "EYE_STATE:eye_state"
output_stream: "PERCLOS:perclos"
output_stream: "FATIGUE_LEVEL:fatigue_level"
output_stream: "DETAILS:details"
options {
[mediapipe.PERCLOSOptions.ext] {
window_frames: 30
closed_threshold: 0.3
use_both_eyes: true
mild_threshold: 0.3
moderate_threshold: 0.5
severe_threshold: 0.8
}
}
}

二十五、性能优化

25.1 增量计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// ========== 增量计算均值和方差 ==========
class IncrementalStats {
public:
void Add(float value) {
count_++;
sum_ += value;

// Welford 算法更新方差
float delta = value - mean_;
mean_ += delta / count_;
float delta2 = value - mean_;
m2_ += delta * delta2;
}

float Mean() const { return mean_; }
float Variance() const { return count_ > 1 ? m2_ / (count_ - 1) : 0; }
float StdDev() const { return std::sqrt(Variance()); }
int Count() const { return count_; }

private:
int count_ = 0;
float sum_ = 0;
float mean_ = 0;
float m2_ = 0; // 平方和
};

25.2 内存池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ========== 内存池实现 ==========
template <typename T, size_t N>
class FixedPool {
public:
T* Allocate() {
if (free_list_.empty()) {
return nullptr;
}
T* ptr = free_list_.back();
free_list_.pop_back();
return ptr;
}

void Deallocate(T* ptr) {
free_list_.push_back(ptr);
}

private:
std::vector<T*> free_list_;
T pool_[N];
};

二十六、总结

窗口类型 实现 适用场景
固定大小 std::deque 稳定帧率
时间窗口 时间戳过滤 可变帧率
环形缓冲区 RingBuffer 高性能
特征统计 TemporalStats 时序分析
PERCLOS 闭眼比例 疲劳检测

下篇预告

MediaPipe 系列 19:条件分支 Calculator——动态路由

深入讲解条件分支、动态路由、状态机实现。


参考资料

  1. Google AI Edge. Temporal Data Processing
  2. Wikipedia. Moving Average
  3. Wikipedia. Zero-Crossing Rate

系列进度: 18/55
更新时间: 2026-03-12


MediaPipe 系列 18:滑动窗口 Calculator——时序数据处理完整指南
https://dapalm.com/2026/03/13/MediaPipe系列18-滑动窗口Calculator:时序数据处理/
作者
Mars
发布于
2026年3月13日
许可协议