|
1 | 1 | # basically we are porting this https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
|
2 | 2 |
|
3 |
| -require "java" |
| 3 | +require 'java' |
4 | 4 |
|
5 |
| -require "jruby-kafka/namespace" |
6 |
| -require "jruby-kafka/consumer" |
7 |
| -require "jruby-kafka/error" |
8 |
| - |
9 |
| -java_import 'java.util.concurrent.ExecutorService' |
10 |
| -java_import 'java.util.concurrent.Executors' |
11 |
| -java_import 'org.I0Itec.zkclient.exception.ZkException' |
| 5 | +require 'jruby-kafka/namespace' |
| 6 | +require 'jruby-kafka/consumer' |
| 7 | +require 'jruby-kafka/error' |
12 | 8 |
|
| 9 | +# noinspection JRubyStringImportInspection |
13 | 10 | class Kafka::Group
|
| 11 | + java_import 'java.util.concurrent.ExecutorService' |
| 12 | + java_import 'java.util.concurrent.Executors' |
| 13 | + java_import 'org.I0Itec.zkclient.exception.ZkException' |
| 14 | + |
14 | 15 | @consumer
|
15 | 16 | @executor
|
16 | 17 | @topic
|
@@ -136,81 +137,80 @@ def initialize(options={})
|
136 | 137 | end
|
137 | 138 | end
|
138 | 139 |
|
139 |
| - private |
140 |
| - def validate_required_arguments(options={}) |
141 |
| - [:zk_connect, :group_id, :topic_id].each do |opt| |
142 |
| - raise(ArgumentError, "#{opt} is required.") unless options[opt] |
143 |
| - end |
144 |
| - end |
145 |
| - |
146 | 140 | public
|
147 |
| - def shutdown() |
| 141 | + |
| 142 | + def shutdown |
148 | 143 | if @consumer
|
149 |
| - @consumer.shutdown() |
| 144 | + @consumer.shutdown |
150 | 145 | end
|
151 | 146 | if @executor
|
152 |
| - @executor.shutdown() |
| 147 | + @executor.shutdown |
153 | 148 | end
|
154 | 149 | @running = false
|
155 | 150 | end
|
156 | 151 |
|
157 |
| - public |
158 |
| - def run(a_numThreads, a_queue) |
| 152 | + def run(a_num_threads, a_queue) |
159 | 153 | begin
|
160 | 154 | if @auto_offset_reset == 'smallest'
|
161 | 155 | Java::kafka::utils::ZkUtils.maybeDeletePath(@zk_connect, "/consumers/#{@group_id}")
|
162 | 156 | end
|
163 | 157 |
|
164 |
| - @consumer = Java::kafka::consumer::Consumer.createJavaConsumerConnector(createConsumerConfig()) |
| 158 | + @consumer = Java::kafka::consumer::Consumer.createJavaConsumerConnector(create_consumer_config) |
165 | 159 | rescue ZkException => e
|
166 | 160 | raise KafkaError.new(e), "Got ZkException: #{e}"
|
167 | 161 | end
|
168 |
| - topicCountMap = java.util.HashMap.new() |
169 |
| - thread_value = a_numThreads.to_java Java::int |
170 |
| - topicCountMap.put(@topic, thread_value) |
171 |
| - consumerMap = @consumer.createMessageStreams(topicCountMap) |
172 |
| - streams = Array.new(consumerMap[@topic]) |
| 162 | + topic_count_map = java.util.HashMap.new |
| 163 | + thread_value = a_num_threads.to_java Java::int |
| 164 | + topic_count_map.put(@topic, thread_value) |
| 165 | + consumer_map = @consumer.createMessageStreams(topic_count_map) |
| 166 | + streams = Array.new(consumer_map[@topic]) |
173 | 167 |
|
174 |
| - @executor = Executors.newFixedThreadPool(a_numThreads) |
| 168 | + @executor = Executors.newFixedThreadPool(a_num_threads) |
175 | 169 | @executor_submit = @executor.java_method(:submit, [Java::JavaLang::Runnable.java_class])
|
176 | 170 |
|
177 |
| - threadNumber = 0 |
178 |
| - for stream in streams |
179 |
| - @executor_submit.call(Kafka::Consumer.new(stream, threadNumber, a_queue, @consumer_restart_on_error, @consumer_restart_sleep_ms)) |
180 |
| - threadNumber += 1 |
| 171 | + thread_number = 0 |
| 172 | + streams.each do |stream| |
| 173 | + @executor_submit.call(Kafka::Consumer.new(stream, thread_number, a_queue, @consumer_restart_on_error, @consumer_restart_sleep_ms)) |
| 174 | + thread_number += 1 |
181 | 175 | end
|
182 | 176 | @running = true
|
183 | 177 | end
|
184 | 178 |
|
185 |
| - public |
  # Public: whether #run has started the consumer threads and #shutdown has
  # not been called since.
  #
  # Returns the raw @running flag: true after #run, false after #shutdown,
  # nil before either has been invoked.
  def running?
    @running
  end
