diff --git a/src/column.js b/src/column.js
new file mode 100644
index 0000000..54dbe84
--- /dev/null
+++ b/src/column.js
@@ -0,0 +1,126 @@
+import { Encoding, PageType } from 'hyparquet/src/constants.js'
+import { writeRleBitPackedHybrid } from './encoding.js'
+import { writePlain } from './plain.js'
+import { serializeTCompactProtocol } from './thrift.js'
+import { Writer } from './writer.js'
+
+/**
+ * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType} from 'hyparquet/src/types.js'
+ * @param {Writer} writer
+ * @param {string} columnName
+ * @param {DecodedArray} values
+ * @param {ParquetType} type
+ * @returns {ColumnMetaData}
+ */
+export function writeColumn(writer, columnName, values, type) {
+  // Get data stats
+  const num_nulls = values.filter(v => v === null).length
+  const offsetStart = writer.offset
+
+  // Write page to temp buffer
+  const page = new Writer()
+
+  /** @type {import('hyparquet/src/types.js').Encoding} */
+  const encoding = 'PLAIN'
+
+  // TODO: repetition levels
+  const maxRepetitionLevel = 0
+  let repetition_levels_byte_length = 0
+  if (maxRepetitionLevel) {
+    repetition_levels_byte_length = writeRleBitPackedHybrid(page, [])
+  }
+
+  // TODO: definition levels
+  const maxDefinitionLevel = 0
+  let definition_levels_byte_length = 0
+  if (maxDefinitionLevel) {
+    definition_levels_byte_length = writeRleBitPackedHybrid(page, [])
+  }
+
+  // write page data (TODO: compressed)
+  const { uncompressed_page_size, compressed_page_size } = writePageData(page, values, type)
+
+  // write page header
+  /** @type {PageHeader} */
+  const header = {
+    type: 'DATA_PAGE_V2',
+    uncompressed_page_size,
+    compressed_page_size,
+    data_page_header_v2: {
+      num_values: values.length,
+      num_nulls,
+      num_rows: values.length,
+      encoding,
+      definition_levels_byte_length,
+      repetition_levels_byte_length,
+      is_compressed: false,
+    },
+  }
+  writePageHeader(writer, header)
+
+  // write page data
+  writer.appendBuffer(page.getBuffer())
+
+  return {
+    type,
+    encodings: ['PLAIN'],
+    path_in_schema: [columnName],
+    codec: 'UNCOMPRESSED',
+    num_values: BigInt(values.length),
+    total_compressed_size: BigInt(writer.offset - offsetStart),
+    total_uncompressed_size: BigInt(writer.offset - offsetStart),
+    data_page_offset: BigInt(offsetStart),
+  }
+}
+
+/**
+ * Deduce a ParquetType from the JS value
+ *
+ * @param {DecodedArray} values
+ * @returns {ParquetType}
+ */
+export function getParquetTypeForValues(values) {
+  if (values.every(v => typeof v === 'boolean')) return 'BOOLEAN'
+  if (values.every(v => typeof v === 'bigint')) return 'INT64'
+  if (values.every(v => Number.isInteger(v))) return 'INT32'
+  if (values.every(v => typeof v === 'number')) return 'DOUBLE'
+  if (values.every(v => typeof v === 'string')) return 'BYTE_ARRAY'
+  throw new Error(`Cannot determine parquet type for: ${values}`)
+}
+
+/**
+ * @param {Writer} writer
+ * @param {PageHeader} header
+ */
+function writePageHeader(writer, header) {
+  const compact = {
+    field_1: PageType.indexOf(header.type),
+    field_2: header.uncompressed_page_size,
+    field_3: header.compressed_page_size,
+    field_8: header.data_page_header_v2 && {
+      field_1: header.data_page_header_v2.num_values,
+      field_2: header.data_page_header_v2.num_nulls,
+      field_3: header.data_page_header_v2.num_rows,
+      field_4: Encoding.indexOf(header.data_page_header_v2.encoding),
+      field_5: header.data_page_header_v2.definition_levels_byte_length,
+      field_6: header.data_page_header_v2.repetition_levels_byte_length,
+      field_7: header.data_page_header_v2.is_compressed ? undefined : false, // default true
+    },
+  }
+  serializeTCompactProtocol(writer, compact)
+}
+
+/**
+ * @param {Writer} writer
+ * @param {DecodedArray} values
+ * @param {ParquetType} type
+ * @returns {{ uncompressed_page_size: number, compressed_page_size: number }}
+ */
+function writePageData(writer, values, type) {
+  // write plain data
+  const startOffset = writer.offset
+  writePlain(writer, values, type)
+  const size = writer.offset - startOffset
+
+  return { uncompressed_page_size: size, compressed_page_size: size }
+}
diff --git a/src/index.js b/src/index.js
index e69de29..f72a139 100644
--- a/src/index.js
+++ b/src/index.js
@@ -0,0 +1 @@
+export { parquetWrite } from './write.js'
diff --git a/src/metadata.js b/src/metadata.js
index b898ada..c265594 100644
--- a/src/metadata.js
+++ b/src/metadata.js
@@ -1,4 +1,4 @@
-import { Encoding, FieldRepetitionType, ParquetType } from 'hyparquet/src/constants.js'
+import { ConvertedType, Encoding, FieldRepetitionType, ParquetType } from 'hyparquet/src/constants.js'
 import { serializeTCompactProtocol } from './thrift.js'
 
 const CompressionCodec = [
@@ -27,7 +27,7 @@ export function writeMetadata(writer, metadata) {
       field_3: element.repetition_type && FieldRepetitionType.indexOf(element.repetition_type),
       field_4: element.name,
       field_5: element.num_children,
-      field_6: element.converted_type,
+      field_6: element.converted_type && ConvertedType.indexOf(element.converted_type),
      field_7: element.scale,
       field_8: element.precision,
       field_9: element.field_id,
@@ -40,7 +40,7 @@ export function writeMetadata(writer, metadata) {
       field_2: c.file_offset,
       field_3: c.meta_data && {
         field_1: ParquetType.indexOf(c.meta_data.type),
-        field_2: c.meta_data.encodings.map(e => Encoding.indexOf(e)), // WTF simplfy?
+        field_2: c.meta_data.encodings.map(e => Encoding.indexOf(e)),
         field_3: c.meta_data.path_in_schema,
         field_4: CompressionCodec.indexOf(c.meta_data.codec),
         field_5: c.meta_data.num_values,
diff --git a/src/write.js b/src/write.js
new file mode 100644
index 0000000..a0ade0b
--- /dev/null
+++ b/src/write.js
@@ -0,0 +1,78 @@
+import { getParquetTypeForValues, writeColumn } from './column.js'
+import { Writer } from './writer.js'
+import { writeMetadata } from './metadata.js'
+
+/**
+ * Write data as parquet to an ArrayBuffer
+ *
+ * @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement} from 'hyparquet'
+ * @param {Record<string, DecodedArray>} columnData
+ * @returns {ArrayBuffer}
+ */
+export function parquetWrite(columnData) {
+  const writer = new Writer()
+
+  // Check if all columns have the same length
+  const columnNames = Object.keys(columnData)
+  const num_rows = columnNames.length ? BigInt(columnData[columnNames[0]].length) : 0n
+  for (const name of columnNames) {
+    if (BigInt(columnData[name].length) !== num_rows) {
+      throw new Error('parquetWrite: all columns must have the same length')
+    }
+  }
+
+  // Write header PAR1
+  writer.appendUint32(0x31524150)
+
+  // schema
+  /** @type {SchemaElement[]} */
+  const schema = [{
+    name: 'root',
+    num_children: columnNames.length,
+    repetition_type: 'REQUIRED',
+  }]
+
+  // row group columns
+  /** @type {ColumnChunk[]} */
+  const columns = []
+
+  // Write columns
+  for (const name of columnNames) {
+    const values = columnData[name]
+    const type = getParquetTypeForValues(values)
+    const file_offset = BigInt(writer.offset)
+    const meta_data = writeColumn(writer, name, values, type)
+    const repetition_type = 'REQUIRED'
+
+    // save metadata
+    schema.push({ type, name, repetition_type })
+    columns.push({
+      file_path: name,
+      file_offset,
+      meta_data,
+    })
+  }
+
+  // Write metadata
+  /** @type {FileMetaData} */
+  const metadata = {
+    version: 2,
+    created_by: 'hyparquet',
+    schema,
+    num_rows,
+    row_groups: [{
+      columns,
+      total_byte_size: BigInt(writer.offset - 4),
+      num_rows,
+    }],
+    metadata_length: 0,
+  }
+  // @ts-ignore don't want to actually serialize metadata_length
+  delete metadata.metadata_length
+  writeMetadata(writer, metadata)
+
+  // Write footer PAR1
+  writer.appendUint32(0x31524150)
+
+  return writer.getBuffer()
+}
diff --git a/test/write.test.js b/test/write.test.js
new file mode 100644
index 0000000..44f7b9d
--- /dev/null
+++ b/test/write.test.js
@@ -0,0 +1,41 @@
+import { parquetMetadata, parquetReadObjects } from 'hyparquet'
+import { describe, expect, it } from 'vitest'
+import { parquetWrite } from '../src/index.js'
+import { exampleMetadata } from './metadata.test.js'
+
+/**
+ * Utility to encode a parquet file and then read it back into a JS object.
+ *
+ * @param {Record<string, any[]>} columnData
+ * @returns {Promise<Record<string, any>[]>}
+ */
+async function roundTripDeserialize(columnData) {
+  const file = parquetWrite(columnData)
+  return await parquetReadObjects({ file })
+}
+
+const data = {
+  bool: [true, false, true, false], // BOOLEAN
+  int: [0, 127, 0x7fff, 0x7fffffff], // INT32
+  bigint: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn], // INT64
+  double: [0, 0.0001, 123.456, 1e100], // DOUBLE
+  string: ['a', 'b', 'c', 'd'], // BYTE_ARRAY
+}
+
+describe('parquetWrite', () => {
+  it('writes expected metadata', () => {
+    const file = parquetWrite(data)
+    const metadata = parquetMetadata(file)
+    expect(metadata).toEqual(exampleMetadata)
+  })
+
+  it('serializes basic types correctly', async () => {
+    const result = await roundTripDeserialize(data)
+    expect(result).toEqual([
+      { bool: true, int: 0, bigint: 0n, double: 0, string: 'a' },
+      { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b' },
+      { bool: true, int: 0x7fff, bigint: 0x7fffn, double: 123.456, string: 'c' },
+      { bool: false, int: 0x7fffffff, bigint: 0x7fffffffffffffffn, double: 1e100, string: 'd' },
+    ])
+  })
+})
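
For context, here is a minimal usage sketch of the `parquetWrite` API introduced in this diff, following the same round trip as `test/write.test.js`. The sample column data, the relative import path, and persisting the buffer with Node's `fs` module are illustrative assumptions, not part of the change itself:

```js
import fs from 'node:fs'
import { parquetReadObjects } from 'hyparquet'
import { parquetWrite } from './src/index.js' // path relative to this repo (assumed entry point)

// Column-oriented input: every column must have the same length.
// The parquet type is deduced from the JS values by getParquetTypeForValues
// (integers become INT32, strings become BYTE_ARRAY, and so on).
const arrayBuffer = parquetWrite({
  id: [1, 2, 3], // INT32
  name: ['alpha', 'beta', 'gamma'], // BYTE_ARRAY
})

// parquetWrite returns an ArrayBuffer; writing it to disk is up to the caller.
fs.writeFileSync('example.parquet', Buffer.from(arrayBuffer))

// Read the buffer back with hyparquet to verify the round trip.
const rows = await parquetReadObjects({ file: arrayBuffer })
console.log(rows) // [{ id: 1, name: 'alpha' }, ...]
```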