Skip to content

Commit 0aaff5f

Browse files
committed
Let the vertx-pg-client encoder estimate the capacity of buffer it allocates.
Motivation: The pg encoder allocates Netty buffer without an initial capacity, when pipelining is used this buffer can be reallocated multiple times and in agressive scenario can lead to out of memory errors with the adapative allocator. Changes: Cumulate and encode all the outbound messages at once when flush happens instead of cumulating them in the outbound buffer. Capacity is estimated when a message is enqueued. At flush time, a single buffer of the estimated capacity is created and the outbound messages are written to this buffer. This requires to pre-render some messages to estimate their length, e.g. a json object is pre-rendered to a string that can be then estimated. The client has been modified to handle message preparation which takes care of this and lazy duplicate the tuple / list of tuples when this happens.
1 parent e5f39ea commit 0aaff5f

File tree

30 files changed

+829
-325
lines changed

30 files changed

+829
-325
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2ParamDesc.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.vertx.db2client.impl.codec;
1717

18+
import io.vertx.core.VertxException;
1819
import io.vertx.core.buffer.Buffer;
1920
import io.vertx.db2client.impl.drda.ClientTypes;
2021
import io.vertx.db2client.impl.drda.ColumnMetaData;
@@ -35,19 +36,19 @@ ColumnMetaData paramDefinitions() {
3536
return paramDefinitions;
3637
}
3738

38-
public String prepare(TupleInternal values) {
39+
public TupleInternal prepare(TupleInternal values) {
3940
if (values.size() != paramDefinitions.columns_) {
40-
return ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDefinitions.columns_, values.size());
41+
throw new VertxException(ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDefinitions.columns_, values.size()), true);
4142
}
4243
for (int i = 0; i < paramDefinitions.columns_; i++) {
4344
Object val = values.getValue(i);
4445
int type = paramDefinitions.types_[i];
4546
if (!canConvert(val, type)) {
4647
Class<?> preferredType = ClientTypes.preferredJavaType(type);
47-
return ErrorMessageFactory.buildWhenArgumentsTypeNotMatched(preferredType, i, val);
48+
throw new VertxException(ErrorMessageFactory.buildWhenArgumentsTypeNotMatched(preferredType, i, val), true);
4849
}
4950
}
50-
return null;
51+
return values;
5152
}
5253

