mirror of https://github.com/asadbek064/hyparquet-writer.git (synced 2025-12-05 23:31:54 +00:00)
rowGroupSize option
parent 66d8b0358a
commit 6727628aad
@@ -7,6 +7,8 @@

[](https://www.npmjs.com/package/hyparquet-writer?activeTab=dependencies)

Hyparquet Writer is a JavaScript library for writing [Apache Parquet](https://parquet.apache.org) files. It is designed to be lightweight, fast, and to store data very efficiently. It is a companion to [hyparquet](https://github.com/hyparam/hyparquet), a JavaScript library for reading parquet files.

## Usage

Call `parquetWrite` with a list of columns; each column is an object with a `name` and a `data` field. The `data` field should be an array of same-type values.
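A minimal sketch of that call, assuming `parquetWrite` is exported from the `hyparquet-writer` package root (the package named by the npm link above) and that string columns are auto-detected like the numeric ones used in the tests below:

```js
import { parquetWrite } from 'hyparquet-writer' // assumed entry point

// each column is { name, data }; data is an array of same-type values
const arrayBuffer = parquetWrite({
  columnData: [
    { name: 'id', data: [1, 2, 3] },
    { name: 'name', data: ['alice', 'bob', 'carol'] }, // assumes string auto-detection
  ],
})
// arrayBuffer holds a complete parquet file (PAR1 header ... PAR1 footer)
```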
@@ -40,11 +40,11 @@
"test": "vitest run"
},
"dependencies": {
"hyparquet": "1.10.2"
"hyparquet": "1.10.3"
},
"devDependencies": {
"@babel/eslint-parser": "7.27.0",
"@types/node": "22.13.17",
"@types/node": "22.14.0",
"@vitest/coverage-v8": "3.1.1",
"eslint": "9.23.0",
"eslint-plugin-jsdoc": "50.6.9",
85 src/write.js
@@ -6,74 +6,79 @@ import { getSchemaElementForValues } from './schema.js'
/**
* Write data as parquet to an ArrayBuffer
*
* @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement, SchemaTree} from 'hyparquet'
* @import {ColumnChunk, DecodedArray, FileMetaData, RowGroup, SchemaElement, SchemaTree} from 'hyparquet'
* @import {KeyValue} from 'hyparquet/src/types.js'
* @import {ColumnData} from '../src/types.js'
* @param {object} options
* @param {ColumnData[]} options.columnData
* @param {boolean} [options.compressed]
* @param {number} [options.rowGroupSize]
* @param {KeyValue[]} [options.kvMetadata]
* @returns {ArrayBuffer}
*/
export function parquetWrite({ columnData, compressed = true, kvMetadata }) {
export function parquetWrite({ columnData, compressed = true, rowGroupSize = 100000, kvMetadata }) {
const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
const writer = new Writer()

// Check if all columns have the same length
const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
for (const { data } of columnData) {
if (BigInt(data.length) !== num_rows) {
throw new Error('columns must have the same length')
}
}

// Write header PAR1
writer.appendUint32(0x31524150)

// schema
// construct schema
/** @type {SchemaElement[]} */
const schema = [{
name: 'root',
num_children: columnData.length,
}]

// row group columns
/** @type {ColumnChunk[]} */
const columns = []

// Write columns
for (const { name, data, type } of columnData) {
// check if all columns have the same length
if (BigInt(data.length) !== num_rows) {
throw new Error('columns must have the same length')
}
// auto-detect type
const schemaElement = getSchemaElementForValues(name, data, type)
if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`)
const file_offset = BigInt(writer.offset)
/** @type {SchemaElement[]} */
const schemaPath = [
schema[0],
schemaElement,
]
const meta_data = writeColumn(writer, schemaPath, data, compressed)

// save metadata
schema.push(schemaElement)
columns.push({
file_path: name,
file_offset,
meta_data,
}

// write header PAR1
writer.appendUint32(0x31524150)

/** @type {RowGroup[]} */
const row_groups = []
for (let i = 0; i < num_rows; i += rowGroupSize) {
const groupStart = writer.offset

// row group columns
/** @type {ColumnChunk[]} */
const columns = []

// write columns
for (let i = 0; i < columnData.length; i++) {
const { name, data } = columnData[i]
const file_offset = BigInt(writer.offset)
const schemaPath = [schema[0], schema[i + 1]]
const meta_data = writeColumn(writer, schemaPath, data, compressed)

// save metadata
columns.push({
file_path: name,
file_offset,
meta_data,
})
}

row_groups.push({
columns,
total_byte_size: BigInt(writer.offset - groupStart),
num_rows: BigInt(Math.min(rowGroupSize, Number(num_rows) - i)),
})
}

// Write metadata
// write metadata
/** @type {FileMetaData} */
const metadata = {
version: 2,
created_by: 'hyparquet',
schema,
num_rows,
row_groups: [{
columns,
total_byte_size: BigInt(writer.offset - 4),
num_rows,
}],
row_groups,
metadata_length: 0,
key_value_metadata: kvMetadata,
}
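The per-group `num_rows` expression above, `Math.min(rowGroupSize, Number(num_rows) - i)`, clamps the final group to whatever rows remain. A quick sanity check of that arithmetic with hypothetical numbers (250 rows, `rowGroupSize` of 100), not taken from the diff:

```js
// hypothetical numbers, not from the diff
const totalRows = 250
const rowGroupSize = 100

for (let i = 0; i < totalRows; i += rowGroupSize) {
  // same clamp as in the loop above: the last group gets the remainder
  console.log(Math.min(rowGroupSize, totalRows - i))
}
// prints 100, 100, 50 -> three row groups, the last one partial
```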
@@ -81,7 +86,7 @@ export function parquetWrite({ columnData, compressed = true, kvMetadata }) {
delete metadata.metadata_length
writeMetadata(writer, metadata)

// Write footer PAR1
// write footer PAR1
writer.appendUint32(0x31524150)

return writer.getBuffer()
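Taken together, the diff above adds a `rowGroupSize` option (default 100000, per the new signature) that emits one row group per `rowGroupSize` rows. A minimal sketch of passing it, mirroring the test added below; the import path is an assumption:

```js
import { parquetWrite } from 'hyparquet-writer' // assumed entry point

const data = Array(250).fill(7) // hypothetical column of 250 ints

// rows 0-99, 100-199 and 200-249 land in three separate row groups
const file = parquetWrite({
  columnData: [{ name: 'int', data }],
  rowGroupSize: 100,
})
```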
@@ -172,6 +172,22 @@ describe('parquetWrite', () => {
expect(result[5].double).not.toEqual(0)
})

it('splits row groups', async () => {
const data = Array(200).fill(13)
const file = parquetWrite({ columnData: [{ name: 'int', data }], rowGroupSize: 100 })
const metadata = parquetMetadata(file)
expect(metadata.row_groups.length).toBe(2)
expect(metadata.row_groups[0].num_rows).toBe(100n)
expect(metadata.row_groups[1].num_rows).toBe(100n)
// round trip
const result = await parquetReadObjects({ file })
expect(result.length).toBe(200)
expect(result[0]).toEqual({ int: 13 })
expect(result[99]).toEqual({ int: 13 })
expect(result[100]).toEqual({ int: 13 })
expect(result[199]).toEqual({ int: 13 })
})

it('throws for wrong type specified', () => {
expect(() => parquetWrite({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BOOLEAN' }] }))
.toThrow('parquet cannot write mixed types')
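The last test exercises the optional `type` field on a column, which must agree with the data or `parquetWrite` throws. A hedged sketch of supplying it explicitly, assuming a matching type string is accepted the same way a mismatched one is rejected:

```js
// hypothetical column with an explicit type that matches its data
const file = parquetWrite({
  columnData: [{ name: 'flag', data: [true, false, true], type: 'BOOLEAN' }],
})
```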