@@ -19,7 +19,7 @@ const _ = require('lodash');
1919const grpc = require ( 'grpc' ) ;
2020
2121const config = require ( '../../../config/config' ) ;
22- const servicesConnector = require ( '../../services/servicesConnector' ) ;
22+ const servicesConnector = config . connectors . traces && require ( '../../services/servicesConnector' ) ; // eslint-disable-line
2323
2424const fetcher = require ( '../../operations/grpcFetcher' ) ;
2525const services = require ( '../../../../static_codegen/anomaly/anomalyReader_grpc_pb' ) ;
@@ -28,83 +28,120 @@ const MetricpointNameEncoder = require('../../utils/encoders/MetricpointNameEnco
2828
2929const metricpointNameEncoder = new MetricpointNameEncoder ( config . encoder ) ;
3030
31- const grpcOptions = {
32- 'grpc.max_receive_message_length' : 10485760 , // todo: do I need these?
33- ...config . connectors . traces . grpcOptions
34- } ;
// Channel options handed to the gRPC client; an empty object when the config provides none.
const grpcOptions = config.grpcOptions || {};

const connector = {};
const client = new services.AnomalyReaderClient(
    `${config.connectors.alerts.haystackHost}:${config.connectors.alerts.haystackPort}`,
    grpc.credentials.createInsecure(),
    grpcOptions); // TODO make client secure
// These values are compared with `===` against the `metric_key` label of each alert,
// so they must be spelled exactly (no surrounding whitespace).
const alertTypes = ['duration', 'failure-span'];
const getAnomaliesFetcher = fetcher('getAnomalies', client);
// Falls back to 300 seconds when the alerts connector config does not provide a value.
const alertFreqInSec = config.connectors.alerts.alertFreqInSec || 300; // TODO make this based on alert type
4441
4542
/**
 * Asks the services connector for the operations of a service.
 *
 * `servicesConnector` is only loaded when the traces connector is configured;
 * when it is falsy that falsy value is returned unchanged instead of a lookup result.
 *
 * @param {string} serviceName - service whose operations are requested
 * @returns {*} whatever `servicesConnector.getOperations` returns, or the falsy connector value
 */
function fetchOperations(serviceName) {
    if (!servicesConnector) {
        return servicesConnector;
    }

    return servicesConnector.getOperations(serviceName);
}
46+
/**
 * Tells whether an anomaly response carries the given operation name and metric key.
 *
 * Labels are read from `labelsMap`, a list of `[key, value]` pairs; the operation
 * is taken from the `operationName` label and the type from the `metric_key` label.
 *
 * @param {?Object} alertToCheck - anomaly response to inspect (may be null/undefined)
 * @param {string} operationName - operation name to match
 * @param {string} type - metric key to match
 * @returns {boolean} true only when both labels are present and equal to the arguments
 */
function sameOperationAndType(alertToCheck, operationName, type) {
    if (!alertToCheck || !Array.isArray(alertToCheck.labelsMap)) {
        return false;
    }

    const operationLabel = alertToCheck.labelsMap.find(label => label[0] === 'operationName');
    const typeLabel = alertToCheck.labelsMap.find(label => label[0] === 'metric_key');

    // A missing label never matches, even when the value to compare against is undefined.
    if (!operationLabel || !typeLabel) {
        return false;
    }

    return operationLabel[1] === operationName && typeLabel[1] === type;
}
4955
/**
 * Turns a SearchAnomaliesResponse object into one alert summary per
 * (operationName, metric_key) pair.
 *
 * Responses sharing the same operation and metric key are combined by
 * concatenating their anomaly lists; the input object is left untouched.
 * Responses that are null or that lack either label are skipped.
 *
 * Anomaly timestamps are multiplied by 1000 before being compared with
 * `Date.now()` (presumably epoch seconds converted to milliseconds).
 *
 * @param {Object} data - object holding `searchanomalyresponseList`
 * @param {number} [freqInSec=alertFreqInSec] - how recent (in seconds) the latest
 *     anomaly must be for the alert to count as unhealthy
 * @returns {Array<{operationName: *, type: *, isUnhealthy: boolean, timestamp: (number|undefined)}>}
 *     summaries in order of first appearance; `timestamp` is undefined when the
 *     pair has no anomalies
 */
function parseOperationAlertsResponse(data, freqInSec = alertFreqInSec) {
    const responses = (data && data.searchanomalyresponseList) || [];

    const labelValue = (labelsMap, key) => {
        const entry = (labelsMap || []).find(label => label[0] === key);
        return entry ? entry[1] : undefined;
    };

    // A Map iterates in insertion order, so output order follows first appearance.
    const groups = new Map();
    responses.forEach((anomalyResponse) => {
        if (!anomalyResponse) return;

        const operationName = labelValue(anomalyResponse.labelsMap, 'operationName');
        const type = labelValue(anomalyResponse.labelsMap, 'metric_key');
        if (operationName === undefined || type === undefined) return;

        const groupKey = JSON.stringify([operationName, type]);
        const group = groups.get(groupKey) || {operationName, type, anomalies: []};
        // concat builds a new array, so the response's own anomaliesList is never mutated
        group.anomalies = group.anomalies.concat(anomalyResponse.anomaliesList || []);
        groups.set(groupKey, group);
    });

    const oldestUnhealthyMs = Date.now() - (freqInSec * 1000);

    return Array.from(groups.values()).map(({operationName, type, anomalies}) => {
        // Latest anomaly by timestamp; entries without a timestamp are ignored, first one wins ties.
        const latestUnhealthy = anomalies.reduce((latest, anomaly) => {
            if (anomaly == null || anomaly.timestamp == null) return latest;
            return (latest === undefined || anomaly.timestamp > latest.timestamp) ? anomaly : latest;
        }, undefined);

        const timestamp = latestUnhealthy && latestUnhealthy.timestamp * 1000;
        const isUnhealthy = Boolean(timestamp && timestamp >= oldestUnhealthyMs);

        return {
            operationName,
            type,
            isUnhealthy,
            timestamp
        };
    });
}
6890
69- function fetchOperationAlerts ( serviceName , interval , from ) {
/**
 * Queries the anomaly reader for one service, restricted to a single
 * stat / metric key combination, and parses the reply into alert summaries.
 *
 * The start time is `from / 1000` truncated and the end time is
 * `Date.now() / 1000` truncated (presumably milliseconds converted to seconds).
 *
 * @param {string} serviceName - URI-encoded service name
 * @param {string} interval - value for the `interval` label
 * @param {number} from - start of the search window
 * @param {string} stat - value for the `stat` label
 * @param {string} key - value for the `metric_key` label
 * @returns {Promise} resolves with the result of `parseOperationAlertsResponse`
 */
function fetchAlerts(serviceName, interval, from, stat, key) {
    const request = new messages.SearchAnamoliesRequest();
    const labels = request.getLabelsMap();

    labels.set('serviceName', metricpointNameEncoder.encodeMetricpointName(decodeURIComponent(serviceName)));
    labels.set('interval', interval);
    labels.set('mtype', 'gauge');
    labels.set('product', 'haystack');
    labels.set('stat', stat);
    labels.set('metric_key', key);

    const startTime = Math.trunc(from / 1000);
    request.setStarttime(startTime);
    const endTime = Math.trunc(Date.now() / 1000);
    request.setEndtime(endTime);
    // NOTE(review): -1 presumably means "no result size limit" - confirm against the service contract
    request.setSize(-1);

    const toAlerts = (pbResult) => {
        const responseObject = messages.SearchAnomaliesResponse.toObject(false, pbResult);
        return parseOperationAlertsResponse(responseObject);
    };

    return getAnomaliesFetcher.fetch(request).then(toAlerts);
}
83108
84- function mergeOperationsWithAlerts ( { operationAlerts, operations} ) {
85- return _ . flatten ( operations . map ( operation => alertTypes . map ( ( alertType ) => {
86- const operationAlert = operationAlerts . find ( alert => ( alert . operationName . toLowerCase ( ) === operation . toLowerCase ( ) && alert . type === alertType ) ) ;
/**
 * Fetches the duration (`*_99` stat) and failure-span (`count` stat) alerts of a
 * service in parallel and returns them as a single list.
 *
 * The two result lists are concatenated, duration alerts first. They must not be
 * combined with `_.merge`: on arrays it merges element by element at the same
 * index, which would overwrite duration alerts with failure-span alerts.
 *
 * @param {string} serviceName - URI-encoded service name
 * @param {string} interval - value for the `interval` label
 * @param {number} from - start of the search window
 * @returns {Promise<Array>} resolves with all alert summaries of both kinds
 */
function fetchOperationAlerts(serviceName, interval, from) {
    return Q.all([
        fetchAlerts(serviceName, interval, from, '*_99', 'duration'),
        fetchAlerts(serviceName, interval, from, 'count', 'failure-span')
    ]).then(([durationAlerts, failureAlerts]) => [...durationAlerts, ...failureAlerts]);
}
87113
88- if ( operationAlert !== undefined ) {
/**
 * Produces one alert entry per operation and alert type.
 *
 * When operations are known, every (operation, type) pair yields either a copy
 * of the first alert whose operation name matches case-insensitively and whose
 * type matches exactly, or a healthy placeholder. When no operations are
 * available, the alerts themselves are returned, grouped in the order of `types`.
 *
 * @param {Object} args
 * @param {Array<Object>} [args.operationAlerts] - parsed alert summaries
 * @param {Array<string>} [args.operations] - operation names of the service
 * @param {Array<string>} [types=alertTypes] - alert types to report on
 * @returns {Array<Object>} alert entries with operationName, type, isUnhealthy, timestamp
 */
function mergeOperationsWithAlerts({operationAlerts, operations}, types = alertTypes) {
    const alerts = operationAlerts || [];

    if (!operations || !operations.length) {
        return types.reduce(
            (merged, alertType) => merged.concat(alerts.filter(alert => alert.type === alertType)),
            []);
    }

    const keyOf = (operationName, type) => JSON.stringify([operationName.toLowerCase(), type]);

    // Index alerts once so each lookup below is O(1); only the first alert per key is kept.
    const firstAlertByKey = new Map();
    alerts.forEach((alert) => {
        if (!alert || typeof alert.operationName !== 'string') return;
        const key = keyOf(alert.operationName, alert.type);
        if (!firstAlertByKey.has(key)) {
            firstAlertByKey.set(key, alert);
        }
    });

    const merged = [];
    operations.forEach((operation) => {
        types.forEach((alertType) => {
            const operationAlert = firstAlertByKey.get(keyOf(operation, alertType));

            if (operationAlert !== undefined) {
                merged.push({...operationAlert});
                return;
            }

            // No alert reported for this pair: treat it as healthy.
            merged.push({
                operationName: operation,
                type: alertType,
                isUnhealthy: false,
                timestamp: null
            });
        });
    });

    return merged;
}
101135
/**
 * Flattens a list of anomaly responses into a single list of anomalies, each
 * tagged with the `anomalyLevel` label of the response it came from.
 *
 * Every response is considered, so an empty first response no longer hides the
 * anomalies of later ones. A response without an `anomalyLevel` label yields
 * anomalies whose `strength` is undefined instead of throwing.
 *
 * @param {Array<Object>} data - responses with `labelsMap` ([key, value] pairs) and `anomaliesList`
 * @returns {Array<Object>} anomalies with an added `strength` property; an
 *     anomaly's own `strength` property, if any, takes precedence
 */
function returnAnomalies(data) {
    if (!Array.isArray(data)) {
        return [];
    }

    return data.reduce((allAnomalies, response) => {
        if (!response || !Array.isArray(response.anomaliesList)) {
            return allAnomalies;
        }

        const levelLabel = (response.labelsMap || []).find(label => label[0] === 'anomalyLevel');
        const strength = levelLabel ? levelLabel[1] : undefined;

        return allAnomalies.concat(response.anomaliesList.map(a => ({strength, ...a})));
    }, []);
}
109146
110147function getActiveAlertCount ( operationAlerts ) {
@@ -113,7 +150,7 @@ function getActiveAlertCount(operationAlerts) {
113150
114151connector . getServiceAlerts = ( serviceName , interval ) => {
115152 // todo: calculate "from" value based on selected interval
116- const oneDayAgo = Math . trunc ( Date . now ( ) - ( 24 * 60 * 60 * 1000 ) ) ;
153+ const oneDayAgo = Math . trunc ( ( Date . now ( ) - ( 24 * 60 * 60 * 1000 ) ) ) ;
117154 return Q . all ( [ fetchOperations ( serviceName ) , fetchOperationAlerts ( serviceName , interval , oneDayAgo ) ] )
118155 . then ( stats => mergeOperationsWithAlerts ( {
119156 operations : stats [ 0 ] ,
@@ -130,20 +167,21 @@ connector.getAnomalies = (serviceName, operationName, alertType, from, interval)
130167 . set ( 'serviceName' , metricpointNameEncoder . encodeMetricpointName ( decodeURIComponent ( serviceName ) ) )
131168 . set ( 'operationName' , metricpointNameEncoder . encodeMetricpointName ( decodeURIComponent ( operationName ) ) )
132169 . set ( 'product' , 'haystack' )
133- . set ( 'name ' , alertType )
170+ . set ( 'metric_key ' , alertType )
134171 . set ( 'stat' , stat )
135172 . set ( 'interval' , interval )
136173 . set ( 'mtype' , 'gauge' ) ;
137- request . setStarttime ( from ) ;
138- request . setEndtime ( Date . now ( ) ) ;
174+ request . setStarttime ( Math . trunc ( from / 1000 ) ) ;
175+ request . setEndtime ( Math . trunc ( Date . now ( ) / 1000 ) ) ;
176+ request . setSize ( - 1 ) ;
139177
140178 return getAnomaliesFetcher
141179 . fetch ( request )
142- . then ( pbResult => returnAnomalies ( messages . SearchAnomaliesResponse . toObject ( false , pbResult ) ) ) ;
180+ . then ( pbResult => returnAnomalies ( messages . SearchAnomaliesResponse . toObject ( false , pbResult ) . searchanomalyresponseList ) ) ;
143181} ;
144182
145- connector . getServiceUnhealthyAlertCount = serviceName =>
146- fetchOperationAlerts ( serviceName , '5m' , Math . trunc ( Date . now ( ) - ( 5 * 60 * 1000 ) ) )
/**
 * Counts the currently active alerts of a service, looking at alerts fetched
 * for a window starting five minutes before now.
 *
 * @param {string} serviceName - URI-encoded service name
 * @param {string} [interval='5m'] - value for the `interval` label; defaults to the
 *     value this function used before `interval` became a parameter, so callers
 *     passing only `serviceName` keep working
 * @returns {Promise} resolves with the result of `getActiveAlertCount`
 */
connector.getServiceUnhealthyAlertCount = (serviceName, interval = '5m') =>
    fetchOperationAlerts(serviceName, interval, Math.trunc(Date.now() - (5 * 60 * 1000)))
        .then(result => getActiveAlertCount(result));
148186
149187module . exports = connector ;
0 commit comments