5354
private static boolean canConvert(Object val, int type) {

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2PreparedStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public String sql() {
7474
}
7575

7676
@Override
77-
public String prepare(TupleInternal values) {
77+
public TupleInternal prepare(TupleInternal values) {
7878
return paramDesc.prepare(values);
7979
}
8080

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLPreparedStatement.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,4 @@ public String sql() {
4040
return sql;
4141
}
4242

43-
@Override
44-
public String prepare(TupleInternal values) {
45-
return null;
46-
}
4743
}

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package io.vertx.mysqlclient.impl.codec;
1919

20+
import io.vertx.core.VertxException;
2021
import io.vertx.mysqlclient.impl.MySQLParamDesc;
2122
import io.vertx.mysqlclient.impl.MySQLRowDesc;
2223
import io.vertx.mysqlclient.impl.datatype.DataType;
@@ -71,13 +72,13 @@ public String sql() {
7172
}
7273

7374
@Override
74-
public String prepare(TupleInternal values) {
75+
public TupleInternal prepare(TupleInternal values) {
7576
int numberOfParameters = values.size();
7677
int paramDescLength = paramDesc.paramDefinitions().length;
7778
if (numberOfParameters != paramDescLength) {
78-
return ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDescLength, numberOfParameters);
79+
throw new VertxException(ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDescLength, numberOfParameters), true);
7980
} else {
80-
return null;
81+
return values;
8182
}
8283
}
8384

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/commands/OraclePreparedStatement.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,4 @@ public RowDesc rowDesc() {
5050
public String sql() {
5151
return sql;
5252
}
53-
54-
@Override
55-
public String prepare(TupleInternal values) {
56-
return null;
57-
}
58-
5953
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
/**
2121
* @author <a href="mailto:[email protected]">Emad Alblueshi</a>
2222
*/
23-
final class Bind {
23+
final class Bind extends OutboundMessage {
2424

2525
final byte[] statement;
2626
final DataType[] paramTypes;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
class ClosePortalMessage extends OutboundMessage {
4+
5+
static final ClosePortalMessage INSTANCE = new ClosePortalMessage();
6+
7+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
class ClosePreparedStatementMessage extends OutboundMessage {
4+
5+
static final ClosePreparedStatementMessage INSTANCE = new ClosePreparedStatementMessage();
6+
7+
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataType.java

Lines changed: 93 additions & 85 deletions
Large diffs are not rendered by default.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeCodec.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,14 @@ static void encodeText(DataType id, Object value, ByteBuf buff) {
154154
private static void textEncode(DataType id, Object value, ByteBuf buff) {
155155
switch (id) {
156156
case NUMERIC:
157-
textEncodeNUMERIC((Number) value, buff);
157+
textEncodeNUMERIC((String) value, buff);
158158
break;
159159
case NUMERIC_ARRAY:
160-
textEncodeNUMERIC_ARRAY((Number[]) value, buff);
160+
textEncodeNUMERIC_ARRAY((Object[]) value, buff);
161161
break;
162162
case UNKNOWN:
163163
//default to treating unknown as a string
164-
buff.writeCharSequence(String.valueOf(value), StandardCharsets.UTF_8);
164+
buff.writeCharSequence((CharSequence) value, StandardCharsets.UTF_8);
165165
break;
166166
default:
167167
logger.debug("Data type " + id + " does not support text encoding");
@@ -281,13 +281,13 @@ public static void encodeBinary(DataType id, Object value, ByteBuf buff) {
281281
binaryEncodeArray((UUID[]) value, DataType.UUID, buff);
282282
break;
283283
case JSON:
284-
binaryEncodeJSON((Object) value, buff);
284+
binaryEncodeJSON((CharSequence) value, buff);
285285
break;
286286
case JSON_ARRAY:
287287
binaryEncodeArray((Object[]) value, DataType.JSON, buff);
288288
break;
289289
case JSONB:
290-
binaryEncodeJSONB((Object) value, buff);
290+
binaryEncodeJSONB((CharSequence) value, buff);
291291
break;
292292
case JSONB_ARRAY:
293293
binaryEncodeArray((Object[]) value, DataType.JSONB, buff);
@@ -938,12 +938,11 @@ private static Interval textDecodeINTERVAL(int index, int len, ByteBuf buff) {
938938
return new Interval(years, months, days, hours, minutes, seconds, microseconds);
939939
}
940940

941-
private static void textEncodeNUMERIC(Number value, ByteBuf buff) {
942-
String s = value.toString();
943-
buff.writeCharSequence(s, StandardCharsets.UTF_8);
941+
private static void textEncodeNUMERIC(String value, ByteBuf buff) {
942+
buff.writeCharSequence(value, StandardCharsets.UTF_8);
944943
}
945944

946-
private static void textEncodeNUMERIC_ARRAY(Number[] value, ByteBuf buff) {
945+
private static void textEncodeNUMERIC_ARRAY(Object[] value, ByteBuf buff) {
947946
textEncodeArray(value, DataType.NUMERIC, buff);
948947
}
949948

@@ -1366,14 +1365,8 @@ private static Object binaryDecodeJSON(int index, int len, ByteBuf buff) {
13661365
return textDecodeJSONB(index, len, buff);
13671366
}
13681367

1369-
private static void binaryEncodeJSON(Object value, ByteBuf buff) {
1370-
String s;
1371-
if (value == Tuple.JSON_NULL) {
1372-
s = "null";
1373-
} else {
1374-
s = Json.encode(value);
1375-
}
1376-
buff.writeCharSequence(s, StandardCharsets.UTF_8);
1368+
private static void binaryEncodeJSON(CharSequence value, ByteBuf buff) {
1369+
buff.writeCharSequence(value, StandardCharsets.UTF_8);
13771370
}
13781371

13791372
private static Object textDecodeJSONB(int index, int len, ByteBuf buff) {
@@ -1410,7 +1403,7 @@ private static Object binaryDecodeJSONB(int index, int len, ByteBuf buff) {
14101403
return textDecodeJSONB(index + 1, len - 1, buff);
14111404
}
14121405

1413-
private static void binaryEncodeJSONB(Object value, ByteBuf buff) {
1406+
private static void binaryEncodeJSONB(CharSequence value, ByteBuf buff) {
14141407
buff.writeByte(1); // version
14151408
binaryEncodeJSON(value, buff);
14161409
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
import io.netty.handler.codec.DecoderException;
4+
import io.netty.util.CharsetUtil;
5+
import io.vertx.core.buffer.Buffer;
6+
import io.vertx.pgclient.data.Cidr;
7+
import io.vertx.pgclient.data.Inet;
8+
import io.vertx.pgclient.data.Path;
9+
import io.vertx.pgclient.data.Polygon;
10+
11+
import java.net.Inet4Address;
12+
import java.net.Inet6Address;
13+
import java.net.InetAddress;
14+
15+
/**
16+
*/
17+
class DataTypeEstimator {
18+
19+
static final int UNSUPPORTED = 0;
20+
private static final int UTF8 = -1;
21+
22+
static final int NUMERIC = -2;
23+
static final int NUMERIC_ARRAY = -7;
24+
static final int BUFFER = -3;
25+
26+
static final int UNKNOWN = -6;
27+
28+
static final int BOOL = 1;
29+
static final int INT2 = 2;
30+
static final int INT4 = 4;
31+
static final int INT8 = 8;
32+
static final int FLOAT4 = 4;
33+
static final int FLOAT8 = 8;
34+
35+
static final int CHAR = UTF8;
36+
static final int VARCHAR = UTF8;
37+
static final int BPCHAR = UTF8;
38+
static final int TEXT = UTF8;
39+
static final int NAME = UTF8;
40+
41+
static final int DATE = 4;
42+
static final int TIME = 8;
43+
static final int TIMETZ = 12;
44+
static final int TIMESTAMP = 8;
45+
static final int TIMESTAMPTZ = 8;
46+
static final int INTERVAL = 16;
47+
48+
static final int BYTEA = BUFFER;
49+
50+
static final int INET = -10;
51+
static final int CIDR = -9;
52+
static final int UUID = 16;
53+
54+
static final int JSON = UTF8;
55+
static final int JSONB = -8;
56+
57+
static final int MONEY = 8;
58+
59+
static final int POINT = 16;
60+
static final int LINE = 24;
61+
static final int LSEG = 32;
62+
static final int BOX = 32;
63+
static final int CIRCLE = 24;
64+
static final int POLYGON = -4;
65+
static final int PATH = -5;
66+
67+
// Eventually make this configurable per options
68+
private static final float AVG_BYTES_PER_CHAR_UTF8 = CharsetUtil.encoder(CharsetUtil.UTF_8).averageBytesPerChar();
69+
70+
static int estimateUTF8(String s) {
71+
return (int)(s.length() * AVG_BYTES_PER_CHAR_UTF8);
72+
}
73+
74+
static int estimateByteArray(byte[] b) {
75+
return b.length;
76+
}
77+
78+
static int estimateCStringUTF8(String s) {
79+
return estimateUTF8(s) + 1;
80+
}
81+
82+
private static int estimateUnknown(String value) {
83+
return estimateUTF8(value);
84+
}
85+
86+
private static int estimateJSONB(String value) {
87+
return 1 + estimateUTF8(value);
88+
}
89+
90+
private static int estimateNumeric(String value) {
91+
return estimateUTF8(value);
92+
}
93+
94+
private static int estimateInetOrCidr(Cidr value) {
95+
return estimateInetOrCidr(value.getAddress());
96+
}
97+
98+
private static int estimateInetOrCidr(Inet value) {
99+
return estimateInetOrCidr(value.getAddress());
100+
}
101+
102+
private static int estimateInetOrCidr(InetAddress address) {
103+
int len;
104+
if (address instanceof Inet6Address) {
105+
Inet6Address inet6Address = (Inet6Address) address;
106+
len = inet6Address.getAddress().length;
107+
} else if (address instanceof Inet4Address) {
108+
Inet4Address inet4Address = (Inet4Address) address;
109+
len = inet4Address.getAddress().length;
110+
} else {
111+
// Invalid
112+
len = 0;
113+
}
114+
return 1 + 1 + 1 + 1 + len;
115+
}
116+
117+
private static int estimateNumericArray(Object[] value) {
118+
int length = 1;
119+
for (Object elt : value) {
120+
length += elt == null ? 4 : estimateNumeric((String) elt);
121+
}
122+
length += value.length;
123+
return length;
124+
}
125+
126+
private static int estimateBuffer(Buffer b) {
127+
return b.length();
128+
}
129+
130+
private static int estimatePolygon(Polygon p) {
131+
return 4 + p.getPoints().size() * 16;
132+
}
133+
134+
private static int estimatePath(Path p) {
135+
return 1 + 4 + p.getPoints().size() * 16;
136+
}
137+
138+
static int estimate(int estimator, Object o) {
139+
if (estimator > 0) {
140+
return estimator;
141+
} else {
142+
switch (estimator) {
143+
case DataTypeEstimator.CIDR:
144+
return estimateInetOrCidr((Cidr) o);
145+
case DataTypeEstimator.INET:
146+
return estimateInetOrCidr((Inet) o);
147+
case DataTypeEstimator.JSONB:
148+
return estimateJSONB((String) o);
149+
case DataTypeEstimator.UNKNOWN:
150+
return estimateUnknown((String) o);
151+
case DataTypeEstimator.NUMERIC:
152+
return estimateNumeric((String) o);
153+
case DataTypeEstimator.NUMERIC_ARRAY:
154+
return estimateNumericArray((Object[]) o);
155+
case DataTypeEstimator.UTF8:
156+
return estimateUTF8((String) o);
157+
case DataTypeEstimator.BUFFER:
158+
return estimateBuffer((Buffer) o);
159+
case DataTypeEstimator.POLYGON:
160+
return estimatePolygon((Polygon) o);
161+
case DataTypeEstimator.PATH:
162+
return estimatePath((Path) o);
163+
default:
164+
throw new UnsupportedOperationException();
165+
}
166+
}
167+
}
168+
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Describe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
/**
2121
* @author <a href="mailto:[email protected]">Emad Alblueshi</a>
2222
*/
23-
class Describe {
23+
class Describe extends OutboundMessage {
2424

2525
final byte[] statement;
2626
final String portal;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
class ExecuteMessage extends OutboundMessage {
4+
5+
static final ExecuteMessage INSTANCE = new ExecuteMessage();
6+
7+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
public class OutboundMessage {
4+
}

0 commit comments

Comments
 (0)