Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.

Commit d71c423

Browse files
[PIO-189] fix ES6 integration test (#488)
* [PIO-189] fix ES6 integration test
1 parent f762bee commit d71c423

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.json4s.ext.JodaTimeSerializers
3838
import grizzled.slf4j.Logging
3939
import org.apache.http.message.BasicHeader
4040

41-
class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
41+
class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseIndex: String)
4242
extends LEvents with Logging {
4343
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
4444

@@ -52,6 +52,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
5252

5353
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
5454
val estype = getEsType(appId, channelId)
55+
val index = baseIndex + "_" + estype
5556
ESUtils.createIndex(client, index,
5657
ESUtils.getNumberOfShards(config, index.toUpperCase),
5758
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
@@ -77,6 +78,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
7778

7879
override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
7980
val estype = getEsType(appId, channelId)
81+
val index = baseIndex + "_" + estype
8082
try {
8183
val json =
8284
("query" ->
@@ -107,6 +109,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
107109
channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
108110
Future {
109111
val estype = getEsType(appId, channelId)
112+
val index = baseIndex + "_" + estype
110113
try {
111114
val id = event.eventId.getOrElse {
112115
ESEventsUtil.getBase64UUID
@@ -152,6 +155,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
152155
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = {
153156
Future {
154157
val estype = getEsType(appId, channelId)
158+
val index = baseIndex + "_" + estype
155159
try {
156160
val ids = events.map { event =>
157161
event.eventId.getOrElse(ESEventsUtil.getBase64UUID)
@@ -214,6 +218,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
214218
}
215219

216220
private def exists(client: RestClient, estype: String, id: Int): Boolean = {
221+
val index = baseIndex + "_" + estype
217222
try {
218223
client.performRequest(
219224
"GET",
@@ -242,6 +247,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
242247
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
243248
Future {
244249
val estype = getEsType(appId, channelId)
250+
val index = baseIndex + "_" + estype
245251
try {
246252
val json =
247253
("query" ->
@@ -275,6 +281,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
275281
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
276282
Future {
277283
val estype = getEsType(appId, channelId)
284+
val index = baseIndex + "_" + estype
278285
try {
279286
val json =
280287
("query" ->
@@ -311,6 +318,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
311318
(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
312319
Future {
313320
val estype = getEsType(appId, channelId)
321+
val index = baseIndex + "_" + estype
314322
try {
315323
val query = ESUtils.createEventQuery(
316324
startTime, untilTime, entityType, entityId,

storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.json4s.native.JsonMethods._
4141
import org.json4s.ext.JodaTimeSerializers
4242

4343

44-
class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
44+
class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: String)
4545
extends PEvents {
4646
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
4747

@@ -78,6 +78,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
7878
eventNames, targetEntityType, targetEntityId, None)
7979

8080
val estype = getEsType(appId, channelId)
81+
val index = baseIndex + "_" + estype
8182
val conf = new Configuration()
8283
conf.set("es.resource", s"$index/$estype")
8384
conf.set("es.query", query)
@@ -97,6 +98,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
9798
events: RDD[Event],
9899
appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
99100
val estype = getEsType(appId, channelId)
101+
val index = baseIndex + "_" + estype
100102
val conf = Map("es.resource" -> s"$index/$estype", "es.nodes" -> getESNodes())
101103
events.map { event =>
102104
ESEventsUtil.eventToPut(event, appId)
@@ -107,6 +109,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
107109
eventIds: RDD[String],
108110
appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
109111
val estype = getEsType(appId, channelId)
112+
val index = baseIndex + "_" + estype
110113
eventIds.foreachPartition { iter =>
111114
iter.foreach { eventId =>
112115
try {

0 commit comments

Comments
 (0)