diff --git a/README.md b/README.md index 484fdd9..71c80be 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![npm](https://img.shields.io/npm/v/hyparquet-writer)](https://www.npmjs.com/package/hyparquet-writer) [![workflow status](https://github.com/hyparam/hyparquet-writer/actions/workflows/ci.yml/badge.svg)](https://github.com/hyparam/hyparquet-writer/actions) [![mit license](https://img.shields.io/badge/License-MIT-orange.svg)](https://opensource.org/licenses/MIT) -![coverage](https://img.shields.io/badge/Coverage-96-darkred) +![coverage](https://img.shields.io/badge/Coverage-95-darkred) [![dependencies](https://img.shields.io/badge/Dependencies-0-blueviolet)](https://www.npmjs.com/package/hyparquet?activeTab=dependencies) ## Usage diff --git a/src/column.js b/src/column.js index b579f51..2ee59fd 100644 --- a/src/column.js +++ b/src/column.js @@ -2,9 +2,9 @@ import { Encoding, PageType } from 'hyparquet/src/constants.js' import { unconvert } from './convert.js' import { writeRleBitPackedHybrid } from './encoding.js' import { writePlain } from './plain.js' +import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' import { serializeTCompactProtocol } from './thrift.js' import { Writer } from './writer.js' -import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' /** * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement} from 'hyparquet' @@ -18,22 +18,17 @@ export function writeColumn(writer, schemaPath, values) { const { type } = schemaElement if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`) const offsetStart = writer.offset + const num_values = values.length let num_nulls = 0 - // Unconvert type if necessary - values = unconvert(schemaElement, values) - - // Write page to temp buffer - const page = new Writer() - - /** @type {import('hyparquet/src/types.js').Encoding} */ - const encoding = 'PLAIN' + // Write levels to temp buffer + const levels = new Writer() // TODO: repetition levels const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) let repetition_levels_byte_length = 0 if (maxRepetitionLevel) { - repetition_levels_byte_length = writeRleBitPackedHybrid(page, []) + repetition_levels_byte_length = writeRleBitPackedHybrid(levels, []) } // definition levels @@ -49,10 +44,15 @@ export function writeColumn(writer, schemaPath, values) { definitionLevels.push(maxDefinitionLevel) } } - definition_levels_byte_length = writeRleBitPackedHybrid(page, definitionLevels) + definition_levels_byte_length = writeRleBitPackedHybrid(levels, definitionLevels) } - // write page data + // Unconvert type and filter out nulls + values = unconvert(schemaElement, values) + .filter(v => v !== null && v !== undefined) + + // write page data to temp buffer + const page = new Writer() writePageData(page, values, type) // TODO: compress page data @@ -61,13 +61,13 @@ export function writeColumn(writer, schemaPath, values) { /** @type {PageHeader} */ const header = { type: 'DATA_PAGE_V2', - uncompressed_page_size: page.offset, - compressed_page_size: page.offset, + uncompressed_page_size: levels.offset + page.offset, + compressed_page_size: levels.offset + page.offset, data_page_header_v2: { - num_values: values.length, + num_values, num_nulls, - num_rows: values.length, - encoding, + num_rows: num_values, + encoding: 'PLAIN', definition_levels_byte_length, repetition_levels_byte_length, is_compressed: false, @@ -75,6 +75,9 @@ export function writeColumn(writer, schemaPath, values) { } writePageHeader(writer, header) + // write levels + writer.appendBuffer(levels.getBuffer()) + // write page data writer.appendBuffer(page.getBuffer()) @@ -83,7 +86,7 @@ export function writeColumn(writer, schemaPath, values) { encodings: ['PLAIN'], path_in_schema: schemaPath.slice(1).map(s => s.name), codec: 'UNCOMPRESSED', - num_values: BigInt(values.length), + num_values: BigInt(num_values), total_compressed_size: BigInt(writer.offset - offsetStart), total_uncompressed_size: BigInt(writer.offset - offsetStart), data_page_offset: BigInt(offsetStart), diff --git a/src/encoding.js b/src/encoding.js index ad20a94..fa30f0e 100644 --- a/src/encoding.js +++ b/src/encoding.js @@ -101,7 +101,8 @@ function writeRle(writer, values, bitWidth) { count++ // continue the run } else { // write the count of repeated values - writer.appendVarInt(count) + const header = count << 1 + writer.appendVarInt(header) // write the value const width = bitWidth + 7 >> 3 // bytes needed diff --git a/test/encoding.test.js b/test/encoding.test.js index ad508d1..71b728f 100644 --- a/test/encoding.test.js +++ b/test/encoding.test.js @@ -56,4 +56,14 @@ describe('RLE bit-packed hybrid', () => { const decoded = roundTripDeserialize(original) expect(decoded).toEqual(original) }) + + it('should round-trip a sparse array of booleans', () => { + const original = Array(10000).fill(0) + original[10] = 1 + original[100] = 1 + original[500] = 1 + original[9999] = 1 + const decoded = roundTripDeserialize(original) + expect(decoded).toEqual(original) + }) }) diff --git a/test/write.test.js b/test/write.test.js index c973b61..db180c7 100644 --- a/test/write.test.js +++ b/test/write.test.js @@ -41,16 +41,30 @@ describe('parquetWrite', () => { ]) }) - it('efficiently serializes sparse booleans', () => { + it('efficiently serializes sparse booleans', async () => { const bool = Array(10000).fill(null) bool[10] = true bool[100] = false bool[500] = true bool[9999] = false - const buffer = parquetWrite([{ name: 'bool', data: bool }]) - expect(buffer.byteLength).toBe(1399) - const metadata = parquetMetadata(buffer) - expect(metadata.metadata_length).toBe(89) + const file = parquetWrite([{ name: 'bool', data: bool }]) + expect(file.byteLength).toBe(147) + const metadata = parquetMetadata(file) + expect(metadata.metadata_length).toBe(86) + const result = await parquetReadObjects({ file }) + expect(result.length).toBe(10000) + expect(result[0]).toEqual({ bool: null }) + expect(result[9]).toEqual({ bool: null }) + expect(result[10]).toEqual({ bool: true }) + expect(result[100]).toEqual({ bool: false }) + expect(result[500]).toEqual({ bool: true }) + expect(result[9999]).toEqual({ bool: false }) + }) + + it('efficiently serializes long string', () => { + const str = 'a'.repeat(10000) + const file = parquetWrite([{ name: 'string', data: [str] }]) + expect(file.byteLength).toBe(10136) }) it('serializes list types', async () => {