diff --git a/src/column.js b/src/column.js
index 1ba1b41..3ffd884 100644
--- a/src/column.js
+++ b/src/column.js
@@ -1,9 +1,8 @@
 import { unconvert } from './unconvert.js'
-import { writeRleBitPackedHybrid } from './encoding.js'
 import { writePlain } from './plain.js'
 import { snappyCompress } from './snappy.js'
 import { ByteWriter } from './bytewriter.js'
-import { writeLevels, writePageHeader } from './datapage.js'
+import { writeDataPageV2, writePageHeader } from './datapage.js'
 
 /**
  * @param {Writer} writer
@@ -23,77 +22,39 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
   // Compute statistics
   const statistics = stats ? getStatistics(values) : undefined
 
-  // Write levels to temp buffer
-  const levels = new ByteWriter()
-  const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
-    = writeLevels(levels, schemaPath, values)
-
   // dictionary encoding
   let dictionary_page_offset = undefined
+  let data_page_offset = BigInt(writer.offset)
   /** @type {DecodedArray | undefined} */
   const dictionary = useDictionary(values, type)
   if (dictionary) {
     dictionary_page_offset = BigInt(writer.offset)
 
     // replace values with dictionary indices
-    const indexes = new Int32Array(values.length)
+    const indexes = new Array(values.length)
     for (let i = 0; i < values.length; i++) {
-      indexes[i] = dictionary.indexOf(values[i])
+      if (values[i] !== null && values[i] !== undefined) {
+        indexes[i] = dictionary.indexOf(values[i])
+      }
     }
-    values = indexes
 
     // write unconverted dictionary page
     const unconverted = unconvert(schemaElement, dictionary)
     writeDictionaryPage(writer, unconverted, type, compressed)
+
+    // write data page with dictionary indexes
+    data_page_offset = BigInt(writer.offset)
+    writeDataPageV2(writer, indexes, type, schemaPath, 'RLE_DICTIONARY', compressed)
   } else {
-    // unconvert type and filter out nulls
+    // unconvert values from rich types to simple
     values = unconvert(schemaElement, values)
-      .filter(v => v !== null && v !== undefined)
+
+    // write data page
+    writeDataPageV2(writer, values, type, schemaPath, 'PLAIN', compressed)
   }
 
-  // write page data to temp buffer
-  const page = new ByteWriter()
   /** @type {import('hyparquet').Encoding} */
   const encoding = dictionary ? 'RLE_DICTIONARY' : 'PLAIN'
-  if (dictionary) {
-    const bitWidth = Math.ceil(Math.log2(dictionary.length))
-    page.appendUint8(bitWidth)
-    writeRleBitPackedHybrid(page, values)
-  } else {
-    writePlain(page, values, type)
-  }
-
-  // compress page data
-  let compressedPage = page
-  if (compressed) {
-    compressedPage = new ByteWriter()
-    snappyCompress(compressedPage, new Uint8Array(page.getBuffer()))
-  }
-
-  // write page header
-  const data_page_offset = BigInt(writer.offset)
-  /** @type {PageHeader} */
-  const header = {
-    type: 'DATA_PAGE_V2',
-    uncompressed_page_size: levels.offset + page.offset,
-    compressed_page_size: levels.offset + compressedPage.offset,
-    data_page_header_v2: {
-      num_values,
-      num_nulls,
-      num_rows: num_values,
-      encoding: dictionary ? 'RLE_DICTIONARY' : encoding,
-      definition_levels_byte_length,
-      repetition_levels_byte_length,
-      is_compressed: true,
-    },
-  }
-  writePageHeader(writer, header)
-
-  // write levels
-  writer.appendBuffer(levels.getBuffer())
-
-  // write page data
-  writer.appendBuffer(compressedPage.getBuffer())
 
   return {
     type,
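Note on the dictionary branch above: null slots are now left as holes in a plain `Array` rather than being filtered out before encoding. A minimal standalone sketch (not part of the patch) of the sparse-array semantics this relies on:

```js
// Holes in `new Array(n)` are skipped by Array.prototype.filter,
// while reading a hole by index yields undefined, which the level
// writer records as a null via definition levels.
const values = ['a', null, 'b', 'a']
const dictionary = ['a', 'b']
const indexes = new Array(values.length)
for (let i = 0; i < values.length; i++) {
  if (values[i] !== null && values[i] !== undefined) {
    indexes[i] = dictionary.indexOf(values[i])
  }
}
console.log(indexes.filter(v => v !== undefined)) // [ 0, 1, 0 ]
console.log(indexes[1]) // undefined (hole) -> counted as a null, never encoded
```

This is what lets `writeDataPageV2` accept the raw `indexes` array and still derive `num_nulls` and the non-null values itself.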
diff --git a/src/datapage.js b/src/datapage.js
index 953b2b8..bfc9b90 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -1,12 +1,76 @@
 import { Encoding, PageType } from 'hyparquet/src/constants.js'
+import { ByteWriter } from './bytewriter.js'
 import { writeRleBitPackedHybrid } from './encoding.js'
+import { writePlain } from './plain.js'
+import { snappyCompress } from './snappy.js'
 import { serializeTCompactProtocol } from './thrift.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 
 /**
- * @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet'
  * @import {Writer} from '../src/types.js'
  * @param {Writer} writer
+ * @param {DecodedArray} values
+ * @param {ParquetType} type
+ * @param {SchemaElement[]} schemaPath
+ * @param {import('hyparquet').Encoding} encoding
+ * @param {boolean} compressed
+ */
+export function writeDataPageV2(writer, values, type, schemaPath, encoding, compressed) {
+  const num_values = values.length
+
+  // write levels to temp buffer
+  const levels = new ByteWriter()
+  const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
+    = writeLevels(levels, schemaPath, values)
+
+  const nonnull = values.filter(v => v !== null && v !== undefined)
+
+  // write page data to temp buffer
+  const page = new ByteWriter()
+  if (encoding === 'RLE_DICTIONARY') {
+    const maxValue = Math.max(...nonnull)
+    const bitWidth = Math.ceil(Math.log2(maxValue + 1))
+    page.appendUint8(bitWidth)
+    writeRleBitPackedHybrid(page, nonnull)
+  } else {
+    writePlain(page, nonnull, type)
+  }
+
+  // compress page data
+  let compressedPage = page
+  if (compressed) {
+    compressedPage = new ByteWriter()
+    snappyCompress(compressedPage, new Uint8Array(page.getBuffer()))
+  }
+
+  // write page header
+  /** @type {PageHeader} */
+  const header = {
+    type: 'DATA_PAGE_V2',
+    uncompressed_page_size: levels.offset + page.offset,
+    compressed_page_size: levels.offset + compressedPage.offset,
+    data_page_header_v2: {
+      num_values,
+      num_nulls,
+      num_rows: num_values,
+      encoding,
+      definition_levels_byte_length,
+      repetition_levels_byte_length,
+      is_compressed: compressed,
+    },
+  }
+  writePageHeader(writer, header)
+
+  // write levels
+  writer.appendBuffer(levels.getBuffer())
+
+  // write page data
+  writer.appendBuffer(compressedPage.getBuffer())
+}
+
+/**
+ * @import {DecodedArray, PageHeader, ParquetType, SchemaElement} from 'hyparquet'
+ * @param {Writer} writer
  * @param {PageHeader} header
  */
 export function writePageHeader(writer, header) {
@@ -46,7 +110,7 @@ export function writePageHeader(writer, header) {
  * @param {DecodedArray} values
  * @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number}}
  */
-export function writeLevels(writer, schemaPath, values) {
+function writeLevels(writer, schemaPath, values) {
   let num_nulls = 0
 
   // TODO: repetition levels
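The RLE/bit-packed bit width is now derived from the largest index actually written rather than from `dictionary.length`; the two agree whenever the top dictionary index occurs in the page. A small sketch of the arithmetic (hypothetical helper, not in the patch):

```js
// Bits needed to represent a non-negative integer max:
// ceil(log2(max + 1)), e.g. max 7 fits in 3 bits, max 8 needs 4.
function bitWidth(max) {
  return Math.ceil(Math.log2(max + 1))
}
console.log(bitWidth(0)) // 0 -> single dictionary entry, zero bits per index
console.log(bitWidth(1)) // 1
console.log(bitWidth(7)) // 3
console.log(bitWidth(8)) // 4
```

One caveat worth flagging: `Math.max(...nonnull)` spreads the whole page onto the call stack, so a `reduce`-based maximum may be safer for very large pages.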
diff --git a/test/write.buffer.test.js b/test/write.buffer.test.js
index c0e7748..c7b4864 100644
--- a/test/write.buffer.test.js
+++ b/test/write.buffer.test.js
@@ -62,7 +62,7 @@ describe('parquetWriteBuffer', () => {
     const str = 'a'.repeat(10000)
     const columnData = [{ name: 'string', data: [str] }]
     const file = parquetWriteBuffer({ columnData, compressed: false })
-    expect(file.byteLength).toBe(10175)
+    expect(file.byteLength).toBe(10176)
   })
 
   it('efficiently serializes column with few distinct values', async () => {
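The size assertion is exact, so any change to the page layout shifts the expected total; after this refactor the file grows from 10175 to 10176 bytes. For local verification, a round-trip check along these lines should hold (a sketch assuming hyparquet's `parquetReadObjects` entry point; import paths are illustrative):

```js
import { parquetWriteBuffer } from 'hyparquet-writer'
import { parquetReadObjects } from 'hyparquet'

// Write a single long string column uncompressed, then read it back.
const columnData = [{ name: 'string', data: ['a'.repeat(10000)] }]
const file = parquetWriteBuffer({ columnData, compressed: false })
const rows = await parquetReadObjects({ file })
console.log(file.byteLength) // expected: 10176 after this change
console.log(rows[0].string.length) // 10000
```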