@@ -3,135 +3,185 @@ import { BigQueryExport } from './bigquery.js'
3
3
4
4
export class FirestoreBatch {
5
5
constructor ( ) {
6
- this . firestore = new Firestore ( )
6
+ this . firestore = new Firestore ( {
7
+ gaxOptions : {
8
+ grpc : {
9
+ max_receive_message_length : 500 * 1024 * 1024 , // 500MB
10
+ max_send_message_length : 500 * 1024 * 1024 , // 500MB
11
+ 'grpc.max_connection_idle_ms' : 5 * 60 * 1000 , // 5 minutes
12
+ 'grpc.keepalive_time_ms' : 30 * 1000 , // 30 seconds
13
+ 'grpc.keepalive_timeout_ms' : 60 * 1000 , // 1 minute
14
+ 'grpc.keepalive_permit_without_calls' : true
15
+ }
16
+ }
17
+ } )
7
18
this . bigquery = new BigQueryExport ( )
8
- this . batchSize = 500
9
- this . maxConcurrentBatches = 200
19
+
20
+ // Configuration constants
21
+ this . config = {
22
+ timeout : 10 * 60 * 1000 , // 10 minutes
23
+ progressReportInterval : 200000 , // Report progress every N operations
24
+ flushThreshold : 200000 // Flush BulkWriter every N operations
25
+ }
26
+
27
+ this . reset ( )
28
+ }
29
+
30
+ reset ( ) {
31
+ this . processedDocs = 0
32
+ this . totalDocs = 0
33
+ this . bulkWriter = null
10
34
}
11
35
12
- queueBatch ( operation ) {
13
- const batch = this . firestore . batch ( )
14
-
15
- this . currentBatch . forEach ( ( doc ) => {
16
- if ( operation === 'delete' ) {
17
- batch . delete ( doc . ref )
18
- } else if ( operation === 'set' ) {
19
- const docRef = this . firestore . collection ( this . collectionName ) . doc ( )
20
- batch . set ( docRef , doc )
21
- } else {
22
- throw new Error ( 'Invalid operation' )
36
+ createBulkWriter ( operation ) {
37
+ const bulkWriter = this . firestore . bulkWriter ( )
38
+
39
+ // Configure error handling with progress info
40
+ bulkWriter . onWriteError ( ( error ) => {
41
+ const progressInfo = this . totalDocs > 0 ? ` (${ this . processedDocs } /${ this . totalDocs } )` : ''
42
+ console . warn ( `${ operation } operation failed${ progressInfo } :` , error . message )
43
+
44
+ // Retry on transient errors, fail on permanent ones
45
+ const retryableErrors = [ 'deadline-exceeded' , 'unavailable' , 'resource-exhausted' ]
46
+ return retryableErrors . includes ( error . code )
47
+ } )
48
+
49
+ // Track progress on successful writes
50
+ bulkWriter . onWriteResult ( ( ) => {
51
+ this . processedDocs ++
52
+
53
+ // Report progress periodically
54
+ if ( this . processedDocs % this . config . progressReportInterval === 0 ) {
55
+ const progressInfo = this . totalDocs > 0 ? ` (${ this . processedDocs } /${ this . totalDocs } )` : ` (${ this . processedDocs } processed)`
56
+ console . log ( `Progress${ progressInfo } - ${ operation } ing documents in ${ this . collectionName } ` )
23
57
}
24
58
} )
25
- this . batchPromises . push ( batch )
26
- this . currentBatch = [ ]
27
- }
28
59
29
- async commitBatches ( ) {
30
- console . log ( `Committing ${ this . batchPromises . length } batches to ${ this . collectionName } ` )
31
- await Promise . all (
32
- this . batchPromises . map ( async ( batchPromise ) => await batchPromise . commit ( )
33
- . catch ( ( error ) => {
34
- console . error ( 'Error committing batch:' , error )
35
- throw error
36
- } )
37
- )
38
- )
39
- this . batchPromises = [ ]
60
+ return bulkWriter
40
61
}
41
62
42
- async finalFlush ( operation ) {
43
- if ( this . currentBatch . length > 0 ) {
44
- this . queueBatch ( operation )
63
+ buildQuery ( collectionRef ) {
64
+ const queryMap = {
65
+ report : ( ) => {
66
+ console . info ( `Deleting documents from ${ this . collectionName } for date ${ this . date } ` )
67
+ return collectionRef . where ( 'date' , '==' , this . date )
68
+ } ,
69
+ dict : ( ) => {
70
+ console . info ( `Deleting documents from ${ this . collectionName } ` )
71
+ return collectionRef
72
+ }
73
+ }
74
+
75
+ const queryBuilder = queryMap [ this . collectionType ]
76
+ if ( ! queryBuilder ) {
77
+ throw new Error ( `Invalid collection type: ${ this . collectionType } ` )
45
78
}
46
79
47
- if ( this . batchPromises . length > 0 ) {
48
- await this . commitBatches ( )
80
+ return queryBuilder ( )
81
+ }
82
+
83
+ async getDocumentCount ( query ) {
84
+ try {
85
+ const countSnapshot = await query . count ( ) . get ( )
86
+ return countSnapshot . data ( ) . count
87
+ } catch ( error ) {
88
+ console . warn ( 'Could not get document count for progress tracking:' , error . message )
89
+ return 0
49
90
}
50
91
}
51
92
52
93
async batchDelete ( ) {
53
94
console . info ( 'Starting batch deletion...' )
54
95
const startTime = Date . now ( )
55
- this . currentBatch = [ ]
56
- this . batchPromises = [ ]
96
+ this . reset ( )
57
97
58
- let totalDocsDeleted = 0
59
98
const collectionRef = this . firestore . collection ( this . collectionName )
99
+ const collectionQuery = this . buildQuery ( collectionRef )
60
100
61
- let collectionQuery
62
- if ( this . collectionType === 'report' ) {
63
- console . info ( 'Deleting documents from ' + this . collectionName + ' for date ' + this . date )
64
- // Query to fetch monthly documents
65
- collectionQuery = collectionRef . where ( 'date' , '==' , this . date )
66
- } else if ( this . collectionType === 'dict' ) {
67
- console . info ( 'Deleting documents from ' + this . collectionName )
68
- collectionQuery = collectionRef
69
- } else {
70
- throw new Error ( 'Invalid collection type' )
101
+ // Get total count for progress tracking
102
+ this . totalDocs = await this . getDocumentCount ( collectionQuery )
103
+ if ( this . totalDocs > 0 ) {
104
+ console . info ( `Total documents to delete: ${ this . totalDocs } ` )
71
105
}
72
106
73
- while ( true ) {
74
- const snapshot = await collectionQuery . limit ( this . batchSize * this . maxConcurrentBatches ) . get ( )
75
- if ( snapshot . empty ) {
76
- break
77
- }
107
+ // Create BulkWriter for delete operations
108
+ this . bulkWriter = this . createBulkWriter ( 'delet' )
78
109
79
- for await ( const doc of snapshot . docs ) {
80
- this . currentBatch . push ( doc )
110
+ let deletedCount = 0
111
+ const batchSize = this . config . flushThreshold // Process documents in chunks
81
112
82
- if ( this . currentBatch . length >= this . batchSize ) {
83
- this . queueBatch ( 'delete' )
84
- }
85
- if ( this . batchPromises . length >= this . maxConcurrentBatches ) {
86
- await this . commitBatches ( )
87
- }
88
- totalDocsDeleted ++
89
- }
113
+ while ( deletedCount < this . totalDocs || this . totalDocs === 0 ) {
114
+ const snapshot = await collectionQuery . limit ( batchSize ) . get ( )
115
+ if ( snapshot . empty ) break
116
+
117
+ // Add all delete operations to BulkWriter
118
+ snapshot . docs . forEach ( doc => {
119
+ this . bulkWriter . delete ( doc . ref )
120
+ deletedCount ++
121
+ } )
122
+
123
+ // Periodically flush to manage memory
124
+ // if (deletedCount % this.config.flushThreshold === 0) {
125
+ console . log ( `Flushing BulkWriter at ${ deletedCount } operations...` )
126
+ await this . bulkWriter . flush ( )
127
+ // }
90
128
}
91
- await this . finalFlush ( 'delete' )
129
+
130
+ // Final flush and close
131
+ console . log ( 'Finalizing deletion operations...' )
132
+ await this . bulkWriter . close ( )
92
133
93
134
const duration = ( Date . now ( ) - startTime ) / 1000
94
- console . info ( `Deletion complete. Total docs deleted: ${ totalDocsDeleted } . Time: ${ duration } seconds` )
135
+ console . info ( `Deletion complete. Total docs deleted: ${ this . processedDocs } . Time: ${ duration } seconds` )
95
136
}
96
137
97
- /**
98
- * Streams BigQuery query results into a Firestore collection using batch commits.
99
- * @param {string } query - The BigQuery SQL query.
100
- */
101
138
async streamFromBigQuery ( rowStream ) {
102
139
console . info ( 'Starting BigQuery to Firestore transfer...' )
103
140
const startTime = Date . now ( )
104
- let totalRowsProcessed = 0
141
+ this . reset ( )
105
142
106
- this . currentBatch = [ ]
107
- this . batchPromises = [ ]
143
+ // Create BulkWriter for write operations
144
+ this . bulkWriter = this . createBulkWriter ( 'writ' )
145
+
146
+ let rowCount = 0
147
+ const collectionRef = this . firestore . collection ( this . collectionName )
108
148
109
149
for await ( const row of rowStream ) {
110
- this . currentBatch . push ( row )
150
+ // Add document to BulkWriter
151
+ const docRef = collectionRef . doc ( )
152
+ this . bulkWriter . set ( docRef , row )
111
153
112
- // Write batch when it reaches specified size
113
- if ( this . currentBatch . length >= this . batchSize ) {
114
- this . queueBatch ( 'set' )
115
- }
154
+ rowCount ++
155
+ this . totalDocs = rowCount // Update total as we go since we can't predict BigQuery result size
116
156
117
- if ( this . batchPromises . length >= this . maxConcurrentBatches ) {
118
- await this . commitBatches ( )
157
+ // Periodically flush to manage memory
158
+ if ( rowCount % this . config . flushThreshold === 0 ) {
159
+ console . log ( `Flushing BulkWriter at ${ rowCount } operations...` )
160
+ await this . bulkWriter . flush ( )
119
161
}
120
- totalRowsProcessed ++
121
162
}
122
- await this . finalFlush ( 'set' )
163
+
164
+ // Final flush and close
165
+ console . log ( 'Finalizing write operations...' )
166
+ await this . bulkWriter . close ( )
123
167
124
168
const duration = ( Date . now ( ) - startTime ) / 1000
125
- console . info ( `Transfer to ${ this . collectionName } complete. Total rows processed: ${ totalRowsProcessed } . Time: ${ duration } seconds` )
169
+ console . info ( `Transfer to ${ this . collectionName } complete. Total rows processed: ${ this . processedDocs } . Time: ${ duration } seconds` )
126
170
}
127
171
128
172
async export ( query , exportConfig ) {
173
+ // Configure Firestore settings
129
174
this . firestore . settings ( {
130
- databaseId : exportConfig . database
175
+ databaseId : exportConfig . database ,
176
+ timeout : this . config . timeout
177
+ } )
178
+
179
+ // Set instance properties
180
+ Object . assign ( this , {
181
+ collectionName : exportConfig . collection ,
182
+ collectionType : exportConfig . type ,
183
+ date : exportConfig . date
131
184
} )
132
- this . collectionName = exportConfig . collection
133
- this . collectionType = exportConfig . type
134
- this . date = exportConfig . date
135
185
136
186
await this . batchDelete ( )
137
187
0 commit comments