Skip to content

fix(NODE-6955): add missing wallTime property to applicable change stream documents #4541

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
51 changes: 38 additions & 13 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -205,6 +205,16 @@ export interface ChangeStreamDocumentCommon {
splitEvent?: ChangeStreamSplitEvent;
}

/** @public */
export interface ChangeStreamDocumentWallTime {
/**
* The server date and time of the database operation.
* wallTime differs from clusterTime in that clusterTime is a timestamp taken from the oplog entry associated with the database operation event.
* @sinceServerVersion 6.0.0
*/
wallTime?: Date;
}

/** @public */
export interface ChangeStreamDocumentCollectionUUID {
/**
@@ -239,7 +249,8 @@ export interface ChangeStreamDocumentOperationDescription {
export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'insert';
/** This key will contain the document being inserted */
@@ -255,7 +266,8 @@ export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'update';
/**
@@ -285,7 +297,8 @@ export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
*/
export interface ChangeStreamReplaceDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'replace';
/** The fullDocument of a replace event represents the document after the insert of the replacement document */
@@ -309,7 +322,8 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'delete';
/** Namespace the delete event occurred on */
@@ -330,7 +344,8 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
*/
export interface ChangeStreamDropDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'drop';
/** Namespace the drop event occurred on */
@@ -343,7 +358,8 @@ export interface ChangeStreamDropDocument
*/
export interface ChangeStreamRenameDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'rename';
/** The new name for the `ns.coll` collection */
@@ -356,7 +372,9 @@ export interface ChangeStreamRenameDocument
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event
*/
export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamDropDatabaseDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'dropDatabase';
/** The database dropped */
@@ -367,7 +385,9 @@ export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCo
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event
*/
export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamInvalidateDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'invalidate';
}
@@ -380,7 +400,8 @@ export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentComm
export interface ChangeStreamCreateIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'createIndexes';
}
@@ -393,7 +414,8 @@ export interface ChangeStreamCreateIndexDocument
export interface ChangeStreamDropIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'dropIndexes';
}
@@ -405,7 +427,8 @@ export interface ChangeStreamDropIndexDocument
*/
export interface ChangeStreamCollModDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'modify';
}
@@ -416,7 +439,8 @@ export interface ChangeStreamCollModDocument
*/
export interface ChangeStreamCreateDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'create';

@@ -435,7 +459,8 @@ export interface ChangeStreamCreateDocument
export interface ChangeStreamShardCollectionDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'shardCollection';
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -208,6 +208,7 @@ export type {
ChangeStreamDocumentCommon,
ChangeStreamDocumentKey,
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime,
ChangeStreamDropDatabaseDocument,
ChangeStreamDropDocument,
ChangeStreamDropIndexDocument,
20 changes: 20 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
@@ -170,6 +170,26 @@ describe('Change Streams', function () {
}
});

it('contains a wallTime date property on the change', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=6.0.0' } },
async test() {
const collection = db.collection('wallTimeTest');
const changeStream = collection.watch(pipeline);

const willBeChanges = on(changeStream, 'change');
await once(changeStream.cursor, 'init');

await collection.insertOne({ d: 4 });

const change = (await willBeChanges.next()).value[0];

await changeStream.close();

expect(change).to.have.property('wallTime');
expect(change.wallTime).to.be.instanceOf(Date);
}
});

it('should create a ChangeStream on a collection and emit change events', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
14 changes: 14 additions & 0 deletions test/types/change_stream.test-d.ts
Original file line number Diff line number Diff line change
@@ -75,6 +75,7 @@ declare const crudChange: CrudChangeDoc;
expectType<CrudChangeDoc extends ChangeStreamDocumentKey<Schema> ? true : false>(true);
expectType<number>(crudChange.documentKey._id); // _id will get typed
expectType<any>(crudChange.documentKey.blah); // shard keys could be anything
expectType<Date | undefined>(crudChange.wallTime);

// ChangeStreamFullNameSpace
expectType<ChangeStreamNameSpace>(crudChange.ns);
@@ -87,12 +88,14 @@ switch (change.operationType) {
expectType<number>(change.documentKey._id);
expectType<any>(change.documentKey.blah);
expectType<Schema>(change.fullDocument);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'update': {
expectType<ChangeStreamUpdateDocument<Schema>>(change);
expectType<'update'>(change.operationType);
expectType<Schema | undefined>(change.fullDocument); // Update only attaches fullDocument if configured
expectType<Date | undefined>(change.wallTime);
expectType<UpdateDescription<Schema>>(change.updateDescription);
expectType<Partial<Schema> | undefined>(change.updateDescription.updatedFields);
expectType<string[] | undefined>(change.updateDescription.removedFields);
@@ -104,61 +107,72 @@ switch (change.operationType) {
case 'replace': {
expectType<ChangeStreamReplaceDocument<Schema>>(change);
expectType<'replace'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<Schema>(change.fullDocument);
break;
}
case 'delete': {
expectType<ChangeStreamDeleteDocument<Schema>>(change);
expectType<Date | undefined>(change.wallTime);
expectType<'delete'>(change.operationType);
break;
}
case 'drop': {
expectType<ChangeStreamDropDocument>(change);
expectType<'drop'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<{ db: string; coll: string }>(change.ns);
break;
}
case 'rename': {
expectType<ChangeStreamRenameDocument>(change);
expectType<'rename'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<{ db: string; coll: string }>(change.ns);
expectType<{ db: string; coll: string }>(change.to);
break;
}
case 'dropDatabase': {
expectType<ChangeStreamDropDatabaseDocument>(change);
expectType<'dropDatabase'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectError(change.ns.coll);
break;
}
case 'invalidate': {
expectType<ChangeStreamInvalidateDocument>(change);
expectType<'invalidate'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'create': {
expectType<ChangeStreamCreateDocument>(change);
expectType<'create'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'modify': {
expectType<ChangeStreamCollModDocument>(change);
expectType<'modify'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'createIndexes': {
expectType<ChangeStreamCreateIndexDocument>(change);
expectType<'createIndexes'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'dropIndexes': {
expectType<ChangeStreamDropIndexDocument>(change);
expectType<'dropIndexes'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'shardCollection': {
expectType<ChangeStreamShardCollectionDocument>(change);
expectType<'shardCollection'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'reshardCollection': {