FileWriter

Kenny Daniel 2025-04-08 03:22:30 -07:00
parent 1abab3b490
commit 5c686412c1
13 changed files with 302 additions and 52 deletions

.gitignore vendored (1 line changed)

@@ -4,4 +4,5 @@ package-lock.json
 *.tgz
 .DS_Store
 /*.parquet
+/data
 /types

README.md

@@ -11,20 +11,18 @@
 Hyparquet Writer is a JavaScript library for writing [Apache Parquet](https://parquet.apache.org) files. It is designed to be lightweight and fast, and to store data very efficiently. It is a companion to the [hyparquet](https://github.com/hyparam/hyparquet) library, a JavaScript library for reading parquet files.
-## Usage
+## Quick Start
-Call `parquetWrite` with argument `columnData`. Each column in `columnData` should contain:
+To write a parquet file to an `ArrayBuffer`, use `parquetWriteBuffer` with argument `columnData`. Each column in `columnData` should contain:
 - `name`: the column name
 - `data`: an array of same-type values
-- `type`: the parquet schema type (optional, type guessed from data if not provided)
-Example:
+- `type`: the parquet schema type (optional)
 ```javascript
-import { parquetWrite } from 'hyparquet-writer'
+import { parquetWriteBuffer } from 'hyparquet-writer'
-const arrayBuffer = parquetWrite({
+const arrayBuffer = parquetWriteBuffer({
   columnData: [
     { name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
     { name: 'age', data: [25, 30, 35], type: 'INT32' },
@@ -32,19 +30,49 @@ const arrayBuffer = parquetWrite({
 })
 ```
-## Options
+Note: if `type` is not provided, the type will be guessed from the data. The supported parquet types are:
-Options can be passed to `parquetWrite` to change parquet file properties:
+- `BOOLEAN`
+- `INT32`
+- `INT64`
+- `FLOAT`
+- `DOUBLE`
+- `BYTE_ARRAY`
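For instance, omitting `type` lets the writer guess from the values — a minimal sketch; the guessed types follow the list above, with the test data in this commit confirming that plain numbers guess to `INT32`/`DOUBLE` and strings to `BYTE_ARRAY`:

```javascript
import { parquetWriteBuffer } from 'hyparquet-writer'

// no explicit types: the writer guesses each column's type from its data
const arrayBuffer = parquetWriteBuffer({
  columnData: [
    { name: 'active', data: [true, false, true] }, // guessed BOOLEAN
    { name: 'score', data: [1.5, 2.5, 3.5] },      // guessed DOUBLE
    { name: 'label', data: ['x', 'y', 'z'] },      // guessed BYTE_ARRAY (string)
  ],
})
```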
+### Node.js Write to Local Parquet File
+To write a local parquet file in Node.js, use `parquetWriteFile` with arguments `filename` and `columnData`:
+```javascript
+const { parquetWriteFile } = await import('hyparquet-writer')
+parquetWriteFile({
+  filename: 'example.parquet',
+  columnData: [
+    { name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
+    { name: 'age', data: [25, 30, 35], type: 'INT32' },
+  ],
+})
+```
+Note: hyparquet-writer is published as an ES module, so dynamic `import()` may be required on the command line.
+## Advanced Usage
+Options can be passed to `parquetWrite` to adjust parquet file writing behavior:
+- `writer`: a generic writer object
 - `compressed`: use snappy compression (default true)
 - `statistics`: write column statistics (default true)
 - `rowGroupSize`: number of rows in each row group (default 100000)
-- `kvMetadata`: extra key-value metadata
+- `kvMetadata`: extra key-value metadata to be stored in the parquet footer
 ```javascript
-import { parquetWrite } from 'hyparquet-writer'
+import { ByteWriter, parquetWrite } from 'hyparquet-writer'
+const writer = new ByteWriter()
 const arrayBuffer = parquetWrite({
+  writer,
   columnData: [
     { name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
     { name: 'age', data: [25, 30, 35], type: 'INT32' },

src/bytewriter.js

@@ -1,6 +1,7 @@
 /**
  * Generic buffered writer.
+ * Writes data to an auto-expanding ArrayBuffer.
  *
  * @import {Writer} from '../src/types.js'
  * @returns {Writer}

src/filewriter.js (new file, 49 lines)

@@ -0,0 +1,49 @@
import fs from 'fs'
import { ByteWriter } from './bytewriter.js'

/**
 * Buffered file writer.
 * Writes data to a local file in chunks using node fs.
 *
 * @import {Writer} from '../src/types.js'
 * @param {string} filename
 * @returns {Writer}
 */
export function fileWriter(filename) {
  const writer = new ByteWriter()
  const chunkSize = 1_000_000 // 1mb

  // create a new file or overwrite existing one
  fs.writeFileSync(filename, '', { flag: 'w' })

  function flush() {
    const chunk = writer.buffer.slice(0, writer.offset)
    // TODO: async
    fs.writeFileSync(filename, new Uint8Array(chunk), { flag: 'a' })
    writer.offset = 0
  }

  /**
   * Override the ensure method
   * @param {number} size
   */
  writer.ensure = function(size) {
    if (writer.offset > chunkSize) {
      flush()
    }
    if (writer.offset + size > writer.buffer.byteLength) {
      const newSize = Math.max(writer.buffer.byteLength * 2, writer.offset + size)
      const newBuffer = new ArrayBuffer(newSize)
      new Uint8Array(newBuffer).set(new Uint8Array(writer.buffer))
      writer.buffer = newBuffer
      writer.view = new DataView(writer.buffer)
    }
  }

  writer.getBuffer = function () {
    throw new Error('getBuffer not supported for FileWriter')
  }

  writer.finish = function() {
    flush()
  }

  return writer
}
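Taken on its own, `fileWriter` can stream arbitrary bytes to disk. A minimal usage sketch, assuming the append methods inherited from `ByteWriter` (the same ones exercised in the tests below) and a hypothetical output path:

```javascript
import { fileWriter } from 'hyparquet-writer'

// bytes accumulate in memory and flush to disk in ~1mb chunks
const writer = fileWriter('example.bin') // hypothetical output path
writer.appendUint32(0x31524150) // "PAR1" parquet magic bytes, little-endian
writer.appendBytes(new Uint8Array([1, 2, 3]))
writer.finish() // final flush of any remaining buffered bytes
```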

src/index.js

@@ -1,2 +1,4 @@
-export { parquetWrite } from './write.js'
+export { parquetWrite, parquetWriteBuffer, parquetWriteFile } from './write.js'
 export { ByteWriter } from './bytewriter.js'
+export { fileWriter } from './filewriter.js'
+export { ParquetWriter } from './parquet-writer.js'

src/parquet-writer.js

@@ -2,10 +2,10 @@ import { writeColumn } from './column.js'
 import { writeMetadata } from './metadata.js'

 /**
- * Create a new ParquetWriter.
+ * ParquetWriter class allows incremental writing of parquet files.
  *
  * @import {ColumnChunk, FileMetaData, RowGroup, SchemaElement} from 'hyparquet'
- * @import {KeyValue} from 'hyparquet/src/types.js'
+ * @import {KeyValue} from 'hyparquet/src/types.js' // TODO export from hyparquet
  * @import {ColumnData, Writer} from '../src/types.js'
  * @param {object} options
  * @param {Writer} options.writer
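The hunk stops at the constructor docs, so for orientation here is a sketch of the incremental flow this class enables. The `write()` method name and its `{ columnData }` argument are assumptions inferred from how `parquetWrite` drives the class in src/write.js below, and the two-element schema is hypothetical:

```javascript
import { ByteWriter, ParquetWriter } from 'hyparquet-writer'

// hypothetical schema: a root group with one required INT32 column
const schema = [
  { name: 'root', num_children: 1 },
  { name: 'id', type: 'INT32', repetition_type: 'REQUIRED' },
]

const writer = new ByteWriter()
const pq = new ParquetWriter({ writer, schema })
pq.write({ columnData: [{ name: 'id', data: [1, 2, 3] }] }) // assumed method name
pq.write({ columnData: [{ name: 'id', data: [4, 5, 6] }] }) // each call can append more rows
pq.finish() // writes the footer metadata
const arrayBuffer = writer.getBuffer()
```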

src/schema.js

@@ -13,10 +13,12 @@ export function getSchemaElementForValues(name, values, type) {
   if (values instanceof BigInt64Array) return { name, type: 'INT64', repetition_type: 'REQUIRED' }
   if (values instanceof Float32Array) return { name, type: 'FLOAT', repetition_type: 'REQUIRED' }
   if (values instanceof Float64Array) return { name, type: 'DOUBLE', repetition_type: 'REQUIRED' }
+  /** @type {FieldRepetitionType} */
   let repetition_type = 'REQUIRED'
+  /** @type {ConvertedType | undefined} */
   let converted_type = undefined
   for (const value of values) {
     if (value === null || value === undefined) {
       repetition_type = 'OPTIONAL'

src/types.d.ts vendored (12 lines changed)

@@ -1,4 +1,14 @@
-import { DecodedArray, ParquetType } from "hyparquet"
+import type { DecodedArray, ParquetType } from 'hyparquet'
+import type { KeyValue } from 'hyparquet/src/types.js' // TODO export from hyparquet
+
+export interface ParquetWriteOptions {
+  writer: Writer
+  columnData: ColumnData[]
+  compressed?: boolean
+  statistics?: boolean
+  rowGroupSize?: number
+  kvMetadata?: KeyValue[]
+}
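A small sketch putting these options to use via `parquetWriteBuffer`; the `{ key, value }` shape for `kvMetadata` follows parquet's standard `KeyValue` pair:

```javascript
import { parquetWriteBuffer } from 'hyparquet-writer'

const arrayBuffer = parquetWriteBuffer({
  columnData: [{ name: 'id', data: [1, 2, 3] }],
  compressed: false, // skip snappy compression
  statistics: false, // omit min/max column statistics
  rowGroupSize: 2,   // rows [1, 2] and [3] land in separate row groups
  kvMetadata: [{ key: 'creator', value: 'example' }], // stored in the footer
})
```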
 export interface ColumnData {
   name: string

src/write.js

@@ -1,23 +1,16 @@
+import { ByteWriter } from './bytewriter.js'
+import { fileWriter } from './filewriter.js'
 import { ParquetWriter } from './parquet-writer.js'
 import { schemaFromColumnData } from './schema.js'
-import { ByteWriter } from './bytewriter.js'

 /**
- * Write data as parquet to an ArrayBuffer
+ * Write data as parquet to a file or stream.
  *
- * @import {KeyValue} from 'hyparquet/src/types.js'
- * @import {ColumnData} from '../src/types.js'
- * @param {object} options
- * @param {ColumnData[]} options.columnData
- * @param {boolean} [options.compressed]
- * @param {boolean} [options.statistics]
- * @param {number} [options.rowGroupSize]
- * @param {KeyValue[]} [options.kvMetadata]
- * @returns {ArrayBuffer}
+ * @import {ParquetWriteOptions} from '../src/types.js'
+ * @param {ParquetWriteOptions} options
  */
-export function parquetWrite({ columnData, compressed = true, statistics = true, rowGroupSize = 100000, kvMetadata }) {
+export function parquetWrite({ writer, columnData, compressed = true, statistics = true, rowGroupSize = 100000, kvMetadata }) {
   const schema = schemaFromColumnData(columnData)
-  const writer = new ByteWriter()
   const pq = new ParquetWriter({
     writer,
     schema,
@@ -30,5 +23,27 @@ export function parquetWrite({ columnData, compressed = true, statistics = true,
     rowGroupSize,
   })
   pq.finish()
 }
+
+/**
+ * Write data as parquet to an ArrayBuffer.
+ *
+ * @param {Omit<ParquetWriteOptions, 'writer'>} options
+ * @returns {ArrayBuffer}
+ */
+export function parquetWriteBuffer(options) {
+  const writer = new ByteWriter()
+  parquetWrite({ ...options, writer })
+  return writer.getBuffer()
+}
+
+/**
+ * Write data as parquet to a local file.
+ *
+ * @param {Omit<ParquetWriteOptions, 'writer'> & {filename: string}} options
+ */
+export function parquetWriteFile(options) {
+  const { filename, ...rest } = options
+  const writer = fileWriter(filename)
+  parquetWrite({ ...rest, writer })
+}
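Both helpers are thin wrappers over `parquetWrite`. The explicit composition, equivalent to `parquetWriteFile` (a sketch with a hypothetical filename):

```javascript
import { fileWriter, parquetWrite } from 'hyparquet-writer'

// equivalent to parquetWriteFile({ filename: 'example.parquet', columnData })
const writer = fileWriter('example.parquet')
parquetWrite({
  writer,
  columnData: [{ name: 'id', data: [1, 2, 3] }],
})
// parquetWrite finalizes the parquet footer; with fileWriter that
// flushes the remaining buffered bytes to disk (see writer.finish above)
```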

test/filewriter.test.js (new file, 97 lines)

@@ -0,0 +1,97 @@
import fs from 'fs'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { fileWriter } from '../src/filewriter.js'

const filedir = 'data/'
const filename = 'data/filewriter.test.bin'

describe('FileWriter', () => {
  beforeEach(() => {
    // ensure data directory exists
    if (!fs.existsSync(filedir)) {
      fs.mkdirSync(filedir)
    }
  })

  afterEach(() => {
    // remove test file
    if (fs.existsSync(filename)) {
      fs.unlinkSync(filename)
    }
  })

  it('throws an error when calling getBuffer', () => {
    const writer = fileWriter(filename)
    expect(() => writer.getBuffer()).toThrowError('getBuffer not supported')
  })

  it('writes single byte and flushes on finish', () => {
    const writer = fileWriter(filename)
    writer.appendUint8(0xff)
    writer.finish()
    // verify file exists and content is correct
    expect(fs.existsSync(filename)).toBe(true)
    const contents = fs.readFileSync(filename)
    expect(new Uint8Array(contents)).toEqual(new Uint8Array([0xff]))
  })

  it('writes multiple data types to file', () => {
    const writer = fileWriter(filename)
    writer.appendUint8(0xab)
    writer.appendUint32(0x12345678)
    writer.appendInt32(-1)
    writer.appendInt64(0x1122334455667788n)
    writer.appendVarInt(300)
    writer.finish()

    const contents = new Uint8Array(fs.readFileSync(filename))
    const expected = new Uint8Array([
      0xab,
      0x78, 0x56, 0x34, 0x12,
      0xff, 0xff, 0xff, 0xff,
      0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11,
      0xac, 0x02,
    ])
    expect(contents).toEqual(expected)
  })

  it('auto-flushes when exceeding chunk size', () => {
    // default chunkSize = 1_000_000 bytes
    const writer = fileWriter(filename)
    // write slightly over 1mb to trigger auto-flush
    const largeArray = new Uint8Array(1_100_000).fill(0xaa)
    writer.appendBytes(largeArray)
    writer.appendBytes(largeArray)
    // expect first flush
    expect(fs.statSync(filename).size).toBe(1_100_000)
    writer.finish()
    // expect final flush
    expect(fs.statSync(filename).size).toBe(2_200_000)
  })

  it('overwrites existing file if new writer is created with same filename', () => {
    // first write
    let writer = fileWriter(filename)
    writer.appendBytes(new Uint8Array([0x11, 0x22]))
    writer.finish()
    // verify the file now has [0x11, 0x22]
    let contents = fs.readFileSync(filename)
    expect(new Uint8Array(contents)).toEqual(new Uint8Array([0x11, 0x22]))

    // second write
    writer = fileWriter(filename)
    writer.appendBytes(new Uint8Array([0xaa, 0xbb]))
    writer.finish()
    // should overwrite the previous content
    contents = fs.readFileSync(filename)
    expect(new Uint8Array(contents)).toEqual(new Uint8Array([0xaa, 0xbb]))
  })
})

test/metadata.test.js

@@ -147,10 +147,11 @@ describe('writeMetadata', () => {
   it('writes metadata and parses in hyparquet', () => {
     const writer = new ByteWriter()

-    // Write header PAR1
+    // write header PAR1
     writer.appendUint32(0x31524150)

-    // Write metadata
+    // write metadata
+    /** @type {FileMetaData} */
     const withKvMetadata = {
       ...exampleMetadata,
       key_value_metadata: [
@@ -161,14 +162,13 @@
     }
     writeMetadata(writer, withKvMetadata)

-    // Write footer PAR1
+    // write footer PAR1
     writer.appendUint32(0x31524150)

     const file = writer.getBuffer()
-    const output = parquetMetadata(file)
+    const outputMetadata = parquetMetadata(file)
-    /** @type {FileMetaData} */
-    expect(output).toEqual(withKvMetadata)
+    expect(outputMetadata).toEqual(withKvMetadata)
   })
 })

test/write.buffer.test.js

@@ -1,6 +1,6 @@
 import { parquetMetadata, parquetReadObjects } from 'hyparquet'
 import { describe, expect, it } from 'vitest'
-import { parquetWrite } from '../src/index.js'
+import { parquetWriteBuffer } from '../src/index.js'
 import { exampleMetadata } from './metadata.test.js'
/**
@@ -11,22 +11,24 @@ import { exampleMetadata } from './metadata.test.js'
  * @returns {Promise<Record<string, any>>}
  */
 async function roundTripDeserialize(columnData) {
-  const file = parquetWrite({ columnData })
+  const file = parquetWriteBuffer({ columnData })
   return await parquetReadObjects({ file, utf8: false })
 }

-const basicData = [
+/** @type {ColumnData[]} */
+export const basicData = [
   { name: 'bool', data: [true, false, true, false] },
   { name: 'int', data: [0, 127, 0x7fff, 0x7fffffff] },
   { name: 'bigint', data: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn] },
   // { name: 'float', data: [0, 0.0001, 123.456, 1e100], type: 'FLOAT' }, // TODO
   { name: 'double', data: [0, 0.0001, 123.456, 1e100] },
   { name: 'string', data: ['a', 'b', 'c', 'd'] },
   { name: 'nullable', data: [true, false, null, null] },
 ]

-describe('parquetWrite', () => {
+describe('parquetWriteBuffer', () => {
   it('writes expected metadata', () => {
-    const file = parquetWrite({ columnData: basicData })
+    const file = parquetWriteBuffer({ columnData: basicData })
     const metadata = parquetMetadata(file)
     expect(metadata).toEqual(exampleMetadata)
   })
@@ -47,7 +49,7 @@ describe('parquetWrite', () => {
     bool[100] = false
     bool[500] = true
     bool[9999] = false
-    const file = parquetWrite({ columnData: [{ name: 'bool', data: bool }] })
+    const file = parquetWriteBuffer({ columnData: [{ name: 'bool', data: bool }] })
     expect(file.byteLength).toBe(160)
     const metadata = parquetMetadata(file)
     expect(metadata.metadata_length).toBe(98)
@@ -63,14 +65,14 @@
   it('efficiently serializes long string', () => {
     const str = 'a'.repeat(10000)
-    const file = parquetWrite({ columnData: [{ name: 'string', data: [str] }] })
+    const file = parquetWriteBuffer({ columnData: [{ name: 'string', data: [str] }] })
     expect(file.byteLength).toBe(646)
   })

   it('less efficiently serializes string without compression', () => {
     const str = 'a'.repeat(10000)
     const columnData = [{ name: 'string', data: [str] }]
-    const file = parquetWrite({ columnData, compressed: false })
+    const file = parquetWriteBuffer({ columnData, compressed: false })
     expect(file.byteLength).toBe(10175)
   })
@@ -78,7 +80,7 @@
     const data = Array(100000)
       .fill('aaaa', 0, 50000)
       .fill('bbbb', 50000, 100000)
-    const file = parquetWrite({ columnData: [{ name: 'string', data }], statistics: false })
+    const file = parquetWriteBuffer({ columnData: [{ name: 'string', data }], statistics: false })
     expect(file.byteLength).toBe(178)
     // round trip
     const result = await parquetReadObjects({ file })
@@ -88,8 +90,8 @@
   })

   it('writes statistics when enabled', () => {
-    const withStats = parquetWrite({ columnData: basicData, statistics: true })
-    const noStats = parquetWrite({ columnData: basicData, statistics: false })
+    const withStats = parquetWriteBuffer({ columnData: basicData, statistics: true })
+    const noStats = parquetWriteBuffer({ columnData: basicData, statistics: false })
     expect(withStats.byteLength).toBe(669)
     expect(noStats.byteLength).toBe(575)
   })
@@ -181,7 +183,7 @@ describe('parquetWrite', () => {
   it('splits row groups', async () => {
     const data = Array(200).fill(13)
-    const file = parquetWrite({ columnData: [{ name: 'int', data }], rowGroupSize: 100 })
+    const file = parquetWriteBuffer({ columnData: [{ name: 'int', data }], rowGroupSize: 100 })
     const metadata = parquetMetadata(file)
     expect(metadata.row_groups.length).toBe(2)
     expect(metadata.row_groups[0].num_rows).toBe(100n)
@@ -196,31 +198,31 @@
   })

   it('throws for wrong type specified', () => {
-    expect(() => parquetWrite({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BOOLEAN' }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BOOLEAN' }] }))
       .toThrow('parquet cannot write mixed types')
   })

   it('throws for empty column with no type specified', () => {
-    expect(() => parquetWrite({ columnData: [{ name: 'empty', data: [] }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'empty', data: [] }] }))
       .toThrow('column empty cannot determine type')
-    expect(() => parquetWrite({ columnData: [{ name: 'empty', data: [null, null, null, null] }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'empty', data: [null, null, null, null] }] }))
       .toThrow('column empty cannot determine type')
   })

   it('throws for mixed types', () => {
-    expect(() => parquetWrite({ columnData: [{ name: 'mixed', data: [1, 2, 3, 'boom'] }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'mixed', data: [1, 2, 3, 'boom'] }] }))
       .toThrow('mixed types not supported')
   })

   it('throws error when columns have mismatched lengths', () => {
-    expect(() => parquetWrite({ columnData: [
+    expect(() => parquetWriteBuffer({ columnData: [
       { name: 'col1', data: [1, 2, 3] },
       { name: 'col2', data: [4, 5] },
     ] })).toThrow('columns must have the same length')
   })

   it('throws error for unsupported data types', () => {
-    expect(() => parquetWrite({ columnData: [{ name: 'func', data: [() => {}] }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'func', data: [() => {}] }] }))
       .toThrow('cannot determine parquet type for: () => {}')
   })
 })

test/write.file.test.js (new file, 43 lines)

@@ -0,0 +1,43 @@
import fs from 'fs'
import { asyncBufferFromFile, parquetMetadataAsync, parquetReadObjects } from 'hyparquet'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { exampleMetadata } from './metadata.test.js'
import { parquetWriteFile } from '../src/index.js'
import { basicData } from './write.buffer.test.js'

const filedir = 'data/'
const filename = 'data/write.file.parquet'

describe('parquetWrite with FileWriter', () => {
  beforeEach(() => {
    // ensure data directory exists
    if (!fs.existsSync(filedir)) {
      fs.mkdirSync(filedir)
    }
  })

  afterEach(() => {
    // remove test file
    if (fs.existsSync(filename)) {
      fs.unlinkSync(filename)
    }
  })

  it('writes parquet file', async () => {
    parquetWriteFile({ filename, columnData: basicData })

    // check parquet metadata
    const file = await asyncBufferFromFile(filename)
    const metadata = await parquetMetadataAsync(file)
    expect(metadata).toEqual(exampleMetadata)

    // check parquet data
    const result = await parquetReadObjects({ file, metadata })
    expect(result).toEqual([
      { bool: true, int: 0, bigint: 0n, double: 0, string: 'a', nullable: true },
      { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b', nullable: false },
      { bool: true, int: 0x7fff, bigint: 0x7fffn, double: 123.456, string: 'c', nullable: null },
      { bool: false, int: 0x7fffffff, bigint: 0x7fffffffffffffffn, double: 1e100, string: 'd', nullable: null },
    ])
  })
})
})