6
6
#include " commconstants.h"
7
7
#include " convertutils.h"
8
8
9
- namespace server {
10
-
11
- GateIO::GateIO () {
12
- m_is_running.store (false );
13
- }
14
-
15
- GateIO::~GateIO () {
16
- stop ();
17
- }
9
+ #include " sockpp/tcp6_acceptor.h"
18
10
19
- void GateIO::start (int port_num) {
20
- if (!m_is_running.load ()) {
21
- m_port_num = port_num;
22
- VTR_LOG (" starting server" );
23
- m_is_running.store (true );
24
- m_thread = std::thread (&GateIO::start_listening, this );
25
- }
26
- }
27
-
28
- void GateIO::stop () {
29
- if (m_is_running.load ()) {
30
- m_is_running.store (false );
31
- if (m_thread.joinable ()) {
32
- m_thread.join ();
33
- }
34
- }
35
- }
36
-
37
- void GateIO::take_received_tasks (std::vector<TaskPtr>& tasks) {
38
- std::unique_lock<std::mutex> lock (m_tasks_mutex);
39
- for (TaskPtr& task : m_received_tasks) {
40
- m_logger.queue (LogLevel::Debug, " move task id=" , task->job_id (), " for processing" );
41
- tasks.push_back (std::move (task));
42
- }
43
- m_received_tasks.clear ();
44
- }
45
-
46
- void GateIO::move_tasks_to_send_queue (std::vector<TaskPtr>& tasks) {
47
- std::unique_lock<std::mutex> lock (m_tasks_mutex);
48
- for (TaskPtr& task : tasks) {
49
- m_logger.queue (LogLevel::Debug, " move task id=" , task->job_id (), " finished" , (task->has_error () ? " with error" : " successfully" ), task->error (), " to send queue" );
50
- m_send_tasks.push_back (std::move (task));
51
- }
52
- tasks.clear ();
53
- }
11
+ namespace server {
54
12
55
- GateIO:: ActivityStatus GateIO:: check_client_connection (sockpp::tcp6_acceptor& tcp_server, std::optional<sockpp::tcp6_socket>& client_opt) {
13
+ static ActivityStatus check_client_connection (sockpp::tcp6_acceptor& tcp_server, std::optional<sockpp::tcp6_socket>& client_opt, TLogger& logger ) {
56
14
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
57
15
58
16
sockpp::inet6_address peer;
59
17
sockpp::tcp6_socket client = tcp_server.accept (&peer);
60
18
if (client) {
61
- m_logger .queue (LogLevel::Info, " client" , client.address ().to_string (), " connection accepted" );
19
+ logger .queue (LogLevel::Info, " client" , client.address ().to_string (), " connection accepted" );
62
20
client.set_non_blocking (true );
63
21
client_opt = std::move (client);
64
22
@@ -68,67 +26,67 @@ GateIO::ActivityStatus GateIO::check_client_connection(sockpp::tcp6_acceptor& tc
68
26
return status;
69
27
}
70
28
71
- GateIO:: ActivityStatus GateIO:: handle_sending_data (sockpp::tcp6_socket& client) {
29
+ static ActivityStatus handle_sending_data (sockpp::tcp6_socket& client, std::mutex& tasks_mutex, std::vector<TaskPtr>& send_tasks, TLogger& logger ) {
72
30
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
73
- std::unique_lock<std::mutex> lock (m_tasks_mutex );
31
+ std::unique_lock<std::mutex> lock (tasks_mutex );
74
32
75
- if (!m_send_tasks .empty ()) {
76
- const TaskPtr& task = m_send_tasks .at (0 );
33
+ if (!send_tasks .empty ()) {
34
+ const TaskPtr& task = send_tasks .at (0 );
77
35
try {
78
36
std::size_t bytes_to_send = std::min (CHUNK_MAX_BYTES_NUM, task->response_buffer ().size ());
79
37
std::size_t bytes_sent = client.write_n (task->response_buffer ().data (), bytes_to_send);
80
38
if (bytes_sent <= task->orig_reponse_bytes_num ()) {
81
39
task->chop_num_sent_bytes_from_response_buffer (bytes_sent);
82
- m_logger .queue (LogLevel::Detail,
83
- " sent chunk:" , get_pretty_size_str_from_bytes_num (bytes_sent),
84
- " from" , get_pretty_size_str_from_bytes_num (task->orig_reponse_bytes_num ()),
85
- " left:" , get_pretty_size_str_from_bytes_num (task->response_buffer ().size ()));
40
+ logger .queue (LogLevel::Detail,
41
+ " sent chunk:" , get_pretty_size_str_from_bytes_num (bytes_sent),
42
+ " from" , get_pretty_size_str_from_bytes_num (task->orig_reponse_bytes_num ()),
43
+ " left:" , get_pretty_size_str_from_bytes_num (task->response_buffer ().size ()));
86
44
status = ActivityStatus::CLIENT_ACTIVITY;
87
45
}
88
46
} catch (...) {
89
- m_logger .queue (LogLevel::Detail, " error while writing chunk" );
47
+ logger .queue (LogLevel::Detail, " error while writing chunk" );
90
48
status = ActivityStatus::COMMUNICATION_PROBLEM;
91
49
}
92
50
93
51
if (task->is_response_fully_sent ()) {
94
- m_logger .queue (LogLevel::Info, " sent:" , task->telegram_header ().info (), task->info ());
52
+ logger .queue (LogLevel::Info, " sent:" , task->telegram_header ().info (), task->info ());
95
53
}
96
54
}
97
55
98
56
// remove reported tasks
99
- std::size_t tasks_num_before_removing = m_send_tasks .size ();
57
+ std::size_t tasks_num_before_removing = send_tasks .size ();
100
58
101
- auto partition_iter = std::partition (m_send_tasks .begin (), m_send_tasks .end (),
59
+ auto partition_iter = std::partition (send_tasks .begin (), send_tasks .end (),
102
60
[](const TaskPtr& task) { return !task->is_response_fully_sent (); });
103
- m_send_tasks .erase (partition_iter, m_send_tasks .end ());
104
- bool is_removing_took_place = tasks_num_before_removing != m_send_tasks .size ();
105
- if (!m_send_tasks .empty () && is_removing_took_place) {
106
- m_logger .queue (LogLevel::Detail, " left tasks num to send " , m_send_tasks .size ());
61
+ send_tasks .erase (partition_iter, send_tasks .end ());
62
+ bool is_removing_took_place = tasks_num_before_removing != send_tasks .size ();
63
+ if (!send_tasks .empty () && is_removing_took_place) {
64
+ logger .queue (LogLevel::Detail, " left tasks num to send " , send_tasks .size ());
107
65
}
108
66
109
67
return status;
110
68
}
111
69
112
- GateIO:: ActivityStatus GateIO:: handle_receiving_data (sockpp::tcp6_socket& client, comm::TelegramBuffer& telegram_buff, std::string& received_message) {
70
+ static ActivityStatus handle_receiving_data (sockpp::tcp6_socket& client, comm::TelegramBuffer& telegram_buff, std::string& received_message, TLogger& logger ) {
113
71
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
114
72
std::size_t bytes_actually_received{0 };
115
73
try {
116
74
bytes_actually_received = client.read_n (&received_message[0 ], CHUNK_MAX_BYTES_NUM);
117
75
} catch (...) {
118
- m_logger .queue (LogLevel::Error, " fail to receiving" );
76
+ logger .queue (LogLevel::Error, " fail to receiving" );
119
77
status = ActivityStatus::COMMUNICATION_PROBLEM;
120
78
}
121
79
122
80
if ((bytes_actually_received > 0 ) && (bytes_actually_received <= CHUNK_MAX_BYTES_NUM)) {
123
- m_logger .queue (LogLevel::Detail, " received chunk:" , get_pretty_size_str_from_bytes_num (bytes_actually_received));
81
+ logger .queue (LogLevel::Detail, " received chunk:" , get_pretty_size_str_from_bytes_num (bytes_actually_received));
124
82
telegram_buff.append (comm::ByteArray{received_message.c_str (), bytes_actually_received});
125
83
status = ActivityStatus::CLIENT_ACTIVITY;
126
84
}
127
85
128
86
return status;
129
87
}
130
88
131
- GateIO:: ActivityStatus GateIO:: handle_telegrams (std::vector<comm::TelegramFramePtr>& telegram_frames, comm::TelegramBuffer& telegram_buff) {
89
+ static ActivityStatus handle_telegrams (std::vector<comm::TelegramFramePtr>& telegram_frames, comm::TelegramBuffer& telegram_buff, std::mutex& tasks_mutex, std::vector<TaskPtr>& received_tasks, TLogger& logger ) {
132
90
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
133
91
telegram_frames.clear ();
134
92
telegram_buff.take_telegram_frames (telegram_frames);
@@ -137,32 +95,32 @@ GateIO::ActivityStatus GateIO::handle_telegrams(std::vector<comm::TelegramFrameP
137
95
std::string message{telegram_frame->body };
138
96
bool is_echo_telegram = false ;
139
97
if ((message.size () == comm::ECHO_TELEGRAM_BODY.size ()) && (message == comm::ECHO_TELEGRAM_BODY)) {
140
- m_logger .queue (LogLevel::Detail, " received" , comm::ECHO_TELEGRAM_BODY);
98
+ logger .queue (LogLevel::Detail, " received" , comm::ECHO_TELEGRAM_BODY);
141
99
is_echo_telegram = true ;
142
100
status = ActivityStatus::CLIENT_ACTIVITY;
143
101
}
144
102
145
103
if (!is_echo_telegram) {
146
- m_logger .queue (LogLevel::Detail, " received composed" , get_pretty_size_str_from_bytes_num (message.size ()), " :" , get_truncated_middle_str (message));
104
+ logger .queue (LogLevel::Detail, " received composed" , get_pretty_size_str_from_bytes_num (message.size ()), " :" , get_truncated_middle_str (message));
147
105
std::optional<int > job_id_opt = comm::TelegramParser::try_extract_field_job_id (message);
148
106
std::optional<int > cmd_opt = comm::TelegramParser::try_extract_field_cmd (message);
149
107
std::optional<std::string> options_opt = comm::TelegramParser::try_extract_field_options (message);
150
108
if (job_id_opt && cmd_opt && options_opt) {
151
109
TaskPtr task = std::make_unique<Task>(job_id_opt.value (), static_cast <comm::CMD>(cmd_opt.value ()), options_opt.value ());
152
110
const comm::TelegramHeader& header = telegram_frame->header ;
153
- m_logger .queue (LogLevel::Info, " received:" , header.info (), task->info (/* skipDuration*/ true ));
154
- std::unique_lock<std::mutex> lock (m_tasks_mutex );
155
- m_received_tasks .push_back (std::move (task));
111
+ logger .queue (LogLevel::Info, " received:" , header.info (), task->info (/* skipDuration*/ true ));
112
+ std::unique_lock<std::mutex> lock (tasks_mutex );
113
+ received_tasks .push_back (std::move (task));
156
114
} else {
157
- m_logger .queue (LogLevel::Error, " broken telegram detected, fail extract options from" , message);
115
+ logger .queue (LogLevel::Error, " broken telegram detected, fail extract options from" , message);
158
116
}
159
117
}
160
118
}
161
119
162
120
return status;
163
121
}
164
122
165
- GateIO:: ActivityStatus GateIO:: handle_client_alive_tracker (sockpp::tcp6_socket& client, std::unique_ptr<ClientAliveTracker>& client_alive_tracker_ptr) {
123
+ static ActivityStatus handle_client_alive_tracker (sockpp::tcp6_socket& client, std::unique_ptr<ClientAliveTracker>& client_alive_tracker_ptr, TLogger& logger ) {
166
124
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
167
125
if (client_alive_tracker_ptr) {
168
126
// / handle sending echo to client
@@ -173,26 +131,26 @@ GateIO::ActivityStatus GateIO::handle_client_alive_tracker(sockpp::tcp6_socket&
173
131
try {
174
132
std::size_t bytes_sent = client.write (message);
175
133
if (bytes_sent == message.size ()) {
176
- m_logger .queue (LogLevel::Detail, " sent" , comm::ECHO_TELEGRAM_BODY);
134
+ logger .queue (LogLevel::Detail, " sent" , comm::ECHO_TELEGRAM_BODY);
177
135
client_alive_tracker_ptr->on_echo_sent ();
178
136
}
179
137
} catch (...) {
180
- m_logger .queue (LogLevel::Debug, " fail to sent" , comm::ECHO_TELEGRAM_BODY);
138
+ logger .queue (LogLevel::Debug, " fail to sent" , comm::ECHO_TELEGRAM_BODY);
181
139
status = ActivityStatus::COMMUNICATION_PROBLEM;
182
140
}
183
141
}
184
142
185
143
// / handle client timeout
186
144
if (client_alive_tracker_ptr->is_client_timeout ()) {
187
- m_logger .queue (LogLevel::Error, " client didn't respond too long" );
145
+ logger .queue (LogLevel::Error, " client didn't respond too long" );
188
146
status = ActivityStatus::COMMUNICATION_PROBLEM;
189
147
}
190
148
}
191
149
192
150
return status;
193
151
}
194
152
195
- void GateIO:: handle_activity_status (ActivityStatus status, std::unique_ptr<ClientAliveTracker>& client_alive_tracker_ptr, bool & is_communication_problem_detected) {
153
+ static void handle_activity_status (ActivityStatus status, std::unique_ptr<ClientAliveTracker>& client_alive_tracker_ptr, bool & is_communication_problem_detected) {
196
154
if (status == ActivityStatus::CLIENT_ACTIVITY) {
197
155
if (client_alive_tracker_ptr) {
198
156
client_alive_tracker_ptr->on_client_activity ();
@@ -202,9 +160,54 @@ void GateIO::handle_activity_status(ActivityStatus status, std::unique_ptr<Clien
202
160
}
203
161
}
204
162
163
+ GateIO::GateIO () {
164
+ m_is_running.store (false );
165
+ }
166
+
167
+ GateIO::~GateIO () {
168
+ stop ();
169
+ }
170
+
171
+ void GateIO::start (int port_num) {
172
+ if (!m_is_running.load ()) {
173
+ m_port_num = port_num;
174
+ VTR_LOG (" starting server" );
175
+ m_is_running.store (true );
176
+ m_thread = std::thread (&GateIO::start_listening, this );
177
+ }
178
+ }
179
+
180
+ void GateIO::stop () {
181
+ if (m_is_running.load ()) {
182
+ m_is_running.store (false );
183
+ if (m_thread.joinable ()) {
184
+ m_thread.join ();
185
+ }
186
+ }
187
+ }
188
+
189
+ void GateIO::take_received_tasks (std::vector<TaskPtr>& tasks) {
190
+ std::unique_lock<std::mutex> lock (m_tasks_mutex);
191
+ for (TaskPtr& task : m_received_tasks) {
192
+ m_logger.queue (LogLevel::Debug, " move task id=" , task->job_id (), " for processing" );
193
+ tasks.push_back (std::move (task));
194
+ }
195
+ m_received_tasks.clear ();
196
+ }
197
+
198
+ void GateIO::move_tasks_to_send_queue (std::vector<TaskPtr>& tasks) {
199
+ std::unique_lock<std::mutex> lock (m_tasks_mutex);
200
+ for (TaskPtr& task : tasks) {
201
+ m_logger.queue (LogLevel::Debug, " move task id=" , task->job_id (), " finished" , (task->has_error () ? " with error" : " successfully" ), task->error (), " to send queue" );
202
+ m_send_tasks.push_back (std::move (task));
203
+ }
204
+ tasks.clear ();
205
+ }
206
+
205
207
void GateIO::start_listening () {
206
208
#ifdef ENABLE_CLIENT_ALIVE_TRACKER
207
- std::unique_ptr<ClientAliveTracker> client_alive_tracker_ptr = std::make_unique<ClientAliveTracker>(std::chrono::milliseconds{5000 }, std::chrono::milliseconds{20000 });
209
+ std::unique_ptr<ClientAliveTracker> client_alive_tracker_ptr =
210
+ std::make_unique<ClientAliveTracker>(std::chrono::milliseconds{5000 }, std::chrono::milliseconds{20000 });
208
211
#else
209
212
std::unique_ptr<ClientAliveTracker> client_alive_tracker_ptr;
210
213
#endif
@@ -232,7 +235,7 @@ void GateIO::start_listening() {
232
235
bool is_communication_problem_detected = false ;
233
236
234
237
if (!client_opt) {
235
- ActivityStatus status = check_client_connection (tcp_server, client_opt);
238
+ ActivityStatus status = check_client_connection (tcp_server, client_opt, m_logger );
236
239
if (status == ActivityStatus::CLIENT_ACTIVITY) {
237
240
if (client_alive_tracker_ptr) {
238
241
client_alive_tracker_ptr->reset ();
@@ -244,15 +247,15 @@ void GateIO::start_listening() {
244
247
sockpp::tcp6_socket& client = client_opt.value (); // shortcut
245
248
246
249
// / handle sending
247
- ActivityStatus status = handle_sending_data (client);
250
+ ActivityStatus status = handle_sending_data (client, m_tasks_mutex, m_send_tasks, m_logger );
248
251
handle_activity_status (status, client_alive_tracker_ptr, is_communication_problem_detected);
249
252
250
253
// / handle receiving
251
- status = handle_receiving_data (client, telegram_buff, received_message);
254
+ status = handle_receiving_data (client, telegram_buff, received_message, m_logger );
252
255
handle_activity_status (status, client_alive_tracker_ptr, is_communication_problem_detected);
253
256
254
257
// / handle telegrams
255
- status = handle_telegrams (telegram_frames, telegram_buff);
258
+ status = handle_telegrams (telegram_frames, telegram_buff, m_tasks_mutex, m_received_tasks, m_logger );
256
259
handle_activity_status (status, client_alive_tracker_ptr, is_communication_problem_detected);
257
260
258
261
// forward telegramBuffer errors
@@ -263,7 +266,7 @@ void GateIO::start_listening() {
263
266
}
264
267
265
268
// / handle client alive tracker
266
- status = handle_client_alive_tracker (client, client_alive_tracker_ptr);
269
+ status = handle_client_alive_tracker (client, client_alive_tracker_ptr, m_logger );
267
270
handle_activity_status (status, client_alive_tracker_ptr, is_communication_problem_detected);
268
271
269
272
// / handle communication problem
0 commit comments