Skip to content

Add template_table config #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 15, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/main/java/org/embulk/output/BigqueryOutputPlugin.java
Original file line number Diff line number Diff line change
@@ -116,6 +116,10 @@ public interface PluginTask
Optional<LocalFile> getSchemaFile();
void setSchemaFile(Optional<LocalFile> schemaFile);

@Config("template_table")
@ConfigDefault("null")
Optional<String> getTemplateTable();

@Config("prevent_duplicate_insert")
@ConfigDefault("false")
boolean getPreventDuplicateInsert();
@@ -209,8 +213,12 @@ else if (task.getAuthMethod().getString().equals("private_key")) {
task.getP12Keyfile().transform(localFileToPathString()),
task.getJsonKeyfile().transform(localFileToPathString()),
task.getApplicationName())
.setProject(task.getProject())
.setDataset(task.getDataset())
.setTable(task.getTable())
.setAutoCreateTable(task.getAutoCreateTable())
.setSchemaPath(task.getSchemaFile().transform(localFileToPathString()))
.setTemplateTable(task.getTemplateTable())
.setSourceFormat(task.getSourceFormat().getString())
.setFieldDelimiter(String.valueOf(task.getFieldDelimiter()))
.setMaxBadRecords(task.getMaxBadrecords())
@@ -222,8 +230,6 @@ else if (task.getAuthMethod().getString().equals("private_key")) {
.setIgnoreUnknownValues(task.getIgnoreUnknownValues())
.setAllowQuotedNewlines(task.getAllowQuotedNewlines())
.build();

bigQueryWriter.checkConfig(task.getProject(), task.getDataset(), task.getTable());
}
catch (IOException | GeneralSecurityException ex) {
throw new ConfigException(ex);
64 changes: 58 additions & 6 deletions src/main/java/org/embulk/output/BigqueryWriter.java
Original file line number Diff line number Diff line change
@@ -41,8 +41,12 @@
public class BigqueryWriter
{
private final Logger log = Exec.getLogger(BigqueryWriter.class);
private final String project;
private final String dataset;
private final String table;
private final boolean autoCreateTable;
private final Optional<String> schemaPath;
private final Optional<String> templateTable;
private final TableSchema tableSchema;
private final String sourceFormat;
private final String fieldDelimiter;
@@ -59,8 +63,12 @@ public class BigqueryWriter
public BigqueryWriter(Builder builder)
throws IOException, GeneralSecurityException
{
this.project = builder.project;
this.dataset = builder.dataset;
this.table = builder.table;
this.autoCreateTable = builder.autoCreateTable;
this.schemaPath = builder.schemaPath;
this.templateTable = builder.templateTable;
this.sourceFormat = builder.sourceFormat.toUpperCase();
this.fieldDelimiter = builder.fieldDelimiter;
this.maxBadRecords = builder.maxBadRecords;
@@ -78,8 +86,15 @@ public BigqueryWriter(Builder builder)
);
this.bigQueryClient = auth.getBigqueryClient();

checkConfig();

if (autoCreateTable) {
this.tableSchema = createTableSchema();
if (schemaPath.isPresent()) {
this.tableSchema = createTableSchema();
}
else {
this.tableSchema = fetchTableSchema();
}
}
else {
this.tableSchema = null;
@@ -322,6 +337,15 @@ public TableSchema createTableSchema() throws IOException
}
}

public TableSchema fetchTableSchema() throws IOException
{
String fetchTarget = templateTable.orNull();
log.info(String.format("Fetch table schema from project:%s dataset:%s table:%s", project, dataset, fetchTarget));
Tables tableRequest = bigQueryClient.tables();
Table tableData = tableRequest.get(project, dataset, fetchTarget).execute();
return tableData.getSchema();
}

public boolean isExistTable(String project, String dataset, String table) throws IOException
{
Tables tableRequest = bigQueryClient.tables();
@@ -334,18 +358,18 @@ public boolean isExistTable(String project, String dataset, String table) throws
return true;
}

public void checkConfig(String project, String dataset, String table) throws IOException
public void checkConfig() throws IOException
{
if (autoCreateTable) {
if (!schemaPath.isPresent()) {
throw new FileNotFoundException("schema_file is empty");
}
else {
if (schemaPath.isPresent()) {
File file = new File(schemaPath.orNull());
if (!file.exists()) {
throw new FileNotFoundException("Can not load schema file.");
}
}
else if (!templateTable.isPresent()) {
throw new FileNotFoundException("schema_file or template_table must be present");
}
}
else {
if (!isExistTable(project, dataset, table)) {
@@ -412,8 +436,12 @@ public static class Builder
private Optional<String> p12KeyFilePath;
private Optional<String> jsonKeyFilePath;
private String applicationName;
private String project;
private String dataset;
private String table;
private boolean autoCreateTable;
private Optional<String> schemaPath;
private Optional<String> templateTable;
private String sourceFormat;
private String fieldDelimiter;
private int maxBadRecords;
@@ -435,6 +463,24 @@ public Builder(String authMethod, Optional<String> serviceAccountEmail, Optional
this.applicationName = applicationName;
}

public Builder setProject(String project)
{
this.project = project;
return this;
}

public Builder setDataset(String dataset)
{
this.dataset = dataset;
return this;
}

public Builder setTable(String table)
{
this.table = table;
return this;
}

public Builder setAutoCreateTable(boolean autoCreateTable)
{
this.autoCreateTable = autoCreateTable;
@@ -447,6 +493,12 @@ public Builder setSchemaPath(Optional<String> schemaPath)
return this;
}

public Builder setTemplateTable(Optional<String> templateTable)
{
this.templateTable = templateTable;
return this;
}

public Builder setSourceFormat(String sourceFormat)
{
this.sourceFormat = sourceFormat;