Skip to content

Commit b08653e

Browse files
committed
Events: add support for catalog-level events
1 parent 613b428 commit b08653e

File tree

54 files changed

+1992
-323
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1992
-323
lines changed

catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperation.java

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,51 @@
2626
import org.immutables.value.Value;
2727
import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement.AssertCreate;
2828
import org.projectnessie.catalog.model.ops.CatalogOperation;
29+
import org.projectnessie.catalog.model.ops.CatalogOperationType;
2930
import org.projectnessie.model.Content;
31+
import org.projectnessie.model.Content.Type;
3032
import org.projectnessie.model.ContentKey;
3133
import org.projectnessie.nessie.immutables.NessieImmutable;
3234

3335
/** Server-side representation of Iceberg metadata updates. */
3436
@NessieImmutable
3537
@JsonSerialize(as = ImmutableIcebergCatalogOperation.class)
3638
@JsonDeserialize(as = ImmutableIcebergCatalogOperation.class)
37-
public abstract class IcebergCatalogOperation implements CatalogOperation {
39+
public abstract class IcebergCatalogOperation implements CatalogOperation<IcebergMetadataUpdate> {
40+
41+
@Value.Default
3842
@Override
39-
public abstract ContentKey getKey();
43+
public CatalogOperationType getOperationType() {
44+
// note: the default impl cannot detect DROP operations
45+
boolean hasAssertCreate = hasRequirement(AssertCreate.class);
46+
if (getContentType().equals(Type.ICEBERG_TABLE)) {
47+
return hasAssertCreate ? CatalogOperationType.CREATE_TABLE : CatalogOperationType.ALTER_TABLE;
48+
} else if (getContentType().equals(Type.ICEBERG_VIEW)) {
49+
return hasAssertCreate ? CatalogOperationType.CREATE_VIEW : CatalogOperationType.ALTER_VIEW;
50+
} else {
51+
throw new IllegalArgumentException("Unsupported content type: " + getContentType());
52+
}
53+
}
4054

4155
@Override
42-
public abstract Content.Type getType();
56+
public abstract ContentKey getContentKey();
4357

4458
@Override
59+
public abstract Content.Type getContentType();
60+
61+
/**
62+
* The logical warehouse name where this operation will occur. Must correspond to a warehouse
63+
* configured under {@code nessie.catalog.warehouses.<name>}.
64+
*
65+
* <p>If not set, the default warehouse will be used.
66+
*/
4567
@Nullable
4668
public abstract String warehouse();
4769

48-
public abstract List<IcebergMetadataUpdate> updates();
70+
@Override
71+
public abstract List<IcebergMetadataUpdate> getUpdates();
4972

50-
public abstract List<IcebergUpdateRequirement> requirements();
73+
public abstract List<IcebergUpdateRequirement> getRequirements();
5174

5275
public static Builder builder() {
5376
return ImmutableIcebergCatalogOperation.builder();
@@ -56,13 +79,13 @@ public static Builder builder() {
5679
@JsonIgnore
5780
@Value.Derived
5881
public boolean hasRequirement(Class<? extends IcebergUpdateRequirement> requirement) {
59-
return requirements().stream().anyMatch(requirement::isInstance);
82+
return getRequirements().stream().anyMatch(requirement::isInstance);
6083
}
6184

6285
@JsonIgnore
6386
@Value.Derived
6487
public boolean hasUpdate(Class<? extends IcebergMetadataUpdate> update) {
65-
return updates().stream().anyMatch(update::isInstance);
88+
return getUpdates().stream().anyMatch(update::isInstance);
6689
}
6790

6891
/**
@@ -73,7 +96,7 @@ public boolean hasUpdate(Class<? extends IcebergMetadataUpdate> update) {
7396
@Value.Derived
7497
public <T, U extends IcebergMetadataUpdate> T getSingleUpdateValue(
7598
Class<U> update, Function<U, T> mapper) {
76-
return updates().stream()
99+
return getUpdates().stream()
77100
.filter(update::isInstance)
78101
.map(update::cast)
79102
.map(mapper)
@@ -91,10 +114,10 @@ public <T, U extends IcebergMetadataUpdate> T getSingleUpdateValue(
91114
@Value.Check
92115
protected void check() {
93116
if (hasRequirement(AssertCreate.class)) {
94-
if (requirements().size() > 1) {
117+
if (getRequirements().size() > 1) {
95118
throw new IllegalArgumentException(
96119
"Invalid create requirements: "
97-
+ requirements().stream()
120+
+ getRequirements().stream()
98121
.filter(r -> !(r instanceof AssertCreate))
99122
.map(Object::getClass)
100123
.map(Class::getSimpleName)
@@ -105,17 +128,18 @@ protected void check() {
105128

106129
@SuppressWarnings("unused")
107130
public interface Builder {
108-
@CanIgnoreReturnValue
109-
Builder from(CatalogOperation instance);
110131

111132
@CanIgnoreReturnValue
112133
Builder from(IcebergCatalogOperation instance);
113134

114135
@CanIgnoreReturnValue
115-
Builder key(ContentKey key);
136+
Builder operationType(CatalogOperationType type);
137+
138+
@CanIgnoreReturnValue
139+
Builder contentKey(ContentKey key);
116140

117141
@CanIgnoreReturnValue
118-
Builder type(Content.Type type);
142+
Builder contentType(Content.Type type);
119143

120144
@CanIgnoreReturnValue
121145
Builder warehouse(String warehouse);

catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperationTypeResolver.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)