1
+ /**
2
+ * Copyright (c) 2002-2016 "Neo Technology,"
3
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
4
+ *
5
+ * This file is part of Neo4j.
6
+ *
7
+ * Licensed under the Apache License, Version 2.0 (the "License");
8
+ * you may not use this file except in compliance with the License.
9
+ * You may obtain a copy of the License at
10
+ *
11
+ * http://www.apache.org/licenses/LICENSE-2.0
12
+ *
13
+ * Unless required by applicable law or agreed to in writing, software
14
+ * distributed under the License is distributed on an "AS IS" BASIS,
15
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
+ * See the License for the specific language governing permissions and
17
+ * limitations under the License.
18
+ */
19
+ package org .neo4j .driver .internal ;
20
+
21
+ import java .util .List ;
22
+
23
+ import org .neo4j .driver .internal .net .BoltServerAddress ;
24
+ import org .neo4j .driver .internal .net .pooling .PoolSettings ;
25
+ import org .neo4j .driver .internal .net .pooling .SocketConnectionPool ;
26
+ import org .neo4j .driver .internal .security .SecurityPlan ;
27
+ import org .neo4j .driver .internal .spi .Connection ;
28
+ import org .neo4j .driver .internal .util .Consumer ;
29
+ import org .neo4j .driver .internal .util .Supplier ;
30
+ import org .neo4j .driver .v1 .Logging ;
31
+ import org .neo4j .driver .v1 .Record ;
32
+ import org .neo4j .driver .v1 .Session ;
33
+ import org .neo4j .driver .v1 .SessionMode ;
34
+ import org .neo4j .driver .v1 .StatementResult ;
35
+ import org .neo4j .driver .v1 .exceptions .ClientException ;
36
+ import org .neo4j .driver .v1 .exceptions .ConnectionFailureException ;
37
+ import org .neo4j .driver .v1 .exceptions .ServiceUnavailableException ;
38
+
39
+ import static java .lang .String .format ;
40
+
41
+ public class ClusterDriver extends BaseDriver
42
+ {
43
+ private static final String DISCOVER_MEMBERS = "dbms.cluster.discoverEndpointAcquisitionServers" ;
44
+ private static final String ACQUIRE_ENDPOINTS = "dbms.cluster.acquireEndpoints" ;
45
+
46
+ private final Endpoints endpoints = new Endpoints ();
47
+ private final ClusterSettings clusterSettings ;
48
+ private boolean discoverable = true ;
49
+
50
+ public ClusterDriver ( BoltServerAddress seedAddress , ConnectionSettings connectionSettings ,
51
+ ClusterSettings clusterSettings ,
52
+ SecurityPlan securityPlan ,
53
+ PoolSettings poolSettings , Logging logging )
54
+ {
55
+ super ( new SocketConnectionPool ( connectionSettings , securityPlan , poolSettings , logging ),seedAddress , securityPlan , logging );
56
+ this .clusterSettings = clusterSettings ;
57
+ discover ();
58
+ }
59
+
60
+ synchronized void discover ()
61
+ {
62
+ if (!discoverable )
63
+ {
64
+ return ;
65
+ }
66
+
67
+ try
68
+ {
69
+ boolean success = false ;
70
+ while ( !connections .isEmpty () && !success )
71
+ {
72
+ success = call ( DISCOVER_MEMBERS , new Consumer <Record >()
73
+ {
74
+ @ Override
75
+ public void accept ( Record record )
76
+ {
77
+ connections .add (new BoltServerAddress ( record .get ( "address" ).asString () ));
78
+ }
79
+ } );
80
+ }
81
+ if ( !success )
82
+ {
83
+ throw new ServiceUnavailableException ( "Run out of servers" );
84
+ }
85
+ }
86
+ catch ( ClientException ex )
87
+ {
88
+ if ( ex .code ().equals ( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
89
+ {
90
+ //no procedure there, not much to do, stick with what we've got
91
+ //this may happen because server is running in standalone mode
92
+ log .warn ( "Could not find procedure %s" , DISCOVER_MEMBERS );
93
+ discoverable = false ;
94
+ }
95
+ else
96
+ {
97
+ throw ex ;
98
+ }
99
+ }
100
+ }
101
+
102
+ //must be called from a synchronized method
103
+ private boolean call ( String procedureName , Consumer <Record > recorder )
104
+ {
105
+ Connection acquire = null ;
106
+ Session session = null ;
107
+ try {
108
+ acquire = connections .acquire ();
109
+ session = new NetworkSession ( acquire , log );
110
+
111
+ StatementResult records = session .run ( format ( "CALL %s" , procedureName ) );
112
+ while ( records .hasNext () )
113
+ {
114
+ recorder .accept ( records .next () );
115
+ }
116
+ }
117
+ catch ( ConnectionFailureException e )
118
+ {
119
+ if (acquire != null )
120
+ {
121
+ forget ( acquire .address () );
122
+ }
123
+ return false ;
124
+ }
125
+ finally
126
+ {
127
+ if (acquire != null )
128
+ {
129
+ acquire .close ();
130
+ }
131
+ if (session != null )
132
+ {
133
+ session .close ();
134
+ }
135
+ }
136
+ return true ;
137
+ }
138
+
139
+ //must be called from a synchronized method
140
+ private void callWithRetry (String procedureName , Consumer <Record > recorder )
141
+ {
142
+ while ( !connections .isEmpty () )
143
+ {
144
+ Connection acquire = null ;
145
+ Session session = null ;
146
+ try {
147
+ acquire = connections .acquire ();
148
+ session = new NetworkSession ( acquire , log );
149
+ List <Record > list = session .run ( format ( "CALL %s" , procedureName ) ).list ();
150
+ for ( Record record : list )
151
+ {
152
+ recorder .accept ( record );
153
+ }
154
+ //we found results give up
155
+ return ;
156
+ }
157
+ catch ( ConnectionFailureException e )
158
+ {
159
+ if (acquire != null )
160
+ {
161
+ forget ( acquire .address () );
162
+ }
163
+ }
164
+ finally
165
+ {
166
+ if (acquire != null )
167
+ {
168
+ acquire .close ();
169
+ }
170
+ if (session != null )
171
+ {
172
+ session .close ();
173
+ }
174
+ }
175
+ }
176
+
177
+ throw new ServiceUnavailableException ( "Failed to communicate with any of the cluster members" );
178
+ }
179
+
180
+ private synchronized void forget ( BoltServerAddress address )
181
+ {
182
+ connections .purge ( address );
183
+ }
184
+
185
+ @ Override
186
+ public Session session ()
187
+ {
188
+ return session ( SessionMode .WRITE );
189
+ }
190
+
191
+ @ Override
192
+ public Session session ( final SessionMode mode )
193
+ {
194
+ switch ( mode )
195
+ {
196
+ case READ :
197
+ return new ReadNetworkSession ( new Supplier <Connection >()
198
+ {
199
+ @ Override
200
+ public Connection get ()
201
+ {
202
+ return acquireConnection ( mode );
203
+ }
204
+ }, new Consumer <Connection >()
205
+ {
206
+ @ Override
207
+ public void accept ( Connection connection )
208
+ {
209
+ forget ( connection .address () );
210
+ }
211
+ }, clusterSettings , log );
212
+ case WRITE :
213
+ return new WriteNetworkSession ( acquireConnection ( mode ), clusterSettings , log );
214
+ default :
215
+ throw new UnsupportedOperationException ();
216
+ }
217
+ }
218
+
219
+ private synchronized Connection acquireConnection ( SessionMode mode )
220
+ {
221
+ if (!discoverable )
222
+ {
223
+ return connections .acquire ();
224
+ }
225
+
226
+ //if we are short on servers, find new ones
227
+ if ( connections .addressCount () < clusterSettings .minimumNumberOfServers () )
228
+ {
229
+ discover ();
230
+ }
231
+
232
+ endpoints .clear ();
233
+ try
234
+ {
235
+ callWithRetry ( ACQUIRE_ENDPOINTS , new Consumer <Record >()
236
+ {
237
+ @ Override
238
+ public void accept ( Record record )
239
+ {
240
+ String serverMode = record .get ( "role" ).asString ();
241
+ if ( serverMode .equals ( "READ" ) )
242
+ {
243
+ endpoints .readServer = new BoltServerAddress ( record .get ( "address" ).asString () );
244
+ }
245
+ else if ( serverMode .equals ( "WRITE" ) )
246
+ {
247
+ endpoints .writeServer = new BoltServerAddress ( record .get ( "address" ).asString () );
248
+ }
249
+ }
250
+ } );
251
+ }
252
+ catch (ClientException e )
253
+ {
254
+ if ( e .code ().equals ( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
255
+ {
256
+ log .warn ( "Could not find procedure %s" , ACQUIRE_ENDPOINTS );
257
+ discoverable = false ;
258
+ return connections .acquire ();
259
+ }
260
+ throw e ;
261
+ }
262
+
263
+ if ( !endpoints .valid () )
264
+ {
265
+ throw new ServiceUnavailableException ("Could not establish any endpoints for the call" );
266
+ }
267
+
268
+
269
+ switch ( mode )
270
+ {
271
+ case READ :
272
+ return connections .acquire ( endpoints .readServer );
273
+ case WRITE :
274
+ return connections .acquire ( endpoints .writeServer );
275
+ default :
276
+ throw new ClientException ( mode + " is not supported for creating new sessions" );
277
+ }
278
+ }
279
+
280
+ @ Override
281
+ public void close ()
282
+ {
283
+ try
284
+ {
285
+ connections .close ();
286
+ }
287
+ catch ( Exception ex )
288
+ {
289
+ log .error ( format ( "~~ [ERROR] %s" , ex .getMessage () ), ex );
290
+ }
291
+ }
292
+
293
+ private static class Endpoints
294
+ {
295
+ BoltServerAddress readServer ;
296
+ BoltServerAddress writeServer ;
297
+
298
+ public boolean valid ()
299
+ {
300
+ return readServer != null && writeServer != null ;
301
+ }
302
+
303
+ public void clear ()
304
+ {
305
+ readServer = null ;
306
+ writeServer = null ;
307
+ }
308
+ }
309
+
310
+ }
0 commit comments