Skip to content

Commit 29a6b85

Browse files
authored
Merge pull request #5203 from sysown/v3.0_FastForwardGracefulClose
Implement Fast Forward Grace Close Feature to Prevent Data Loss
2 parents 831d5fd + 8330037 commit 29a6b85

File tree

11 files changed

+362
-6
lines changed

11 files changed

+362
-6
lines changed

include/MySQL_Data_Stream.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,9 @@ class MySQL_Data_Stream
176176
char kill_type;
177177

178178
bool encrypted;
179+
// defer_close_due_to_fast_forward: Flag to prevent immediate closure of data stream
180+
// during fast forward grace close, allowing buffers to drain.
181+
bool defer_close_due_to_fast_forward;
179182
bool net_failure;
180183

181184
uint8_t pkt_sid;

include/MySQL_Session.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,10 @@ class MySQL_Session: public Base_Session<MySQL_Session, MySQL_Data_Stream, MySQL
403403

404404
ProxySQL_Node_Address * proxysql_node_address; // this is used ONLY for Admin, and only if the other party is another proxysql instance part of a cluster
405405
bool use_ldap_auth;
406+
// Fast forward grace close flags: track backend closure during fast forward mode
407+
// to allow pending client data to drain before closing the session.
408+
bool backend_closed_in_fast_forward;
409+
unsigned long long fast_forward_grace_start_time;
406410

407411
// this variable is relevant only if status == SETTING_VARIABLE
408412
enum mysql_variable_name changing_variable_idx;

include/MySQL_Thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,11 @@ struct p_th_gauge {
316316
mysql_monitor_replication_lag_timeout,
317317
mysql_monitor_history,
318318
__size
319+
319320
};
320321
};
321322

