diff --git a/src/index.js b/src/index.js
index f72a139..f6f2dd4 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1 +1,2 @@
 export { parquetWrite } from './write.js'
+export { ParquetWriter } from './parquet-writer.js'
diff --git a/src/parquet-writer.js b/src/parquet-writer.js
new file mode 100644
index 0000000..de3305c
--- /dev/null
+++ b/src/parquet-writer.js
@@ -0,0 +1,99 @@
+import { writeColumn } from './column.js'
+import { Writer } from './writer.js'
+import { writeMetadata } from './metadata.js'
+
+/**
+ * Create a new ParquetWriter.
+ *
+ * @import {ColumnChunk, FileMetaData, RowGroup, SchemaElement, SchemaTree} from 'hyparquet'
+ * @import {KeyValue} from 'hyparquet/src/types.js'
+ * @import {ColumnData} from './types.js'
+ * @param {object} options
+ * @param {SchemaElement[]} options.schema
+ * @param {boolean} [options.compressed]
+ * @param {boolean} [options.statistics]
+ * @param {KeyValue[]} [options.kvMetadata]
+ */
+export function ParquetWriter({ schema, compressed = true, statistics = true, kvMetadata }) {
+  /** @type {RowGroup[]} */
+  this.row_groups = []
+  this.schema = schema
+  this.compressed = compressed
+  this.statistics = statistics
+  this.kvMetadata = kvMetadata
+  this.num_rows = BigInt(0)
+  this.writer = new Writer()
+
+  // write header PAR1
+  this.writer.appendUint32(0x31524150)
+}
+
+/**
+ * Write data to the file.
+ * Will split data into row groups of the specified size.
+ *
+ * @param {object} options
+ * @param {ColumnData[]} options.columnData
+ * @param {number} [options.rowGroupSize]
+ */
+ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 }) {
+  const columnDataRows = columnData[0]?.data?.length || 0
+  for (let groupStartIndex = 0; groupStartIndex < columnDataRows; groupStartIndex += rowGroupSize) {
+    const groupStartOffset = this.writer.offset
+    const groupSize = Math.min(rowGroupSize, columnDataRows - groupStartIndex)
+
+    // row group columns
+    /** @type {ColumnChunk[]} */
+    const columns = []
+
+    // write columns
+    for (let j = 0; j < columnData.length; j++) {
+      const { name, data } = columnData[j]
+      const schemaPath = [this.schema[0], this.schema[j + 1]]
+      const groupData = data.slice(groupStartIndex, groupStartIndex + groupSize)
+      const file_offset = BigInt(this.writer.offset)
+      const meta_data = writeColumn(this.writer, schemaPath, groupData, this.compressed, this.statistics)
+
+      // save column chunk metadata
+      columns.push({
+        file_path: name,
+        file_offset,
+        meta_data,
+      })
+    }
+    this.num_rows += BigInt(groupSize)
+
+    this.row_groups.push({
+      columns,
+      total_byte_size: BigInt(this.writer.offset - groupStartOffset),
+      num_rows: BigInt(groupSize),
+    })
+  }
+}
+
+/**
+ * Finish writing the file and return the buffer.
+ *
+ * @returns {ArrayBuffer}
+ */
+ParquetWriter.prototype.finish = function() {
+  // write metadata
+  /** @type {FileMetaData} */
+  const metadata = {
+    version: 2,
+    created_by: 'hyparquet',
+    schema: this.schema,
+    num_rows: this.num_rows,
+    row_groups: this.row_groups,
+    metadata_length: 0,
+    key_value_metadata: this.kvMetadata,
+  }
+  // @ts-ignore don't want to actually serialize metadata_length
+  delete metadata.metadata_length
+  writeMetadata(this.writer, metadata)
+
+  // write footer PAR1
+  this.writer.appendUint32(0x31524150)
+
+  return this.writer.getBuffer()
+}
diff --git a/src/write.js b/src/write.js
index 0bffb47..8cc3887 100644
--- a/src/write.js
+++ b/src/write.js
@@ -1,7 +1,5 @@
-import { writeColumn } from './column.js'
-import { Writer } from './writer.js'
-import { writeMetadata } from './metadata.js'
 import { getSchemaElementForValues } from './schema.js'
+import { ParquetWriter } from './parquet-writer.js'
 
 /**
  * Write data as parquet to an ArrayBuffer
@@ -18,18 +16,40 @@ import { getSchemaElementForValues } from './schema.js'
  * @returns {ArrayBuffer}
  */
 export function parquetWrite({ columnData, compressed = true, statistics = true, rowGroupSize = 100000, kvMetadata }) {
-  const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
-  const writer = new Writer()
+  const schema = schemaFromColumnData(columnData)
+  const writer = new ParquetWriter({
+    schema,
+    compressed,
+    statistics,
+    kvMetadata,
+  })
 
-  // construct schema
+  writer.write({
+    columnData,
+    rowGroupSize,
+  })
+
+  return writer.finish()
+}
+
+/**
+ * Convert column data to schema.
+ *
+ * @param {ColumnData[]} columnData
+ * @returns {SchemaElement[]}
+ */
+function schemaFromColumnData(columnData) {
   /** @type {SchemaElement[]} */
   const schema = [{
     name: 'root',
     num_children: columnData.length,
   }]
+  let num_rows = 0
   for (const { name, data, type } of columnData) {
     // check if all columns have the same length
-    if (BigInt(data.length) !== num_rows) {
+    if (num_rows === 0) {
+      num_rows = data.length
+    } else if (num_rows !== data.length) {
       throw new Error('columns must have the same length')
     }
     // auto-detect type
@@ -37,58 +57,5 @@ export function parquetWrite({ columnData, compressed = true, statistics = true,
     if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`)
     schema.push(schemaElement)
   }
-
-  // write header PAR1
-  writer.appendUint32(0x31524150)
-
-  /** @type {RowGroup[]} */
-  const row_groups = []
-  for (let i = 0; i < num_rows; i += rowGroupSize) {
-    const groupStart = writer.offset
-
-    // row group columns
-    /** @type {ColumnChunk[]} */
-    const columns = []
-
-    // write columns
-    for (let i = 0; i < columnData.length; i++) {
-      const { name, data } = columnData[i]
-      const file_offset = BigInt(writer.offset)
-      const schemaPath = [schema[0], schema[i + 1]]
-      const meta_data = writeColumn(writer, schemaPath, data, compressed, statistics)
-
-      // save metadata
-      columns.push({
-        file_path: name,
-        file_offset,
-        meta_data,
-      })
-    }
-
-    row_groups.push({
-      columns,
-      total_byte_size: BigInt(writer.offset - groupStart),
-      num_rows: BigInt(Math.min(rowGroupSize, Number(num_rows) - i)),
-    })
-  }
-
-  // write metadata
-  /** @type {FileMetaData} */
-  const metadata = {
-    version: 2,
-    created_by: 'hyparquet',
-    schema,
-    num_rows,
-    row_groups,
-    metadata_length: 0,
-    key_value_metadata: kvMetadata,
-  }
-  // @ts-ignore don't want to actually serialize metadata_length
-  delete metadata.metadata_length
-  writeMetadata(writer, metadata)
-
-  // write footer PAR1
-  writer.appendUint32(0x31524150)
-
-  return writer.getBuffer()
+  return schema
 }
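
Usage note (not part of the diff): a minimal sketch of calling the new ParquetWriter API as exported from src/index.js. The import path and the schema literal are illustrative assumptions; parquetWrite normally derives the schema via getSchemaElementForValues, so check the exact SchemaElement fields against hyparquet's types before relying on them.

import { ParquetWriter } from './src/index.js' // entry point per the diff; resolve path/package name for your setup

// Assumed schema shape: root element followed by one SchemaElement per column
// (field names and values are illustrative, not taken from the diff).
const schema = [
  { name: 'root', num_children: 1 },
  { name: 'id', type: 'INT32', repetition_type: 'REQUIRED' },
]

const writer = new ParquetWriter({ schema, compressed: true, statistics: true })

// Each write() call slices its columnData into row groups of rowGroupSize rows
// and appends them, so it can be called repeatedly to stream data in.
writer.write({
  columnData: [{ name: 'id', data: [1, 2, 3, 4, 5] }],
  rowGroupSize: 2,
})

// finish() serializes the footer metadata, appends the trailing PAR1 magic,
// and returns the completed parquet file as an ArrayBuffer.
const arrayBuffer = writer.finish()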