Skip to content

Commit b8cf1cf

Browse files
committed
feat(analysis): occasionally unblock thread while analysing documents to allow abort
1 parent 0e9bcd7 commit b8cf1cf

File tree

5 files changed

+32
-13
lines changed

5 files changed

+32
-13
lines changed

src/schema-analyzer.ts

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818

1919
import semanticTypes from './semantic-types';
2020
import { AnyIterable } from './types';
21+
import { allowAbort, ALLOW_ABORT_INTERVAL_COUNT } from './util';
2122

2223
type TypeCastMap = {
2324
Array: unknown[];
@@ -484,6 +485,10 @@ export class SchemaAnalyzer {
484485
fields: []
485486
};
486487

488+
// Increments when every field or type is analyzed.
489+
// Useful for occasionally checking if the analysis should be aborted.
490+
fieldAndTypeAnalysisCounter = 0;
491+
487492
constructor(options?: SchemaParseOptions) {
488493
// Set default options.
489494
this.options = { ...defaultSchemaParseOptions, ...options };
@@ -512,6 +517,13 @@ export class SchemaAnalyzer {
512517
}
513518
}
514519

520+
allowAbort() {
521+
// Allow aborting the analysis.
522+
if (this.fieldAndTypeAnalysisCounter++ % ALLOW_ABORT_INTERVAL_COUNT === 0) {
523+
allowAbort();
524+
}
525+
}
526+
515527
increaseFieldCount() {
516528
if (!this.options.distinctFieldsAbortThreshold) return;
517529
this.fieldsCount++;
@@ -531,14 +543,15 @@ export class SchemaAnalyzer {
531543
return returnValue;
532544
}
533545

534-
analyzeDoc(doc: Document) {
546+
async analyzeDoc(doc: Document) {
535547
this.finalized = false;
536548
/**
537549
* Takes a field value, determines the correct type, handles recursion into
538550
* nested arrays and documents, and passes the value down to `addToValue`.
539551
* Note: This mutates the `schema` argument.
540552
*/
541-
const addToType = (path: string[], value: BSONValue, schema: SchemaAnalysisFieldTypes) => {
553+
const addToType = async(path: string[], value: BSONValue, schema: SchemaAnalysisFieldTypes) => {
554+
await this.allowAbort();
542555
const bsonType = getBSONType(value);
543556
// If semantic type detection is enabled, the type is the semantic type
544557
// or the original bson type if no semantic type was detected. If disabled,
@@ -560,13 +573,16 @@ export class SchemaAnalyzer {
560573
type.types = type.types ?? Object.create(null);
561574
type.lengths = type.lengths ?? [];
562575
type.lengths.push((value as BSONValue[]).length);
563-
(value as BSONValue[]).forEach((v: BSONValue) => addToType(path, v, type.types));
576+
for (const v of (value as BSONValue[])) {
577+
await addToType(path, v, type.types);
578+
}
564579
} else if (isDocumentType(type)) {
565580
// Recurse into nested documents by calling `addToField` for all sub-fields.
566581
type.fields = type.fields ?? Object.create(null);
567-
Object.entries(value as Document).forEach(
568-
([fieldName, v]) => addToField(fieldName, [...path, fieldName], v, type.fields)
569-
);
582+
583+
for (const [fieldName, v] of Object.entries(value as Document)) {
584+
await addToField(fieldName, [...path, fieldName], v, type.fields);
585+
}
570586
} else if (this.options.storeValues && !isNullType(type)) {
571587
// When the `storeValues` option is enabled, store some example values.
572588
if (!type.values) {
@@ -584,7 +600,8 @@ export class SchemaAnalyzer {
584600
* Handles a field from a document. Passes the value to `addToType`.
585601
* Note: This mutates the `schema` argument.
586602
*/
587-
const addToField = (fieldName: string, path: string[], value: BSONValue, schema: SchemaAnalysisFieldsMap) => {
603+
const addToField = async(fieldName: string, path: string[], value: BSONValue, schema: SchemaAnalysisFieldsMap) => {
604+
await this.allowAbort();
588605
if (!schema[fieldName]) {
589606
schema[fieldName] = {
590607
name: fieldName,
@@ -597,11 +614,11 @@ export class SchemaAnalyzer {
597614
const field = schema[fieldName];
598615

599616
field.count++;
600-
addToType(path, value, field.types);
617+
await addToType(path, value, field.types);
601618
};
602619

603620
for (const key of Object.keys(doc)) {
604-
addToField(key, [key], doc[key], this.schemaAnalysisRoot.fields);
621+
await addToField(key, [key], doc[key], this.schemaAnalysisRoot.fields);
605622
}
606623
this.schemaAnalysisRoot.count += 1;
607624
}
@@ -652,7 +669,7 @@ export async function getCompletedSchemaAnalyzer(
652669
const analyzer = new SchemaAnalyzer(options);
653670
for await (const doc of verifyStreamSource(source)) {
654671
if (options?.signal?.aborted) throw options.signal.reason;
655-
analyzer.analyzeDoc(doc);
672+
await analyzer.analyzeDoc(doc);
656673
}
657674
return analyzer;
658675
}

src/schema-converters/internalToExpanded.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaTy
22
import { type ExpandedJSONSchema } from '../types';
33
import { InternalTypeToStandardTypeMap, RELAXED_EJSON_DEFINITIONS } from './internalToStandard';
44
import { InternalTypeToBsonTypeMap } from './internalToMongoDB';
5-
import { allowAbort } from './util';
5+
import { allowAbort } from '../util';
66

77
const createConvertInternalToExpanded = function() {
88
const usedDefinitions = new Set<string>();

src/schema-converters/internalToMongoDB.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaType } from '../schema-analyzer';
55
import { MongoDBJSONSchema } from '../types';
6-
import { allowAbort } from './util';
6+
import { allowAbort } from '../util';
77

88
export const InternalTypeToBsonTypeMap: Record<
99
SchemaType['name'] | 'Double' | 'BSONSymbol',

src/schema-converters/internalToStandard.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { JSONSchema4TypeName } from 'json-schema';
22
import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaType } from '../schema-analyzer';
33
import { StandardJSONSchema } from '../types';
4-
import { allowAbort } from './util';
4+
import { allowAbort } from '../util';
55

66
type StandardTypeDefinition = { type: JSONSchema4TypeName, $ref?: never; } | { $ref: string, type?: never };
77

src/schema-converters/util.ts renamed to src/util.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
export const ALLOW_ABORT_INTERVAL_COUNT = 1000;
2+
13
export async function allowAbort(signal?: AbortSignal) {
24
return new Promise<void>((resolve, reject) =>
35
setTimeout(() => {

0 commit comments

Comments
 (0)