Split out writeDataPageV2

Kenny Daniel 2025-04-12 19:02:41 -06:00
parent f6740aba3f
commit 26daec2fcb
3 changed files with 81 additions and 56 deletions
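
The refactor moves data-page assembly out of writeColumn and into a new writeDataPageV2 helper in datapage.js, so writeColumn only decides between the dictionary and plain paths. A rough sketch of the resulting call flow, using the variable names from the diff below and omitting the surrounding writeColumn bookkeeping:

    // Inside writeColumn, after statistics and dictionary detection:
    if (dictionary) {
      // values were replaced with dictionary indexes; writeDataPageV2
      // filters out nulls and RLE/bit-packs the indexes itself
      writeDataPageV2(writer, indexes, type, schemaPath, 'RLE_DICTIONARY', compressed)
    } else {
      // no dictionary: unconvert rich values and write them PLAIN
      values = unconvert(schemaElement, values)
      writeDataPageV2(writer, values, type, schemaPath, 'PLAIN', compressed)
    }

Either way, writeDataPageV2 now owns writing the definition/repetition levels, the (optionally snappy-compressed) page data, and the DATA_PAGE_V2 page header.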

@@ -1,9 +1,8 @@
import { unconvert } from './unconvert.js'
import { writeRleBitPackedHybrid } from './encoding.js'
import { writePlain } from './plain.js'
import { snappyCompress } from './snappy.js'
import { ByteWriter } from './bytewriter.js'
import { writeLevels, writePageHeader } from './datapage.js'
import { writeDataPageV2, writePageHeader } from './datapage.js'
/**
* @param {Writer} writer
@@ -23,77 +22,39 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
// Compute statistics
const statistics = stats ? getStatistics(values) : undefined
// Write levels to temp buffer
const levels = new ByteWriter()
const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
= writeLevels(levels, schemaPath, values)
// dictionary encoding
let dictionary_page_offset = undefined
let data_page_offset = BigInt(writer.offset)
/** @type {DecodedArray | undefined} */
const dictionary = useDictionary(values, type)
if (dictionary) {
dictionary_page_offset = BigInt(writer.offset)
// replace values with dictionary indices
const indexes = new Int32Array(values.length)
const indexes = new Array(values.length)
for (let i = 0; i < values.length; i++) {
indexes[i] = dictionary.indexOf(values[i])
if (values[i] !== null && values[i] !== undefined) {
indexes[i] = dictionary.indexOf(values[i])
}
}
values = indexes
// write unconverted dictionary page
const unconverted = unconvert(schemaElement, dictionary)
writeDictionaryPage(writer, unconverted, type, compressed)
// write data page with dictionary indexes
data_page_offset = BigInt(writer.offset)
writeDataPageV2(writer, indexes, type, schemaPath, 'RLE_DICTIONARY', compressed)
} else {
// unconvert type and filter out nulls
// unconvert values from rich types to simple
values = unconvert(schemaElement, values)
.filter(v => v !== null && v !== undefined)
// write data page
writeDataPageV2(writer, values, type, schemaPath, 'PLAIN', compressed)
}
// write page data to temp buffer
const page = new ByteWriter()
/** @type {import('hyparquet').Encoding} */
const encoding = dictionary ? 'RLE_DICTIONARY' : 'PLAIN'
if (dictionary) {
const bitWidth = Math.ceil(Math.log2(dictionary.length))
page.appendUint8(bitWidth)
writeRleBitPackedHybrid(page, values)
} else {
writePlain(page, values, type)
}
// compress page data
let compressedPage = page
if (compressed) {
compressedPage = new ByteWriter()
snappyCompress(compressedPage, new Uint8Array(page.getBuffer()))
}
// write page header
const data_page_offset = BigInt(writer.offset)
/** @type {PageHeader} */
const header = {
type: 'DATA_PAGE_V2',
uncompressed_page_size: levels.offset + page.offset,
compressed_page_size: levels.offset + compressedPage.offset,
data_page_header_v2: {
num_values,
num_nulls,
num_rows: num_values,
encoding: dictionary ? 'RLE_DICTIONARY' : encoding,
definition_levels_byte_length,
repetition_levels_byte_length,
is_compressed: true,
},
}
writePageHeader(writer, header)
// write levels
writer.appendBuffer(levels.getBuffer())
// write page data
writer.appendBuffer(compressedPage.getBuffer())
return {
type,

@@ -1,12 +1,76 @@
import { Encoding, PageType } from 'hyparquet/src/constants.js'
import { ByteWriter } from './bytewriter.js'
import { writeRleBitPackedHybrid } from './encoding.js'
import { writePlain } from './plain.js'
import { snappyCompress } from './snappy.js'
import { serializeTCompactProtocol } from './thrift.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
/**
* @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet'
* @import {Writer} from '../src/types.js'
* @param {Writer} writer
* @param {DecodedArray} values
* @param {ParquetType} type
* @param {SchemaElement[]} schemaPath
* @param {import('hyparquet').Encoding} encoding
* @param {boolean} compressed
*/
export function writeDataPageV2(writer, values, type, schemaPath, encoding, compressed) {
const num_values = values.length
// write levels to temp buffer
const levels = new ByteWriter()
const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
= writeLevels(levels, schemaPath, values)
const nonnull = values.filter(v => v !== null && v !== undefined)
// write page data to temp buffer
const page = new ByteWriter()
if (encoding === 'RLE_DICTIONARY') {
const maxValue = Math.max(...nonnull)
const bitWidth = Math.ceil(Math.log2(maxValue + 1))
page.appendUint8(bitWidth)
writeRleBitPackedHybrid(page, nonnull)
} else {
writePlain(page, nonnull, type)
}
// compress page data
let compressedPage = page
if (compressed) {
compressedPage = new ByteWriter()
snappyCompress(compressedPage, new Uint8Array(page.getBuffer()))
}
// write page header
/** @type {PageHeader} */
const header = {
type: 'DATA_PAGE_V2',
uncompressed_page_size: levels.offset + page.offset,
compressed_page_size: levels.offset + compressedPage.offset,
data_page_header_v2: {
num_values,
num_nulls,
num_rows: num_values,
encoding,
definition_levels_byte_length,
repetition_levels_byte_length,
is_compressed: compressed,
},
}
writePageHeader(writer, header)
// write levels
writer.appendBuffer(levels.getBuffer())
// write page data
writer.appendBuffer(compressedPage.getBuffer())
}
/**
* @import {DecodedArray, PageHeader, ParquetType, SchemaElement} from 'hyparquet'
* @param {Writer} writer
* @param {PageHeader} header
*/
export function writePageHeader(writer, header) {
@@ -46,7 +110,7 @@ export function writePageHeader(writer, header) {
* @param {DecodedArray} values
* @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number}}
*/
export function writeLevels(writer, schemaPath, values) {
function writeLevels(writer, schemaPath, values) {
let num_nulls = 0
// TODO: repetition levels

@@ -62,7 +62,7 @@ describe('parquetWriteBuffer', () => {
const str = 'a'.repeat(10000)
const columnData = [{ name: 'string', data: [str] }]
const file = parquetWriteBuffer({ columnData, compressed: false })
expect(file.byteLength).toBe(10175)
expect(file.byteLength).toBe(10176)
})
it('efficiently serializes column with few distinct values', async () => {