rowGroupSize can be an array

Kenny Daniel 2025-07-04 01:25:14 -07:00
parent a15bbc7eef
commit 538740f578
3 changed files with 52 additions and 4 deletions
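
In short, rowGroupSize now accepts either a single number (as before) or an array of per-group sizes, with the last size reused once the array is exhausted. A minimal usage sketch, mirroring the test added in this commit (the import path is an assumption, not part of this commit):

import { parquetWriteBuffer } from 'hyparquet-writer' // assumed package name

const data = Array(200).fill(13)
// Fixed-size row groups (existing behavior): 100000 rows per group by default
const fixed = parquetWriteBuffer({ columnData: [{ name: 'int', data }], rowGroupSize: 100 })
// Per-group sizes (new): [20, 50] over 200 rows gives groups of 20, 50, 50, 50, 30
const varied = parquetWriteBuffer({ columnData: [{ name: 'int', data }], rowGroupSize: [20, 50] })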

@@ -34,13 +34,12 @@ export function ParquetWriter({ writer, schema, compressed = true, statistics =
  *
  * @param {object} options
  * @param {ColumnSource[]} options.columnData
- * @param {number} [options.rowGroupSize]
+ * @param {number | number[]} [options.rowGroupSize]
  */
 ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 }) {
   const columnDataRows = columnData[0]?.data?.length || 0
-  for (let groupStartIndex = 0; groupStartIndex < columnDataRows; groupStartIndex += rowGroupSize) {
+  for (const { groupStartIndex, groupSize } of groupIterator({ columnDataRows, rowGroupSize })) {
     const groupStartOffset = this.writer.offset
-    const groupSize = Math.min(rowGroupSize, columnDataRows - groupStartIndex)
     // row group columns
     /** @type {ColumnChunk[]} */
@@ -93,3 +92,32 @@ ParquetWriter.prototype.finish = function() {
   this.writer.appendUint32(0x31524150)
   this.writer.finish()
 }
+
+/**
+ * Create an iterator for row groups based on the specified row group size.
+ * If rowGroupSize is an array, it will return groups based on the sizes in the array.
+ * When the array runs out, it will continue with the last size.
+ *
+ * @param {object} options
+ * @param {number} options.columnDataRows - Total number of rows in the column data
+ * @param {number | number[]} options.rowGroupSize - Size of each row group or an array of sizes
+ * @returns {Array<{groupStartIndex: number, groupSize: number}>}
+ */
+function groupIterator({ columnDataRows, rowGroupSize }) {
+  if (Array.isArray(rowGroupSize) && !rowGroupSize.length) {
+    throw new Error('rowGroupSize array cannot be empty')
+  }
+  const groups = []
+  let groupIndex = 0
+  let groupStartIndex = 0
+  while (groupStartIndex < columnDataRows) {
+    const size = Array.isArray(rowGroupSize)
+      ? rowGroupSize[Math.min(groupIndex, rowGroupSize.length - 1)]
+      : rowGroupSize
+    const groupSize = Math.min(size, columnDataRows - groupStartIndex)
+    groups.push({ groupStartIndex, groupSize })
+    groupStartIndex += size
+    groupIndex++
+  }
+  return groups
+}
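
For reference, a hand-worked trace of the iterator above (derived from its logic and the new test, not text from the commit): 200 rows with rowGroupSize [20, 50] yield five groups.

// groupIterator({ columnDataRows: 200, rowGroupSize: [20, 50] }) returns:
//   { groupStartIndex: 0,   groupSize: 20 }
//   { groupStartIndex: 20,  groupSize: 50 }
//   { groupStartIndex: 70,  groupSize: 50 }
//   { groupStartIndex: 120, groupSize: 50 }
//   { groupStartIndex: 170, groupSize: 30 }  // last size repeats, then clamps to the rows left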

src/types.d.ts (vendored): 2 lines changed

@@ -20,7 +20,7 @@ export interface ParquetWriteOptions {
   schema?: SchemaElement[]
   compressed?: boolean
   statistics?: boolean
-  rowGroupSize?: number
+  rowGroupSize?: number | number[]
   kvMetadata?: KeyValue[]
 }

@@ -264,6 +264,26 @@ describe('parquetWriteBuffer', () => {
     expect(result[199]).toEqual({ int: 13 })
   })
+  it('splits row groups with custom sizes', async () => {
+    const data = Array(200).fill(13)
+    const file = parquetWriteBuffer({ columnData: [{ name: 'int', data }], rowGroupSize: [20, 50] })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups.length).toBe(5)
+    expect(metadata.row_groups[0].num_rows).toBe(20n)
+    expect(metadata.row_groups[1].num_rows).toBe(50n)
+    // should use last size for remaining row groups
+    expect(metadata.row_groups[2].num_rows).toBe(50n)
+    expect(metadata.row_groups[3].num_rows).toBe(50n)
+    expect(metadata.row_groups[4].num_rows).toBe(30n)
+    // round trip
+    const result = await parquetReadObjects({ file })
+    expect(result.length).toBe(200)
+    expect(result[0]).toEqual({ int: 13 })
+    expect(result[49]).toEqual({ int: 13 })
+    expect(result[50]).toEqual({ int: 13 })
+    expect(result[199]).toEqual({ int: 13 })
+  })
+
   it('throws for wrong type specified', () => {
     expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'INT64' }] }))
       .toThrow('parquet expected bigint value')