1
+ package org .example ;
2
+
3
+ import com .mongodb .ConnectionString ;
4
+ import com .mongodb .MongoClientSettings ;
5
+
6
+ import com .mongodb .event .*;
7
+ import com .mongodb .reactivestreams .client .*;
8
+ import org .bson .Document ;
9
+ import reactor .core .publisher .Flux ;
10
+
11
+ import java .util .HashMap ;
12
+ import java .util .Map ;
13
+
14
+ public class Monitoring {
15
+
16
+ private static final String COLLECTION = "test_collection" ;
17
+ private static final String DATABASE = "test_db" ;
18
+ private static final ConnectionString URI = new ConnectionString ("<connection string URI>" );
19
+
20
+ public static void main (String [] args ) {
21
+ Monitoring examples = new Monitoring ();
22
+ System .out .println ("\n ---Command Event---\n " );
23
+ examples .monitorCommandEvent ();
24
+ System .out .println ("\n ---Cluster Event---\n " );
25
+ examples .monitorClusterEvent ();
26
+ System .out .println ("\n ---Connection Pool Event---\n " );
27
+ examples .monitorConnectionPoolEvent ();
28
+ }
29
+
30
+ private void monitorCommandEvent () {
31
+ // start-monitor-command-example
32
+ MongoClientSettings settings = MongoClientSettings .builder ()
33
+ .applyConnectionString (URI )
34
+ .addCommandListener (new CommandCounter ())
35
+ .build ();
36
+
37
+ try (MongoClient mongoClient = MongoClients .create (settings )) {
38
+ MongoDatabase database = mongoClient .getDatabase (DATABASE );
39
+ MongoCollection <Document > collection = database .getCollection (COLLECTION );
40
+
41
+ // Run some commands to test the counter
42
+ FindPublisher <Document > findPublisher1 = collection .find ();
43
+ FindPublisher <Document > findPublisher2 = collection .find ();
44
+ Flux .from (findPublisher1 ).blockLast ();
45
+ Flux .from (findPublisher2 ).blockLast ();
46
+ }
47
+ // end-monitor-command-example
48
+ }
49
+
50
+ private void monitorClusterEvent () {
51
+ // start-monitor-cluster-example
52
+ MongoClientSettings settings = MongoClientSettings .builder ()
53
+ .applyConnectionString (URI )
54
+ .applyToClusterSettings (builder ->
55
+ builder .addClusterListener (new IsWritable ()))
56
+ .build ();
57
+
58
+ try (MongoClient mongoClient = MongoClients .create (settings )) {
59
+ MongoDatabase database = mongoClient .getDatabase (DATABASE );
60
+ MongoCollection <Document > collection = database .getCollection (COLLECTION );
61
+
62
+ // Run a command to trigger a ClusterDescriptionChangedEvent
63
+ FindPublisher <Document > findPublisher = collection .find ();
64
+ Flux .from (findPublisher ).blockLast ();
65
+ }
66
+ // end-monitor-cluster-example
67
+ }
68
+
69
+ private void monitorConnectionPoolEvent () {
70
+ // start-monitor-connection-pool-example
71
+ MongoClientSettings settings = MongoClientSettings .builder ()
72
+ .applyConnectionString (URI )
73
+ .applyToConnectionPoolSettings (builder ->
74
+ builder .addConnectionPoolListener (new ConnectionPoolLibrarian ()))
75
+ .build ();
76
+
77
+ try (MongoClient mongoClient = MongoClients .create (settings )) {
78
+ MongoDatabase database = mongoClient .getDatabase (DATABASE );
79
+ MongoCollection <Document > collection = database .getCollection (COLLECTION );
80
+
81
+ // Run a command to trigger connection pool events
82
+ FindPublisher <Document > findPublisher = collection .find ();
83
+ Flux .from (findPublisher ).blockLast ();
84
+ }
85
+ // end-monitor-connection-pool-example
86
+ }
87
+ }
88
+
89
+ // start-command-listener
90
+ class CommandCounter implements CommandListener {
91
+ private final Map <String , Integer > commands = new HashMap <String , Integer >();
92
+
93
+ @ Override
94
+ public synchronized void commandSucceeded (final CommandSucceededEvent event ) {
95
+ String commandName = event .getCommandName ();
96
+ int count = commands .getOrDefault (commandName , 0 );
97
+ commands .put (commandName , count + 1 );
98
+ System .out .println (commands );
99
+ }
100
+
101
+ @ Override
102
+ public void commandFailed (final CommandFailedEvent event ) {
103
+ System .out .printf ("Failed execution of command '%s' with id %s%n" ,
104
+ event .getCommandName (),
105
+ event .getRequestId ());
106
+ }
107
+ }
108
+ // end-command-listener
109
+
110
+ // start-cluster-listener
111
+ class IsWritable implements ClusterListener {
112
+ private boolean isWritable ;
113
+
114
+ @ Override
115
+ public synchronized void clusterDescriptionChanged (final ClusterDescriptionChangedEvent event ) {
116
+ if (!isWritable ) {
117
+ if (event .getNewDescription ().hasWritableServer ()) {
118
+ isWritable = true ;
119
+ System .out .println ("Able to write to server" );
120
+ }
121
+ } else if (!event .getNewDescription ().hasWritableServer ()) {
122
+ isWritable = false ;
123
+ System .out .println ("Unable to write to server" );
124
+ }
125
+ }
126
+ }
127
+ // end-cluster-listener
128
+
129
+ // start-connection-pool-listener
130
+ class ConnectionPoolLibrarian implements ConnectionPoolListener {
131
+ @ Override
132
+ public void connectionCheckedOut (final ConnectionCheckedOutEvent event ) {
133
+ System .out .printf ("Fetching the connection with id %s...%n" ,
134
+ event .getConnectionId ().getLocalValue ());
135
+ }
136
+
137
+ @ Override
138
+ public void connectionCheckOutFailed (final ConnectionCheckOutFailedEvent event ) {
139
+ System .out .println ("Something went wrong! Failed to checkout connection." );
140
+ }
141
+ }
142
+ // end-connection-pool-listener
0 commit comments