323+
322324
struct th_metrics_map_idx {
323325
enum index {
324326
counters = 0,
@@ -428,6 +430,7 @@ class MySQL_Threads_Handler
428430
char * monitor_replication_lag_use_percona_heartbeat;
429431
int ping_interval_server_msec;
430432
int ping_timeout_server;
433+
int fast_forward_grace_close_ms;
431434
int shun_on_failures;
432435
int shun_recovery_time_sec;
433436
int unshun_algorithm;

include/proxysql_structs.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,6 +1226,7 @@ __thread int mysql_thread___throttle_connections_per_sec_to_hostgroup;
12261226
__thread int mysql_thread___max_transaction_idle_time;
12271227
__thread int mysql_thread___max_transaction_time;
12281228
__thread int mysql_thread___threshold_query_length;
1229+
__thread int mysql_thread___fast_forward_grace_close_ms;
12291230
__thread int mysql_thread___threshold_resultset_size;
12301231
__thread int mysql_thread___wait_timeout;
12311232
__thread int mysql_thread___throttle_max_bytes_per_second_to_client;
@@ -1529,6 +1530,7 @@ extern __thread int mysql_thread___throttle_connections_per_sec_to_hostgroup;
15291530
extern __thread int mysql_thread___max_transaction_idle_time;
15301531
extern __thread int mysql_thread___max_transaction_time;
15311532
extern __thread int mysql_thread___threshold_query_length;
1533+
extern __thread int mysql_thread___fast_forward_grace_close_ms;
15321534
extern __thread int mysql_thread___threshold_resultset_size;
15331535
extern __thread int mysql_thread___wait_timeout;
15341536
extern __thread int mysql_thread___throttle_max_bytes_per_second_to_client;

lib/MySQL_Session.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,8 @@ MySQL_Session::MySQL_Session() {
684684
last_HG_affected_rows = -1; // #1421 : advanced support for LAST_INSERT_ID()
685685
proxysql_node_address = NULL;
686686
use_ldap_auth = false;
687+
backend_closed_in_fast_forward = false;
688+
fast_forward_grace_start_time = 0;
687689
}
688690

689691
/**
@@ -715,6 +717,8 @@ void MySQL_Session::reset() {
715717
mybe=NULL;
716718

717719
with_gtid = false;
720+
backend_closed_in_fast_forward = false;
721+
fast_forward_grace_start_time = 0;
718722

719723
//gtid_trxid = 0;
720724
gtid_hid = -1;
@@ -3778,6 +3782,38 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) {
37783782
break;
37793783
case FAST_FORWARD:
37803784
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
3785+
/*
3786+
* Fast Forward Grace Close Logic:
3787+
* In fast forward mode, ProxySQL forwards packets without buffering them.
3788+
* If the backend connection closes unexpectedly while client data is still
3789+
* being sent, we risk data loss. To mitigate this, we implement a grace
3790+
* period where the session remains open until all pending client output
3791+
* buffers are drained or a timeout (mysql_thread___fast_forward_grace_close_ms)
3792+
* is reached.
3793+
*
3794+
* This logic detects backend closure, starts the grace timer, and waits
3795+
* for buffers to empty before closing the session.
3796+
*/
3797+
// Detect if backend closed during fast forward
3798+
if (mybe->server_myds->status == MYSQL_SERVER_STATUS_OFFLINE_HARD || mybe->server_myds->fd == -1) {
3799+
if (!backend_closed_in_fast_forward) {
3800+
backend_closed_in_fast_forward = true;
3801+
fast_forward_grace_start_time = thread->curtime;
3802+
}
3803+
}
3804+
if (backend_closed_in_fast_forward) {
3805+
if (
3806+
( mybe->server_myds == nullptr || ( mybe->server_myds && mybe->server_myds->PSarrayIN->len == 0 ) )
3807+
&&
3808+
(client_myds->PSarrayOUT->len == 0 && (client_myds->queueOUT.head - client_myds->queueOUT.tail) == 0)
3809+
) {
3810+
// buffers empty, close
3811+
handler_ret = -1;
3812+
} else if (thread->curtime - fast_forward_grace_start_time > (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) {
3813+
// timeout, close
3814+
handler_ret = -1;
3815+
}
3816+
}
37813817
break;
37823818
// This state is required because it covers the following situation:
37833819
// 1. A new connection is created by a client and the 'FAST_FORWARD' mode is enabled.

lib/MySQL_Thread.cpp

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ static char * mysql_thread_variables_names[]= {
511511
(char *)"proxy_protocol_networks",
512512
(char *)"protocol_compression_level",
513513
(char *)"ignore_min_gtid_annotations",
514+
(char *)"fast_forward_grace_close_ms",
514515
NULL
515516
};
516517

@@ -1073,8 +1074,12 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
10731074
variables.default_variables[i]=strdup(mysql_tracked_variables[i].default_value);
10741075
}
10751076
variables.default_session_track_gtids=strdup((char *)MYSQL_DEFAULT_SESSION_TRACK_GTIDS);
1077+
// fast_forward_grace_close_ms: Configurable timeout (in milliseconds) for the "fast forward grace close" feature.
1078+
// This feature prevents data loss in fast forward mode by deferring session closure when the backend
1079+
// connection closes unexpectedly, allowing time for pending client output to drain.
10761080
variables.ping_interval_server_msec=10000;
10771081
variables.ping_timeout_server=200;
1082+
variables.fast_forward_grace_close_ms=5000;
10781083
variables.default_schema=strdup((char *)"information_schema");
10791084
variables.handle_unknown_charset=1;
10801085
variables.interfaces=strdup((char *)"");
@@ -2283,6 +2288,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
22832288
VariablesPointers_int["handle_unknown_charset"] = make_tuple(&variables.handle_unknown_charset, 0, HANDLE_UNKNOWN_CHARSET__MAX_HANDLE_VALUE, false);
22842289
VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false);
22852290
VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false);
2291+
VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false);
22862292
VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false);
22872293
VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false);
22882294
VariablesPointers_int["handle_warnings"] = make_tuple(&variables.handle_warnings, 0, 1, false);
@@ -3748,7 +3754,25 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned
37483754
// if this is a backend without fast_forward, do not set unhealthy: it will be handled by client library
37493755
if (myds->sess->session_fast_forward) { // if fast forward
37503756
if (myds->myds_type==MYDS_BACKEND) { // and backend
3751-
myds->sess->set_unhealthy(); // set unhealthy
3757+
// myds->sess->set_unhealthy(); // set unhealthy
3758+
// Fast Forward Grace Close Logic:
3759+
// If the backend closed during fast forward mode, we defer session closure to allow
3760+
// pending client output buffers to drain, preventing data loss.
3761+
// Detect if backend closed during fast forward
3762+
if (myds->sess->backend_closed_in_fast_forward == false) {
3763+
myds->sess->backend_closed_in_fast_forward = true;
3764+
//cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << myds->sess->fast_forward_grace_start_time << " to " << curtime << endl;
3765+
myds->sess->fast_forward_grace_start_time = curtime;
3766+
}
3767+
if (myds->sess->backend_closed_in_fast_forward) {
3768+
if (myds->PSarrayIN->len == 0 && myds->sess->client_myds->PSarrayOUT->len == 0 && (myds->sess->client_myds->queueOUT.head - myds->sess->client_myds->queueOUT.tail) == 0) {
3769+
// buffers empty, close
3770+
myds->sess->set_unhealthy(); // set unhealthy
3771+
} else if (curtime - myds->sess->fast_forward_grace_start_time > (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) {
3772+
// timeout, close
3773+
myds->sess->set_unhealthy(); // set unhealthy
3774+
}
3775+
}
37523776
}
37533777
}
37543778
}
@@ -3925,9 +3949,17 @@ void MySQL_Thread::ProcessAllSessions_Healthy0(MySQL_Session *sess, unsigned int
39253949
sess->client_myds->addr.port
39263950
);
39273951
} else {
3952+
string extra_info = "";
3953+
if (sess->backend_closed_in_fast_forward == true) {
3954+
unsigned long long lapse = curtime - sess->fast_forward_grace_start_time;
3955+
extra_info = "Yes , " + to_string(lapse/1000) + " ms ago";
3956+
} else {
3957+
extra_info = "No";
3958+
}
39283959
proxy_warning(
3929-
"Closing 'fast_forward' client connection %s:%d\n", sess->client_myds->addr.addr,
3930-
sess->client_myds->addr.port
3960+
"Closing 'fast_forward' client connection %s:%d . Backend already close: %s\n",
3961+
sess->client_myds->addr.addr, sess->client_myds->addr.port,
3962+
extra_info.c_str()
39313963
);
39323964
}
39333965
}
@@ -4137,6 +4169,7 @@ void MySQL_Thread::refresh_variables() {
41374169
REFRESH_VARIABLE_INT(connect_timeout_server);
41384170
REFRESH_VARIABLE_INT(connect_timeout_server_max);
41394171
REFRESH_VARIABLE_INT(free_connections_pct);
4172+
REFRESH_VARIABLE_INT(fast_forward_grace_close_ms);
41404173
#ifdef IDLE_THREADS
41414174
REFRESH_VARIABLE_INT(session_idle_ms);
41424175
#endif // IDLE_THREADS

lib/mysql_data_stream.cpp

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,26 @@ int MySQL_Data_Stream::read_from_net() {
576576
} else {
577577
// Shutdown if we either received the EOF, or operation failed with non-retryable error.
578578
if (ssl_recv_bytes==0 || (ssl_recv_bytes==-1 && errno != EINTR && errno != EAGAIN)) {
579-
proxy_debug(PROXY_DEBUG_NET, 5, "Received EOF, shutting down soft socket -- Session=%p, Datastream=%p\n", sess, this);
579+
/*
580+
* Fast Forward Grace Close Logic:
581+
* When the backend connection closes unexpectedly (EOF) during fast forward mode,
582+
* instead of immediately closing the session, we check if there are pending
583+
* client output buffers. If so, we initiate a grace period to allow the
584+
* buffers to drain before closing the session.
585+
*
586+
* This prevents data loss in fast forward scenarios where ProxySQL forwards
587+
* packets without buffering, and the backend closes before all data is sent.
588+
*/
589+
if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && ssl_recv_bytes==0) {
590+
if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
591+
if (sess->backend_closed_in_fast_forward == false) {
592+
sess->backend_closed_in_fast_forward = true;
593+
sess->fast_forward_grace_start_time = sess->thread->curtime;
594+
sess->client_myds->defer_close_due_to_fast_forward = true;
595+
}
596+
}
597+
}
598+
proxy_debug(PROXY_DEBUG_NET, 5, "Received EOF, shutting down soft socket -- Session=%p, Datastream=%p", sess, this);
580599
shut_soft();
581600
return -1;
582601
}
@@ -590,6 +609,19 @@ int MySQL_Data_Stream::read_from_net() {
590609
if (encrypted==false) {
591610
int myds_errno=errno;
592611
if (r==0 || (r==-1 && myds_errno != EINTR && myds_errno != EAGAIN)) {
612+
/*
613+
* Fast Forward Grace Close Logic:
614+
* Similar check for non-encrypted connections when backend closes with EOF.
615+
*/
616+
if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && r==0) {
617+
if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
618+
if (sess->backend_closed_in_fast_forward == false) {
619+
sess->backend_closed_in_fast_forward = true;
620+
sess->fast_forward_grace_start_time = sess->thread->curtime;
621+
sess->client_myds->defer_close_due_to_fast_forward = true;
622+
}
623+
}
624+
}
593625
shut_soft();
594626
}
595627
} else {
@@ -622,7 +654,20 @@ int MySQL_Data_Stream::read_from_net() {
622654
if ( (revents & POLLHUP) ) {
623655
// this is a final check
624656
// Only if the amount of data read is 0 or less, then we check POLLHUP
625-
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft. revents=%d , bytes read = %d\n", sess, this, revents, r);
657+
/*
658+
* Fast Forward Grace Close Logic:
659+
* Handle POLLHUP event similarly, initiating grace close if buffers are pending.
660+
*/
661+
if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward) {
662+
if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
663+
if (sess->backend_closed_in_fast_forward == false) {
664+
sess->backend_closed_in_fast_forward = true;
665+
sess->fast_forward_grace_start_time = sess->thread->curtime;
666+
sess->client_myds->defer_close_due_to_fast_forward = true;
667+
}
668+
}
669+
}
670+
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft. revents=%d , bytes read = %d", sess, this, revents, r);
626671
shut_soft();
627672
}
628673
} else {
@@ -765,6 +810,22 @@ void MySQL_Data_Stream::set_pollout() {
765810
_pollfd->events = myconn->wait_events;
766811
} else {
767812
_pollfd->events = POLLIN;
813+
if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && sess->backend_closed_in_fast_forward == true) {
814+
/*
815+
* Fast Forward Grace Close Logic:
816+
* During the grace period after backend closure, we manage polling to avoid busy-waiting.
817+
* If POLLIN is set, poll() will return immediately since the socket is closed,
818+
* causing the thread to spin. To prevent this, we clear POLLIN during the grace period
819+
* and rely on timeouts to eventually close the session.
820+
*/
821+
// this is a fast forward session where the backend connection was already closed
822+
// if we set POLLIN : the thread will spin on poll() until the socket is closed
823+
// if we do not set POLLIN : we won't be able to timeout
824+
if (sess->thread->curtime - sess->fast_forward_grace_start_time < (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) {
825+
// for the reason listed above, we remove POLLIN unless the timeout has reached
826+
_pollfd->events = 0;
827+
}
828+
}
768829
//if (PSarrayOUT->len || available_data_out() || queueOUT.partial || (encrypted && !SSL_is_init_finished(ssl))) {
769830
if (PSarrayOUT->len || available_data_out() || queueOUT.partial) {
770831
_pollfd->events |= POLLOUT;

test/tap/groups/groups.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137
"test_cluster_sync_mysql_servers-t" : [ "default-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
138138
"test_cluster_sync-t" : [ "default-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
139139
"test_com_binlog_dump_enables_fast_forward-t" : [ "default-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
140+
"fast_forward_grace_close_libmysql-t" : [ "default-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
140141
"test_com_register_slave_enables_fast_forward-t" : [ "default-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
141142
"test_com_reset_connection_com_change_user-t" : [ "default-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
142143
"test_connection_annotation-t" : [ "default-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],

test/tap/tests/Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ tests: tests-cpp \
121121
reg_test_stmt_resultset_err_no_rows_libmysql-t \
122122
prepare_statement_err3024_libmysql-t \
123123
prepare_statement_err3024_async-t \
124+
fast_forward_grace_close_libmysql-t \
124125
reg_test_mariadb_stmt_store_result_libmysql-t \
125126
reg_test_mariadb_stmt_store_result_async-t
126127
tests:
@@ -229,6 +230,9 @@ mysql_reconnect_libmariadb-t: mysql_reconnect.cpp $(TAP_LDIR)/libtap.so
229230
mysql_reconnect_libmysql-t: mysql_reconnect.cpp $(TAP_LDIR)/libtap_mysql8.a
230231
$(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@
231232

233+
fast_forward_grace_close_libmysql-t: fast_forward_grace_close.cpp $(TAP_LDIR)/libtap_mysql8.a
234+
$(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@
235+
232236
test_match_eof_conn_cap_libmysql-t: test_match_eof_conn_cap.cpp $(TAP_LDIR)/libtap_mysql8.a
233237
$(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@
234238

0 commit comments

Comments
 (0)