diff --git a/src/parquet-writer.js b/src/parquet-writer.js
index 41f5601..4c6ff20 100644
--- a/src/parquet-writer.js
+++ b/src/parquet-writer.js
@@ -34,13 +34,12 @@ export function ParquetWriter({ writer, schema, compressed = true, statistics =
  *
  * @param {object} options
  * @param {ColumnSource[]} options.columnData
- * @param {number} [options.rowGroupSize]
+ * @param {number | number[]} [options.rowGroupSize]
  */
 ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 }) {
   const columnDataRows = columnData[0]?.data?.length || 0
-  for (let groupStartIndex = 0; groupStartIndex < columnDataRows; groupStartIndex += rowGroupSize) {
+  for (const { groupStartIndex, groupSize } of groupIterator({ columnDataRows, rowGroupSize })) {
     const groupStartOffset = this.writer.offset
-    const groupSize = Math.min(rowGroupSize, columnDataRows - groupStartIndex)
 
     // row group columns
     /** @type {ColumnChunk[]} */
@@ -93,3 +92,32 @@ ParquetWriter.prototype.finish = function() {
   this.writer.appendUint32(0x31524150)
   this.writer.finish()
 }
+
+/**
+ * Create an iterator for row groups based on the specified row group size.
+ * If rowGroupSize is an array, it will return groups based on the sizes in the array.
+ * When the array runs out, it will continue with the last size.
+ *
+ * @param {object} options
+ * @param {number} options.columnDataRows - Total number of rows in the column data
+ * @param {number | number[]} options.rowGroupSize - Size of each row group or an array of sizes
+ * @returns {Array<{groupStartIndex: number, groupSize: number}>}
+ */
+function groupIterator({ columnDataRows, rowGroupSize }) {
+  if (Array.isArray(rowGroupSize) && !rowGroupSize.length) {
+    throw new Error('rowGroupSize array cannot be empty')
+  }
+  const groups = []
+  let groupIndex = 0
+  let groupStartIndex = 0
+  while (groupStartIndex < columnDataRows) {
+    const size = Array.isArray(rowGroupSize)
+      ? rowGroupSize[Math.min(groupIndex, rowGroupSize.length - 1)]
+      : rowGroupSize
+    const groupSize = Math.min(size, columnDataRows - groupStartIndex)
+    groups.push({ groupStartIndex, groupSize })
+    groupStartIndex += size
+    groupIndex++
+  }
+  return groups
+}
diff --git a/src/types.d.ts b/src/types.d.ts
index b01b870..f5a168e 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -20,7 +20,7 @@ export interface ParquetWriteOptions {
   schema?: SchemaElement[]
   compressed?: boolean
   statistics?: boolean
-  rowGroupSize?: number
+  rowGroupSize?: number | number[]
   kvMetadata?: KeyValue[]
 }
diff --git a/test/write.buffer.test.js b/test/write.buffer.test.js
index 4b875b6..0099307 100644
--- a/test/write.buffer.test.js
+++ b/test/write.buffer.test.js
@@ -264,6 +264,26 @@ describe('parquetWriteBuffer', () => {
     expect(result[199]).toEqual({ int: 13 })
   })
 
+  it('splits row groups with custom sizes', async () => {
+    const data = Array(200).fill(13)
+    const file = parquetWriteBuffer({ columnData: [{ name: 'int', data }], rowGroupSize: [20, 50] })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups.length).toBe(5)
+    expect(metadata.row_groups[0].num_rows).toBe(20n)
+    expect(metadata.row_groups[1].num_rows).toBe(50n)
+    // should use last size for remaining row groups
+    expect(metadata.row_groups[2].num_rows).toBe(50n)
+    expect(metadata.row_groups[3].num_rows).toBe(50n)
+    expect(metadata.row_groups[4].num_rows).toBe(30n)
+    // round trip
+    const result = await parquetReadObjects({ file })
+    expect(result.length).toBe(200)
+    expect(result[0]).toEqual({ int: 13 })
+    expect(result[49]).toEqual({ int: 13 })
+    expect(result[50]).toEqual({ int: 13 })
+    expect(result[199]).toEqual({ int: 13 })
+  })
+
   it('throws for wrong type specified', () => {
     expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'INT64' }] }))
       .toThrow('parquet expected bigint value')
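
For context, a minimal usage sketch of the array-valued rowGroupSize option introduced above. It is not part of the diff; the module paths, column name, and row counts are illustrative assumptions based on the test file, not confirmed by this change.

// Sketch only: the 'hyparquet-writer' and 'hyparquet' import paths are assumptions.
import { parquetWriteBuffer } from 'hyparquet-writer'
import { parquetMetadata } from 'hyparquet'

// 1000 rows with rowGroupSize [100, 250]: the first group gets 100 rows, later groups
// reuse the last array entry (250), and the final group holds the 150-row remainder.
const data = Array.from({ length: 1000 }, (_, i) => i)
const file = parquetWriteBuffer({
  columnData: [{ name: 'id', data }],
  rowGroupSize: [100, 250],
})
const { row_groups } = parquetMetadata(file)
console.log(row_groups.map(g => g.num_rows)) // expected: [100n, 250n, 250n, 250n, 150n]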