Skip to content

Commit ee61b71

Browse files
authored
Merge pull request #79 from civitaspo/develop
v0.3.2
2 parents eb918f9 + 9119cc1 commit ee61b71

File tree

8 files changed

+142
-9
lines changed

8 files changed

+142
-9
lines changed

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
0.3.2 (2019-08-06)
2+
==================
3+
4+
* [Fix] `aws.glue.list` bug: `limit` does not work correctly.
5+
* [New feature] Add `athena.each_database>` operator.
6+
17
0.3.1 (2019-08-05)
28
==================
39

410
* [Fix -- `athena.ctas>`] When using `save_mode: overwrite`, delete the specified table and location, not the table location that the data catalog has.
5-
* [New featuere -- `athena.drop_table_multi>`] `protect` option.
11+
* [New feature -- `athena.drop_table_multi>`] `protect` option.
612

713
0.3.0 (2019-07-30)
814
==================

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ _export:
1515
repositories:
1616
- https://jitpack.io
1717
dependencies:
18-
- pro.civitaspo:digdag-operator-athena:0.3.1
18+
- pro.civitaspo:digdag-operator-athena:0.3.2
1919
athena:
2020
auth_method: profile
2121

@@ -282,6 +282,17 @@ Nothing
282282
- **athena.last_partition_exists.table_exists**: `true` if the table exists, or `false` (boolean)
283283
- **athena.last_partition_exists.location_exists**: `true` if the table location exists, or `false`. `null` if not set **with_location** option is `true`. (boolean)
284284

285+
## Configuration for `athena.each_database>` operator
286+
287+
- **catalog_id**: Glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)
288+
- **parallel_slice**: The slice number of parallelism execution for **_do** subtasks. (integer, default: `1`)
289+
- **_do**: The definition of subtasks with exported database information. (config, required)
290+
- exported parameters are below.
291+
- **athena.each_database.export.name**: database name (string)
292+
- **athena.each_database.export.created_at**: epoch millis of creation time (integer)
293+
- **athena.each_database.export.description**: database description (string)
294+
- **athena.each_database.export.parameters**: database parameters that are defined in glue data catalog (conifg)
295+
285296

286297
# Development
287298

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
}
66

77
group = 'pro.civitaspo'
8-
version = '0.3.1'
8+
version = '0.3.2'
99

1010
def digdagVersion = '0.9.37'
1111
def awsSdkVersion = "1.11.587"

example/example.dig

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ _export:
44
- file://${repos}
55
# - https://jitpack.io
66
dependencies:
7-
- pro.civitaspo:digdag-operator-athena:0.3.1
7+
- pro.civitaspo:digdag-operator-athena:0.3.2
88
athena:
99
auth_method: profile
1010
value: 5
@@ -193,3 +193,12 @@ _export:
193193
c: "10"
194194
+step4:
195195
echo>: ${athena.last_partition_exists}
196+
197+
+step21:
198+
athena.each_database>:
199+
parallel_slice: 8
200+
_do:
201+
+echo1:
202+
echo>: ${athena.each_database}
203+
+echo2:
204+
echo>: ${athena.each_database.name}

src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import pro.civitaspo.digdag.plugin.athena.ctas.AthenaCtasOperator
1313
import pro.civitaspo.digdag.plugin.athena.drop_partition.AthenaDropPartitionOperator
1414
import pro.civitaspo.digdag.plugin.athena.drop_table.AthenaDropTableOperator
1515
import pro.civitaspo.digdag.plugin.athena.drop_table_multi.AthenaDropTableMultiOperator
16+
import pro.civitaspo.digdag.plugin.athena.each_database.AthenaEachDatabaseOperator
1617
import pro.civitaspo.digdag.plugin.athena.partition_exists.AthenaPartitionExistsOperator
1718
import pro.civitaspo.digdag.plugin.athena.preview.AthenaPreviewOperator
1819
import pro.civitaspo.digdag.plugin.athena.query.AthenaQueryOperator
@@ -42,7 +43,8 @@ object AthenaPlugin
4243
operatorFactory("athena.drop_table", classOf[AthenaDropTableOperator]),
4344
operatorFactory("athena.drop_table_multi", classOf[AthenaDropTableMultiOperator]),
4445
operatorFactory("athena.partition_exists?", classOf[AthenaPartitionExistsOperator]),
45-
operatorFactory("athena.table_exists?", classOf[AthenaTableExistsOperator])
46+
operatorFactory("athena.table_exists?", classOf[AthenaTableExistsOperator]),
47+
operatorFactory("athena.each_database", classOf[AthenaEachDatabaseOperator])
4648
)
4749
}
4850

src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/glue/catalog/DatabaseCatalog.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package pro.civitaspo.digdag.plugin.athena.aws.glue.catalog
22

33

4-
import com.amazonaws.services.glue.model.{Database, GetDatabaseRequest}
4+
import com.amazonaws.services.glue.model.{Database, GetDatabaseRequest, GetDatabasesRequest}
55
import pro.civitaspo.digdag.plugin.athena.aws.glue.Glue
66

7+
import scala.jdk.CollectionConverters._
78
import scala.util.Try
89

910

@@ -25,4 +26,29 @@ case class DatabaseCatalog(glue: Glue)
2526
Try(describe(catalogIdOption, database)).isSuccess
2627
}
2728

