Skip to content

Commit e5c1623

Browse files
authored
output: add metrics for dropped oldest chunk count (#4981)
**Which issue(s) this PR fixes**: Fixes # **What this PR does / why we need it**: When `overflow_action drop_oldest_chunk` is configured, the oldest buffer chunk will be automatically discarded when the total buffer capacity is exhausted. This PR introduces a new metric, `drop_oldest_chunk_count`, which counts the number of discarded chunks. **Docs Changes**: fluent/fluentd-docs-gitbook#582 **Release Note**: Same as the title Signed-off-by: Shizuo Fujita <[email protected]>
1 parent d80dc46 commit e5c1623

File tree

4 files changed

+18
-1
lines changed

4 files changed

+18
-1
lines changed

lib/fluent/plugin/output.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ def initialize
190190
@rollback_count_metrics = nil
191191
@flush_time_count_metrics = nil
192192
@slow_flush_count_metrics = nil
193+
@drop_oldest_chunk_count_metrics = nil
193194
@enable_size_metrics = false
194195

195196
# How to process events is decided here at once, but it will be decided in delayed way on #configure & #start
@@ -259,6 +260,7 @@ def configure(conf)
259260
@rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
260261
@flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
261262
@slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")
263+
@drop_oldest_chunk_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "drop_oldest_chunk_count", help_text: "Number of count that old chunk were discarded with drop_oldest_chunk")
262264

263265
if has_buffer_section
264266
unless implement?(:buffered) || implement?(:delayed_commit)
@@ -977,6 +979,7 @@ def write_guard(&block)
977979
if oldest
978980
log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id)
979981
@buffer.purge_chunk(oldest.unique_id)
982+
@drop_oldest_chunk_count_metrics.inc
980983
else
981984
log.error "no queued chunks to be dropped for drop_oldest_chunk"
982985
end
@@ -1575,6 +1578,7 @@ def statistics
15751578
'rollback_count' => @rollback_count_metrics.get,
15761579
'slow_flush_count' => @slow_flush_count_metrics.get,
15771580
'flush_time_count' => @flush_time_count_metrics.get,
1581+
'drop_oldest_chunk_count' => @drop_oldest_chunk_count_metrics.get,
15781582
}
15791583

15801584
if @buffer && @buffer.respond_to?(:statistics)

test/plugin/test_in_monitor_agent.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ def test_enable_input_metrics(with_config)
185185
"rollback_count" => Integer,
186186
"slow_flush_count" => Integer,
187187
"flush_time_count" => Integer,
188+
"drop_oldest_chunk_count" => Integer,
188189
}
189190
output_info["config"] = {"@id" => "test_out", "@type" => "test_out"} if with_config
190191
error_label_info = {
@@ -208,6 +209,7 @@ def test_enable_input_metrics(with_config)
208209
"rollback_count" => Integer,
209210
"slow_flush_count" => Integer,
210211
"flush_time_count" => Integer,
212+
"drop_oldest_chunk_count" => Integer,
211213
}
212214
error_label_info["config"] = {"@id"=>"null", "@type" => "null"} if with_config
213215
opts = {with_config: with_config}
@@ -326,6 +328,7 @@ def test_enable_input_metrics(with_config)
326328
"rollback_count" => Integer,
327329
"slow_flush_count" => Integer,
328330
"flush_time_count" => Integer,
331+
"drop_oldest_chunk_count" => Integer,
329332
}
330333
output_info["config"] = {"@id" => "test_out", "@type" => "test_out"} if with_config
331334
error_label_info = {
@@ -349,6 +352,7 @@ def test_enable_input_metrics(with_config)
349352
"rollback_count" => Integer,
350353
"slow_flush_count" => Integer,
351354
"flush_time_count" => Integer,
355+
"drop_oldest_chunk_count" => Integer,
352356
}
353357
error_label_info["config"] = {"@id"=>"null", "@type" => "null"} if with_config
354358
opts = {with_config: with_config}
@@ -424,6 +428,7 @@ def test_enable_input_metrics(with_config)
424428
"rollback_count" => Integer,
425429
"slow_flush_count" => Integer,
426430
"flush_time_count" => Integer,
431+
"drop_oldest_chunk_count" => Integer,
427432
}
428433
expect_test_out_record = {
429434
"plugin_id" => "test_out",
@@ -439,6 +444,7 @@ def test_enable_input_metrics(with_config)
439444
"rollback_count" => Integer,
440445
"slow_flush_count" => Integer,
441446
"flush_time_count" => Integer,
447+
"drop_oldest_chunk_count" => Integer,
442448
}
443449
assert_fuzzy_equal(expect_relabel_record, d.events[1][2])
444450
assert_fuzzy_equal(expect_test_out_record, d.events[3][2])
@@ -578,6 +584,7 @@ def get(uri, header = {})
578584
"rollback_count" => Integer,
579585
"slow_flush_count" => Integer,
580586
"flush_time_count" => Integer,
587+
"drop_oldest_chunk_count" => Integer,
581588
}
582589
expected_null_response["config"] = {"@id" => "null", "@type" => "null"} if with_config
583590
expected_null_response["retry"] = {} if with_retry
@@ -643,6 +650,7 @@ def get(uri, header = {})
643650
"rollback_count" => Integer,
644651
"slow_flush_count" => Integer,
645652
"flush_time_count" => Integer,
653+
"drop_oldest_chunk_count" => Integer,
646654
}
647655
expected_null_response["config"] = {"@id" => "null", "@type" => "null"} if with_config
648656
expected_null_response["retry"] = {} if with_retry
@@ -693,6 +701,7 @@ def get(uri, header = {})
693701
"rollback_count" => Integer,
694702
"slow_flush_count" => Integer,
695703
"flush_time_count" => Integer,
704+
"drop_oldest_chunk_count" => Integer,
696705
}
697706
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body)
698707
test_in_response = response["plugins"][0]
@@ -825,6 +834,7 @@ def write(chunk)
825834
"rollback_count" => Integer,
826835
'slow_flush_count' => Integer,
827836
'flush_time_count' => Integer,
837+
"drop_oldest_chunk_count" => Integer,
828838
}
829839
output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
830840
# flush few times to check steps

test/plugin/test_output.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ def waiting(seconds)
227227
@i.configure(config_element())
228228

229229
%w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics write_count_metrics
230-
write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name|
230+
write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics
231+
drop_oldest_chunk_count_metrics].each do |metric_name|
231232
assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics)
232233
end
233234

@@ -241,6 +242,7 @@ def waiting(seconds)
241242
assert_equal 0, @i.rollback_count
242243
assert_equal 0, @i.flush_time_count
243244
assert_equal 0, @i.slow_flush_count
245+
assert_equal 0, @i.drop_oldest_chunk_count
244246
end
245247

246248
data(:new_api => :chunk,

test/plugin/test_output_as_buffered_overflow.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ def waiting(seconds)
217217
logs = @i.log.out.logs
218218
assert{ logs.any?{|line| line.include?("failed to write data into buffer by buffer overflow") } }
219219
assert{ logs.any?{|line| line.include?("dropping oldest chunk to make space after buffer overflow") } }
220+
assert{ @i.drop_oldest_chunk_count > 0 }
220221
end
221222

222223
test '#emit_events raises OverflowError if all buffer spaces are used by staged chunks' do

0 commit comments

Comments
 (0)