|
189 | 182 |
|
190 | 183 | private
|
191 |
| - def createConsumerConfig() |
192 |
| - properties = java.util.Properties.new() |
193 |
| - properties.put("zookeeper.connect", @zk_connect) |
194 |
| - properties.put("group.id", @group_id) |
195 |
| - properties.put("zookeeper.connection.timeout.ms", @zk_connect_timeout) |
196 |
| - properties.put("zookeeper.session.timeout.ms", @zk_session_timeout) |
197 |
| - properties.put("zookeeper.sync.time.ms", @zk_sync_time) |
198 |
| - properties.put("auto.commit.interval.ms", @auto_commit_interval) |
199 |
| - properties.put("auto.offset.reset", @auto_offset_reset) |
200 |
| - properties.put("rebalance.max.retries", @rebalance_max_retries) |
201 |
| - properties.put("rebalance.backoff.ms", @rebalance_backoff_ms) |
202 |
| - properties.put("socket.timeout.ms", @socket_timeout_ms) |
203 |
| - properties.put("socket.receive.buffer.bytes", @socket_receive_buffer_bytes) |
204 |
| - properties.put("fetch.message.max.bytes", @fetch_message_max_bytes) |
205 |
| - properties.put("auto.commit.enable", @auto_commit_enable) |
206 |
| - properties.put("queued.max.message.chunks", @queued_max_message_chunks) |
207 |
| - properties.put("fetch.min.bytes", @fetch_min_bytes) |
208 |
| - properties.put("fetch.wait.max.ms", @fetch_wait_max_ms) |
209 |
| - properties.put("refresh.leader.backoff.ms", @refresh_leader_backoff_ms) |
210 |
| - properties.put("consumer.timeout.ms", @consumer_timeout_ms) |
| 184 | + |
| 185 | + def validate_required_arguments(options={}) |
| 186 | + [:zk_connect, :group_id, :topic_id].each do |opt| |
| 187 | + raise(ArgumentError, "#{opt} is required.") unless options[opt] |
| 188 | + end |
| 189 | + end |
| 190 | + |
| 191 | + def create_consumer_config |
| 192 | + properties = java.util.Properties.new |
| 193 | + properties.put('zookeeper.connect', @zk_connect) |
| 194 | + properties.put('group.id', @group_id) |
| 195 | + properties.put('zookeeper.connection.timeout.ms', @zk_connect_timeout) |
| 196 | + properties.put('zookeeper.session.timeout.ms', @zk_session_timeout) |
| 197 | + properties.put('zookeeper.sync.time.ms', @zk_sync_time) |
| 198 | + properties.put('auto.commit.interval.ms', @auto_commit_interval) |
| 199 | + properties.put('auto.offset.reset', @auto_offset_reset) |
| 200 | + properties.put('rebalance.max.retries', @rebalance_max_retries) |
| 201 | + properties.put('rebalance.backoff.ms', @rebalance_backoff_ms) |
| 202 | + properties.put('socket.timeout.ms', @socket_timeout_ms) |
| 203 | + properties.put('socket.receive.buffer.bytes', @socket_receive_buffer_bytes) |
| 204 | + properties.put('fetch.message.max.bytes', @fetch_message_max_bytes) |
| 205 | + properties.put('auto.commit.enable', @auto_commit_enable) |
| 206 | + properties.put('queued.max.message.chunks', @queued_max_message_chunks) |
| 207 | + properties.put('fetch.min.bytes', @fetch_min_bytes) |
| 208 | + properties.put('fetch.wait.max.ms', @fetch_wait_max_ms) |
| 209 | + properties.put('refresh.leader.backoff.ms', @refresh_leader_backoff_ms) |
| 210 | + properties.put('consumer.timeout.ms', @consumer_timeout_ms) |
211 | 211 | unless @consumer_id.nil?
|
212 | 212 | properties.put('consumer.id', @consumer_id)
|
213 | 213 | end
|
214 |
| - return Java::kafka::consumer::ConsumerConfig.new(properties) |
| 214 | + Java::kafka::consumer::ConsumerConfig.new(properties) |
215 | 215 | end
|
216 | 216 | end
|
0 commit comments