diff --git a/.gitignore b/.gitignore
index 43afcc4..f9373bc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,5 @@ package-lock.json
 *.tgz
 .DS_Store
 /*.parquet
+/data
 /types
diff --git a/README.md b/README.md
index 0e6e185..c19bc81 100644
--- a/README.md
+++ b/README.md
@@ -11,20 +11,18 @@
 
 Hyparquet Writer is a JavaScript library for writing [Apache Parquet](https://parquet.apache.org) files. It is designed to be lightweight, fast and store data very efficiently. It is a companion to the [hyparquet](https://github.com/hyparam/hyparquet) library, which is a JavaScript library for reading parquet files.
 
-## Usage
+## Quick Start
 
-Call `parquetWrite` with argument `columnData`. Each column in `columnData` should contain:
+To write a parquet file to an `ArrayBuffer`, use `parquetWriteBuffer` with argument `columnData`. Each column in `columnData` should contain:
 
 - `name`: the column name
 - `data`: an array of same-type values
-- `type`: the parquet schema type (optional, type guessed from data if not provided)
-
-Example:
+- `type`: the parquet schema type (optional)
 
 ```javascript
-import { parquetWrite } from 'hyparquet-writer'
+import { parquetWriteBuffer } from 'hyparquet-writer'
 
-const arrayBuffer = parquetWrite({
+const arrayBuffer = parquetWriteBuffer({
   columnData: [
     { name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
     { name: 'age', data: [25, 30, 35], type: 'INT32' },
@@ -32,19 +30,49 @@ const arrayBuffer = parquetWrite({
 })
 ```
 
-## Options
+Note: if `type` is not provided, the type will be guessed from the data. The supported parquet types are:
 
-Options can be passed to `parquetWrite` to change parquet file properties:
+- `BOOLEAN`
+- `INT32`
+- `INT64`
+- `FLOAT`
+- `DOUBLE`
+- `BYTE_ARRAY`
 
+### Node.js Write to Local Parquet File
+
+To write a local parquet file in Node.js, use `parquetWriteFile` with arguments `filename` and `columnData`:
+
+```javascript
+const { parquetWriteFile } = await import('hyparquet-writer')
+
+parquetWriteFile({
+  filename: 'example.parquet',
+  columnData: [
+    { name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
+    { name: 'age', data: [25, 30, 35], type: 'INT32' },
+  ],
+})
+```
+
+Note: hyparquet-writer is published as an ES module, so dynamic `import()` may be required on the command line.
+
+## Advanced Usage
+
+Options can be passed to `parquetWrite` to adjust parquet file writing behavior:
+
+ - `writer`: a generic writer object
  - `compression`: use snappy compression (default true)
  - `statistics`: write column statistics (default true)
  - `rowGroupSize`: number of rows in each row group (default 100000)
- - `kvMetadata`: extra key-value metadata
+ - `kvMetadata`: extra key-value metadata to be stored in the parquet footer
 
 ```javascript
-import { parquetWrite } from 'hyparquet-writer'
+import { ByteWriter, parquetWrite } from 'hyparquet-writer'
 
+const writer = new ByteWriter()
 const arrayBuffer = parquetWrite({
+  writer,
   columnData: [
     { name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
     { name: 'age', data: [25, 30, 35], type: 'INT32' },
diff --git a/src/bytewriter.js b/src/bytewriter.js
index 58f5037..d4ab57d 100644
--- a/src/bytewriter.js
+++ b/src/bytewriter.js
@@ -1,6 +1,7 @@
 /**
  * Generic buffered writer.
+ * Writes data to an auto-expanding ArrayBuffer.
  *
  * @import {Writer} from '../src/types.js'
  * @returns {Writer}
  */
diff --git a/src/filewriter.js b/src/filewriter.js
new file mode 100644
index 0000000..34a748f
--- /dev/null
+++ b/src/filewriter.js
@@ -0,0 +1,49 @@
+import fs from 'fs'
+import { ByteWriter } from './bytewriter.js'
+
+/**
+ * Buffered file writer.
+ * Writes data to a local file in chunks using node fs.
+ *
+ * @import {Writer} from '../src/types.js'
+ * @param {string} filename
+ * @returns {Writer}
+ */
+export function fileWriter(filename) {
+  const writer = new ByteWriter()
+  const chunkSize = 1_000_000 // 1mb
+
+  // create a new file or overwrite existing one
+  fs.writeFileSync(filename, '', { flag: 'w' })
+
+  function flush() {
+    const chunk = writer.buffer.slice(0, writer.offset)
+    // TODO: async
+    fs.writeFileSync(filename, new Uint8Array(chunk), { flag: 'a' })
+    writer.offset = 0
+  }
+
+  /**
+   * Override the ensure method
+   * @param {number} size
+   */
+  writer.ensure = function(size) {
+    if (writer.offset > chunkSize) {
+      flush()
+    }
+    if (writer.offset + size > writer.buffer.byteLength) {
+      const newSize = Math.max(writer.buffer.byteLength * 2, writer.offset + size)
+      const newBuffer = new ArrayBuffer(newSize)
+      new Uint8Array(newBuffer).set(new Uint8Array(writer.buffer))
+      writer.buffer = newBuffer
+      writer.view = new DataView(writer.buffer)
+    }
+  }
+  writer.getBuffer = function () {
+    throw new Error('getBuffer not supported for FileWriter')
+  }
+  writer.finish = function() {
+    flush()
+  }
+  return writer
+}
diff --git a/src/index.js b/src/index.js
index f6f2dd4..8dc88e4 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1,2 +1,4 @@
-export { parquetWrite } from './write.js'
+export { parquetWrite, parquetWriteBuffer, parquetWriteFile } from './write.js'
+export { ByteWriter } from './bytewriter.js'
+export { fileWriter } from './filewriter.js'
 export { ParquetWriter } from './parquet-writer.js'
diff --git a/src/parquet-writer.js b/src/parquet-writer.js
index 82c3e24..819d536 100644
--- a/src/parquet-writer.js
+++ b/src/parquet-writer.js
@@ -2,10 +2,10 @@ import { writeColumn } from './column.js'
 import { writeMetadata } from './metadata.js'
 
 /**
- * Create a new ParquetWriter.
+ * ParquetWriter class allows incremental writing of parquet files.
  *
  * @import {ColumnChunk, FileMetaData, RowGroup, SchemaElement} from 'hyparquet'
- * @import {KeyValue} from 'hyparquet/src/types.js'
+ * @import {KeyValue} from 'hyparquet/src/types.js' // TODO export from hyparquet
  * @import {ColumnData, Writer} from '../src/types.js'
  * @param {object} options
  * @param {Writer} options.writer
diff --git a/src/schema.js b/src/schema.js
index df4fe51..99147d3 100644
--- a/src/schema.js
+++ b/src/schema.js
@@ -13,10 +13,12 @@ export function getSchemaElementForValues(name, values, type) {
   if (values instanceof BigInt64Array) return { name, type: 'INT64', repetition_type: 'REQUIRED' }
   if (values instanceof Float32Array) return { name, type: 'FLOAT', repetition_type: 'REQUIRED' }
   if (values instanceof Float64Array) return { name, type: 'DOUBLE', repetition_type: 'REQUIRED' }
+  /** @type {FieldRepetitionType} */
   let repetition_type = 'REQUIRED'
 
   /** @type {ConvertedType | undefined} */
   let converted_type = undefined
+
   for (const value of values) {
     if (value === null || value === undefined) {
       repetition_type = 'OPTIONAL'
diff --git a/src/types.d.ts b/src/types.d.ts
index edfa57f..5a3eef0 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -1,4 +1,14 @@
-import { DecodedArray, ParquetType } from "hyparquet"
+import type { DecodedArray, ParquetType } from 'hyparquet'
+import type { KeyValue } from 'hyparquet/src/types.js' // TODO export from hyparquet
+
+export interface ParquetWriteOptions {
+  writer: Writer
+  columnData: ColumnData[]
+  compressed?: boolean
+  statistics?: boolean
+  rowGroupSize?: number
+  kvMetadata?: KeyValue[]
+}
 
 export interface ColumnData {
   name: string
diff --git a/src/write.js b/src/write.js
index 63c666e..21f2e3b 100644
--- a/src/write.js
+++ b/src/write.js
@@ -1,23 +1,16 @@
+import { ByteWriter } from './bytewriter.js'
+import { fileWriter } from './filewriter.js'
 import { ParquetWriter } from './parquet-writer.js'
 import { schemaFromColumnData } from './schema.js'
-import { ByteWriter } from './bytewriter.js'
 
 /**
- * Write data as parquet to an ArrayBuffer
+ * Write data as parquet to a file or stream.
  *
- * @import {KeyValue} from 'hyparquet/src/types.js'
- * @import {ColumnData} from '../src/types.js'
- * @param {object} options
- * @param {ColumnData[]} options.columnData
- * @param {boolean} [options.compressed]
- * @param {boolean} [options.statistics]
- * @param {number} [options.rowGroupSize]
- * @param {KeyValue[]} [options.kvMetadata]
- * @returns {ArrayBuffer}
+ * @import {ParquetWriteOptions} from '../src/types.js'
+ * @param {ParquetWriteOptions} options
  */
-export function parquetWrite({ columnData, compressed = true, statistics = true, rowGroupSize = 100000, kvMetadata }) {
+export function parquetWrite({ writer, columnData, compressed = true, statistics = true, rowGroupSize = 100000, kvMetadata }) {
   const schema = schemaFromColumnData(columnData)
-  const writer = new ByteWriter()
   const pq = new ParquetWriter({
     writer,
     schema,
@@ -30,5 +23,27 @@ export function parquetWrite({ columnData, compressed = true, statistics = true,
     rowGroupSize,
   })
   pq.finish()
+}
+
+/**
+ * Write data as parquet to an ArrayBuffer.
+ *
+ * @param {Omit<ParquetWriteOptions, 'writer'>} options
+ * @returns {ArrayBuffer}
+ */
+export function parquetWriteBuffer(options) {
+  const writer = new ByteWriter()
+  parquetWrite({ ...options, writer })
   return writer.getBuffer()
 }
+
+/**
+ * Write data as parquet to a local file.
+ *
+ * @param {Omit<ParquetWriteOptions, 'writer'> & {filename: string}} options
+ */
+export function parquetWriteFile(options) {
+  const { filename, ...rest } = options
+  const writer = fileWriter(filename)
+  parquetWrite({ ...rest, writer })
+}
diff --git a/test/filewriter.test.js b/test/filewriter.test.js
new file mode 100644
index 0000000..78b39ce
--- /dev/null
+++ b/test/filewriter.test.js
@@ -0,0 +1,97 @@
+import fs from 'fs'
+import { afterEach, beforeEach, describe, expect, it } from 'vitest'
+import { fileWriter } from '../src/filewriter.js'
+
+const filedir = 'data/'
+const filename = 'data/filewriter.test.bin'
+
+describe('FileWriter', () => {
+  beforeEach(() => {
+    // ensure data directory exists
+    if (!fs.existsSync(filedir)) {
+      fs.mkdirSync(filedir)
+    }
+  })
+
+  afterEach(() => {
+    // remove test file
+    if (fs.existsSync(filename)) {
+      fs.unlinkSync(filename)
+    }
+  })
+
+  it('throws an error when calling getBuffer', () => {
+    const writer = fileWriter(filename)
+    expect(() => writer.getBuffer()).toThrowError('getBuffer not supported')
+  })
+
+  it('writes single byte and flushes on finish', () => {
+    const writer = fileWriter(filename)
+    writer.appendUint8(0xff)
+    writer.finish()
+
+    // verify file exists and content is correct
+    expect(fs.existsSync(filename)).toBe(true)
+    const contents = fs.readFileSync(filename)
+    expect(new Uint8Array(contents)).toEqual(new Uint8Array([0xff]))
+  })
+
+  it('writes multiple data types to file', () => {
+    const writer = fileWriter(filename)
+    writer.appendUint8(0xab)
+    writer.appendUint32(0x12345678)
+    writer.appendInt32(-1)
+    writer.appendInt64(0x1122334455667788n)
+    writer.appendVarInt(300)
+    writer.finish()
+
+    const contents = new Uint8Array(fs.readFileSync(filename))
+
+    const expected = new Uint8Array([
+      0xab,
+      0x78, 0x56, 0x34, 0x12,
+      0xff, 0xff, 0xff, 0xff,
+      0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11,
+      0xac, 0x02,
+    ])
+    expect(contents).toEqual(expected)
+  })
+
+  it('auto-flushes when exceeding chunk size', () => {
+    // default chunkSize = 1_000_000 bytes
+    const writer = fileWriter(filename)
+
+    // write slightly over 1mb to trigger auto-flush
+    const largeArray = new Uint8Array(1_100_000).fill(0xaa)
+    writer.appendBytes(largeArray)
+    writer.appendBytes(largeArray)
+
+    // expect first flush
+    expect(fs.statSync(filename).size).toBe(1_100_000)
+
+    writer.finish()
+
+    // expect final flush
+    expect(fs.statSync(filename).size).toBe(2_200_000)
+  })
+
+  it('overwrites existing file if new writer is created with same filename', () => {
+    // first write
+    let writer = fileWriter(filename)
+    writer.appendBytes(new Uint8Array([0x11, 0x22]))
+    writer.finish()
+
+    // verify the file now has [0x11, 0x22]
+    let contents = fs.readFileSync(filename)
+    expect(new Uint8Array(contents)).toEqual(new Uint8Array([0x11, 0x22]))
+
+    // second write
+    writer = fileWriter(filename)
+    writer.appendBytes(new Uint8Array([0xaa, 0xbb]))
+    writer.finish()
+
+    // should overwrite the previous content
+    contents = fs.readFileSync(filename)
+    expect(new Uint8Array(contents)).toEqual(new Uint8Array([0xaa, 0xbb]))
+  })
+})
diff --git a/test/metadata.test.js b/test/metadata.test.js
index 4ef528d..92648ae 100644
--- a/test/metadata.test.js
+++ b/test/metadata.test.js
@@ -147,10 +147,11 @@ describe('writeMetadata', () => {
   it('writes metadata and parses in hyparquet', () => {
     const writer = new ByteWriter()
 
-    // Write header PAR1
+    // write header PAR1
     writer.appendUint32(0x31524150)
 
-    // Write metadata
+    // write metadata
+    /** @type {FileMetaData} */
     const withKvMetadata = {
       ...exampleMetadata,
       key_value_metadata: [
@@ -161,14 +162,13 @@ describe('writeMetadata', () => {
     }
     writeMetadata(writer, withKvMetadata)
 
-    // Write footer PAR1
+    // write footer PAR1
     writer.appendUint32(0x31524150)
 
     const file = writer.getBuffer()
-    const output = parquetMetadata(file)
+    const outputMetadata = parquetMetadata(file)
 
-    /** @type {FileMetaData} */
-    expect(output).toEqual(withKvMetadata)
+    expect(outputMetadata).toEqual(withKvMetadata)
   })
 })
 
diff --git a/test/write.test.js b/test/write.buffer.test.js
similarity index 82%
rename from test/write.test.js
rename to test/write.buffer.test.js
index 6ca53dc..ce044e4 100644
--- a/test/write.test.js
+++ b/test/write.buffer.test.js
@@ -1,6 +1,6 @@
 import { parquetMetadata, parquetReadObjects } from 'hyparquet'
 import { describe, expect, it } from 'vitest'
-import { parquetWrite } from '../src/index.js'
+import { parquetWriteBuffer } from '../src/index.js'
 import { exampleMetadata } from './metadata.test.js'
 
 /**
@@ -11,22 +11,24 @@ import { exampleMetadata } from './metadata.test.js'
  * @returns {Promise<Record<string, any>[]>}
  */
 async function roundTripDeserialize(columnData) {
-  const file = parquetWrite({ columnData })
+  const file = parquetWriteBuffer({ columnData })
   return await parquetReadObjects({ file, utf8: false })
 }
 
-const basicData = [
+/** @type {ColumnData[]} */
+export const basicData = [
   { name: 'bool', data: [true, false, true, false] },
   { name: 'int', data: [0, 127, 0x7fff, 0x7fffffff] },
   { name: 'bigint', data: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn] },
+  // { name: 'float', data: [0, 0.0001, 123.456, 1e100], type: 'FLOAT' }, // TODO
   { name: 'double', data: [0, 0.0001, 123.456, 1e100] },
   { name: 'string', data: ['a', 'b', 'c', 'd'] },
   { name: 'nullable', data: [true, false, null, null] },
 ]
 
-describe('parquetWrite', () => {
+describe('parquetWriteBuffer', () => {
   it('writes expected metadata', () => {
-    const file = parquetWrite({ columnData: basicData })
+    const file = parquetWriteBuffer({ columnData: basicData })
     const metadata = parquetMetadata(file)
     expect(metadata).toEqual(exampleMetadata)
   })
@@ -47,7 +49,7 @@ describe('parquetWrite', () => {
     bool[100] = false
     bool[500] = true
     bool[9999] = false
-    const file = parquetWrite({ columnData: [{ name: 'bool', data: bool }] })
+    const file = parquetWriteBuffer({ columnData: [{ name: 'bool', data: bool }] })
     expect(file.byteLength).toBe(160)
     const metadata = parquetMetadata(file)
     expect(metadata.metadata_length).toBe(98)
@@ -63,14 +65,14 @@ describe('parquetWrite', () => {
 
   it('efficiently serializes long string', () => {
     const str = 'a'.repeat(10000)
-    const file = parquetWrite({ columnData: [{ name: 'string', data: [str] }] })
+    const file = parquetWriteBuffer({ columnData: [{ name: 'string', data: [str] }] })
     expect(file.byteLength).toBe(646)
   })
 
   it('less efficiently serializes string without compression', () => {
     const str = 'a'.repeat(10000)
     const columnData = [{ name: 'string', data: [str] }]
-    const file = parquetWrite({ columnData, compressed: false })
+    const file = parquetWriteBuffer({ columnData, compressed: false })
     expect(file.byteLength).toBe(10175)
   })
 
@@ -78,7 +80,7 @@ describe('parquetWrite', () => {
     const data = Array(100000)
       .fill('aaaa', 0, 50000)
       .fill('bbbb', 50000, 100000)
-    const file = parquetWrite({ columnData: [{ name: 'string', data }], statistics: false })
+    const file = parquetWriteBuffer({ columnData: [{ name: 'string', data }], statistics: false })
     expect(file.byteLength).toBe(178)
     // round trip
     const result = await parquetReadObjects({ file })
@@ -88,8 +90,8 @@ describe('parquetWrite', () => {
   })
 
   it('writes statistics when enabled', () => {
-    const withStats = parquetWrite({ columnData: basicData, statistics: true })
-    const noStats = parquetWrite({ columnData: basicData, statistics: false })
+    const withStats = parquetWriteBuffer({ columnData: basicData, statistics: true })
+    const noStats = parquetWriteBuffer({ columnData: basicData, statistics: false })
     expect(withStats.byteLength).toBe(669)
     expect(noStats.byteLength).toBe(575)
   })
@@ -181,7 +183,7 @@ describe('parquetWrite', () => {
 
   it('splits row groups', async () => {
     const data = Array(200).fill(13)
-    const file = parquetWrite({ columnData: [{ name: 'int', data }], rowGroupSize: 100 })
+    const file = parquetWriteBuffer({ columnData: [{ name: 'int', data }], rowGroupSize: 100 })
     const metadata = parquetMetadata(file)
     expect(metadata.row_groups.length).toBe(2)
     expect(metadata.row_groups[0].num_rows).toBe(100n)
@@ -196,31 +198,31 @@ describe('parquetWrite', () => {
   })
 
   it('throws for wrong type specified', () => {
-    expect(() => parquetWrite({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BOOLEAN' }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BOOLEAN' }] }))
       .toThrow('parquet cannot write mixed types')
   })
 
   it('throws for empty column with no type specified', () => {
-    expect(() => parquetWrite({ columnData: [{ name: 'empty', data: [] }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'empty', data: [] }] }))
       .toThrow('column empty cannot determine type')
-    expect(() => parquetWrite({ columnData: [{ name: 'empty', data: [null, null, null, null] }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'empty', data: [null, null, null, null] }] }))
       .toThrow('column empty cannot determine type')
   })
 
   it('throws for mixed types', () => {
-    expect(() => parquetWrite({ columnData: [{ name: 'mixed', data: [1, 2, 3, 'boom'] }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'mixed', data: [1, 2, 3, 'boom'] }] }))
       .toThrow('mixed types not supported')
   })
 
   it('throws error when columns have mismatched lengths', () => {
-    expect(() => parquetWrite({ columnData: [
+    expect(() => parquetWriteBuffer({ columnData: [
       { name: 'col1', data: [1, 2, 3] },
       { name: 'col2', data: [4, 5] },
     ] })).toThrow('columns must have the same length')
   })
 
   it('throws error for unsupported data types', () => {
-    expect(() => parquetWrite({ columnData: [{ name: 'func', data: [() => {}] }] }))
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'func', data: [() => {}] }] }))
       .toThrow('cannot determine parquet type for: () => {}')
   })
 })
diff --git a/test/write.file.test.js b/test/write.file.test.js
new file mode 100644
index 0000000..d13d914
--- /dev/null
+++ b/test/write.file.test.js
@@ -0,0 +1,43 @@
+import fs from 'fs'
+import { asyncBufferFromFile, parquetMetadataAsync, parquetReadObjects } from 'hyparquet'
+import { afterEach, beforeEach, describe, expect, it } from 'vitest'
+import { exampleMetadata } from './metadata.test.js'
+import { parquetWriteFile } from '../src/index.js'
+import { basicData } from './write.buffer.test.js'
+
+const filedir = 'data/'
+const filename = 'data/write.file.parquet'
+
+describe('parquetWrite with FileWriter', () => {
+  beforeEach(() => {
+    // ensure data directory exists
+    if (!fs.existsSync(filedir)) {
+      fs.mkdirSync(filedir)
+    }
+  })
+
+  afterEach(() => {
+    // remove test file
+    if (fs.existsSync(filename)) {
+      fs.unlinkSync(filename)
+    }
+  })
+
+  it('writes parquet file', async () => {
+    parquetWriteFile({ filename, columnData: basicData })
+
+    // check parquet metadata
+    const file = await asyncBufferFromFile(filename)
+    const metadata = await parquetMetadataAsync(file)
+    expect(metadata).toEqual(exampleMetadata)
+
+    // check parquet data
+    const result = await parquetReadObjects({ file, metadata })
+    expect(result).toEqual([
+      { bool: true, int: 0, bigint: 0n, double: 0, string: 'a', nullable: true },
+      { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b', nullable: false },
+      { bool: true, int: 0x7fff, bigint: 0x7fffn, double: 123.456, string: 'c', nullable: null },
+      { bool: false, int: 0x7fffffff, bigint: 0x7fffffffffffffffn, double: 1e100, string: 'd', nullable: null },
+    ])
+  })
+})
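As a usage sketch of the API surface this diff introduces (not itself part of the patch): the snippet below writes columns with the new `parquetWriteBuffer` and `parquetWriteFile` exports, then reads the file back with hyparquet, mirroring `test/write.file.test.js`. The `example.parquet` filename and the commented output are illustrative assumptions.

```javascript
import { asyncBufferFromFile, parquetReadObjects } from 'hyparquet'
import { parquetWriteBuffer, parquetWriteFile } from 'hyparquet-writer'

const columnData = [
  { name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
  { name: 'age', data: [25, 30, 35], type: 'INT32' },
]

// in-memory: parquetWriteBuffer wraps parquetWrite with a ByteWriter and returns an ArrayBuffer
const arrayBuffer = parquetWriteBuffer({ columnData })

// on disk: parquetWriteFile wraps parquetWrite with a fileWriter that flushes in ~1mb chunks
parquetWriteFile({ filename: 'example.parquet', columnData })

// round trip: read the written file back with hyparquet
const file = await asyncBufferFromFile('example.parquet')
const rows = await parquetReadObjects({ file })
console.log(arrayBuffer.byteLength, rows)
// rows: [{ name: 'Alice', age: 25 }, { name: 'Bob', age: 30 }, { name: 'Charlie', age: 35 }]
```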