29+
def list(catalogIdOption: Option[String],
30+
limit: Option[Int] = None): Seq[Database] =
31+
{
32+
val req = new GetDatabasesRequest()
33+
catalogIdOption.foreach(req.setCatalogId)
34+
limit.foreach(l => req.setMaxResults(l))
35+
36+
def recursiveGetDatabases(nextToken: Option[String] = None,
37+
lastDatabases: Seq[Database] = Seq()): Seq[Database] =
38+
{
39+
nextToken.foreach(req.setNextToken)
40+
val results = glue.withGlue(_.getDatabases(req))
41+
val databases = lastDatabases ++ results.getDatabaseList.asScala.toSeq
42+
limit.foreach { i =>
43+
if (databases.length >= i) return databases.slice(0, i)
44+
}
45+
Option(results.getNextToken) match {
46+
case Some(nt) => recursiveGetDatabases(nextToken = Option(nt), lastDatabases = databases)
47+
case None => databases
48+
}
49+
}
50+
51+
recursiveGetDatabases()
52+
}
53+
2854
}

src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/glue/catalog/TableCatalog.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,17 @@ case class TableCatalog(glue: Glue)
5757
expression.foreach(req.setExpression)
5858
limit.foreach(l => req.setMaxResults(l))
5959

60-
def recursiveGetTables(nextToken: Option[String] = None): Seq[Table] =
60+
def recursiveGetTables(nextToken: Option[String] = None,
61+
lastTables: Seq[Table] = Seq()): Seq[Table] =
6162
{
6263
nextToken.foreach(req.setNextToken)
6364
val results = glue.withGlue(_.getTables(req))
64-
val tables = results.getTableList.asScala.toSeq
65+
val tables = lastTables ++ results.getTableList.asScala.toSeq
6566
limit.foreach { i =>
6667
if (tables.length >= i) return tables.slice(0, i)
6768
}
6869
Option(results.getNextToken) match {
69-
case Some(nt) => tables ++ recursiveGetTables(nextToken = Option(nt))
70+
case Some(nt) => recursiveGetTables(nextToken = Option(nt), lastTables = tables)
7071
case None => tables
7172
}
7273
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package pro.civitaspo.digdag.plugin.athena.each_database
2+
3+
4+
import com.amazonaws.services.glue.model.Database
5+
import io.digdag.client.config.Config
6+
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
7+
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator
8+
9+
import scala.jdk.CollectionConverters._
10+
import scala.util.chaining._
11+
12+
13+
class AthenaEachDatabaseOperator(operatorName: String,
14+
context: OperatorContext,
15+
systemConfig: Config,
16+
templateEngine: TemplateEngine)
17+
extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine)
18+
{
19+
protected val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())
20+
protected val parallelSlice: Int = params.get("parallel_slice", classOf[Int], 1)
21+
protected val doConfig: Config = params.getNested("_do")
22+
23+
override def runTask(): TaskResult =
24+
{
25+
val doConfigs = aws.glue.database.list(catalogId).map { db =>
26+
cf.create().tap { newDoConfig =>
27+
newDoConfig.getNestedOrSetEmpty(s"+${db.getName}").tap { c =>
28+
c.setAll(doConfig)
29+
c.setNested("_export", convertDatabaseToExport(db))
30+
}
31+
}
32+
}
33+
34+
val builder = TaskResult.defaultBuilder(cf)
35+
builder.subtaskConfig(generateParallelTasks(doConfigs = doConfigs))
36+
builder.build()
37+
}
38+
39+
protected def generateParallelTasks(doConfigs: Seq[Config]): Config =
40+
{
41+
def gen(remainingDoConfigs: Seq[Config],
42+
result: Config = cf.create(),
43+
idx: Int = 0): Config =
44+
{
45+
result.tap { r =>
46+
val (left, right) = remainingDoConfigs.splitAt(parallelSlice)
47+
val subTaskConfig = cf.create().tap { c =>
48+
val taskGroup: Config = c.getNestedOrSetEmpty(s"+p$idx")
49+
taskGroup.set("_parallel", true)
50+
left.foreach(taskGroup.setAll)
51+
}
52+
r.setAll(subTaskConfig)
53+
if (right.nonEmpty) gen(remainingDoConfigs = right, result = r, idx = idx + 1)
54+
}
55+
}
56+
57+
gen(remainingDoConfigs = doConfigs)
58+
}
59+
60+
protected def convertDatabaseToExport(database: Database): Config =
61+
{
62+
cf.create().tap { ret =>
63+
val export = ret
64+
.getNestedOrSetEmpty("athena")
65+
.getNestedOrSetEmpty("each_database")
66+
.getNestedOrSetEmpty("export")
67+
68+
export.set("name", database.getName)
69+
Option(database.getCreateTime).foreach(ct => export.set("created_at", ct.toInstant.toEpochMilli))
70+
Option(database.getDescription).foreach(desc => export.set("description", desc))
71+
Option(database.getParameters).foreach { p =>
72+
p.asScala.foreach {
73+
case (k: String, v: String) => export.getNestedOrSetEmpty("parameters").set(k, v)
74+
}
75+
}
76+
}
77+
}
78+
}

0 commit comments

Comments
 (0)