Stream writer

Kenny Daniel 2025-04-07 01:02:21 -07:00
parent daba713202
commit 5e89e80cad
3 changed files with 128 additions and 61 deletions

@@ -1 +1,2 @@
 export { parquetWrite } from './write.js'
+export { ParquetWriter } from './parquet-writer.js'
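Both the one-shot parquetWrite function and the new streaming ParquetWriter are exported from the package index. A minimal import sketch (the published package name is assumed here, not stated in this diff):

import { parquetWrite, ParquetWriter } from 'hyparquet-writer' // package name assumed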

src/parquet-writer.js (new file, 99 lines)

@@ -0,0 +1,99 @@
import { writeColumn } from './column.js'
import { Writer } from './writer.js'
import { writeMetadata } from './metadata.js'

/**
 * Create a new ParquetWriter.
 *
 * @import {ColumnChunk, FileMetaData, RowGroup, SchemaElement, SchemaTree} from 'hyparquet'
 * @import {KeyValue} from 'hyparquet/src/types.js'
 * @import {ColumnData} from './types.js'
 * @param {object} options
 * @param {SchemaElement[]} options.schema
 * @param {boolean} [options.compressed]
 * @param {boolean} [options.statistics]
 * @param {KeyValue[]} [options.kvMetadata]
 */
export function ParquetWriter({ schema, compressed = true, statistics = true, kvMetadata }) {
  /** @type {RowGroup[]} */
  this.row_groups = []
  this.schema = schema
  this.compressed = compressed
  this.statistics = statistics
  this.kvMetadata = kvMetadata
  this.num_rows = BigInt(0)
  this.writer = new Writer()
  // write header PAR1
  this.writer.appendUint32(0x31524150)
}

/**
 * Write data to the file.
 * Will split data into row groups of the specified size.
 *
 * @param {object} options
 * @param {ColumnData[]} options.columnData
 * @param {number} [options.rowGroupSize]
 */
ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 }) {
  const columnDataRows = columnData[0]?.data?.length || 0
  for (let groupStartIndex = 0; groupStartIndex < columnDataRows; groupStartIndex += rowGroupSize) {
    const groupStartOffset = this.writer.offset
    const groupSize = Math.min(rowGroupSize, columnDataRows - groupStartIndex)
    // row group columns
    /** @type {ColumnChunk[]} */
    const columns = []
    // write columns
    for (let j = 0; j < columnData.length; j++) {
      const { name, data } = columnData[j]
      const schemaPath = [this.schema[0], this.schema[j + 1]]
      const groupData = data.slice(groupStartIndex, groupStartIndex + groupSize)
      const file_offset = BigInt(this.writer.offset)
      const meta_data = writeColumn(this.writer, schemaPath, groupData, this.compressed, this.statistics)
      // save column chunk metadata
      columns.push({
        file_path: name,
        file_offset,
        meta_data,
      })
    }
    this.num_rows += BigInt(groupSize)
    this.row_groups.push({
      columns,
      total_byte_size: BigInt(this.writer.offset - groupStartOffset),
      num_rows: BigInt(groupSize),
    })
  }
}

/**
 * Finish writing the file and return the buffer.
 *
 * @returns {ArrayBuffer}
 */
ParquetWriter.prototype.finish = function() {
  // write metadata
  /** @type {FileMetaData} */
  const metadata = {
    version: 2,
    created_by: 'hyparquet',
    schema: this.schema,
    num_rows: this.num_rows,
    row_groups: this.row_groups,
    metadata_length: 0,
    key_value_metadata: this.kvMetadata,
  }
  // @ts-ignore don't want to actually serialize metadata_length
  delete metadata.metadata_length
  writeMetadata(this.writer, metadata)
  // write footer PAR1
  this.writer.appendUint32(0x31524150)
  return this.writer.getBuffer()
}

src/write.js

@@ -1,7 +1,5 @@
-import { writeColumn } from './column.js'
-import { Writer } from './writer.js'
-import { writeMetadata } from './metadata.js'
 import { getSchemaElementForValues } from './schema.js'
+import { ParquetWriter } from './parquet-writer.js'
 /**
  * Write data as parquet to an ArrayBuffer
@@ -18,18 +16,40 @@ import { getSchemaElementForValues } from './schema.js'
  * @returns {ArrayBuffer}
  */
 export function parquetWrite({ columnData, compressed = true, statistics = true, rowGroupSize = 100000, kvMetadata }) {
-  const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
-  const writer = new Writer()
+  const schema = schemaFromColumnData(columnData)
+  const writer = new ParquetWriter({
+    schema,
+    compressed,
+    statistics,
+    kvMetadata,
+  })
-  // construct schema
+  writer.write({
+    columnData,
+    rowGroupSize,
+  })
+  return writer.finish()
+}
+
+/**
+ * Convert column data to schema.
+ *
+ * @param {ColumnData[]} columnData
+ * @returns {SchemaElement[]}
+ */
+function schemaFromColumnData(columnData) {
   /** @type {SchemaElement[]} */
   const schema = [{
     name: 'root',
     num_children: columnData.length,
   }]
+  let num_rows = 0
   for (const { name, data, type } of columnData) {
     // check if all columns have the same length
-    if (BigInt(data.length) !== num_rows) {
+    if (num_rows === 0) {
+      num_rows = data.length
+    } else if (num_rows !== data.length) {
       throw new Error('columns must have the same length')
     }
     // auto-detect type
@@ -37,58 +57,5 @@ export function parquetWrite({ columnData, compressed = true, statistics = true,
     if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`)
     schema.push(schemaElement)
   }
-  // write header PAR1
-  writer.appendUint32(0x31524150)
-  /** @type {RowGroup[]} */
-  const row_groups = []
-  for (let i = 0; i < num_rows; i += rowGroupSize) {
-    const groupStart = writer.offset
-    // row group columns
-    /** @type {ColumnChunk[]} */
-    const columns = []
-    // write columns
-    for (let i = 0; i < columnData.length; i++) {
-      const { name, data } = columnData[i]
-      const file_offset = BigInt(writer.offset)
-      const schemaPath = [schema[0], schema[i + 1]]
-      const meta_data = writeColumn(writer, schemaPath, data, compressed, statistics)
-      // save metadata
-      columns.push({
-        file_path: name,
-        file_offset,
-        meta_data,
-      })
-    }
-    row_groups.push({
-      columns,
-      total_byte_size: BigInt(writer.offset - groupStart),
-      num_rows: BigInt(Math.min(rowGroupSize, Number(num_rows) - i)),
-    })
-  }
-  // write metadata
-  /** @type {FileMetaData} */
-  const metadata = {
-    version: 2,
-    created_by: 'hyparquet',
-    schema,
-    num_rows,
-    row_groups,
-    metadata_length: 0,
-    key_value_metadata: kvMetadata,
-  }
-  // @ts-ignore don't want to actually serialize metadata_length
-  delete metadata.metadata_length
-  writeMetadata(writer, metadata)
-  // write footer PAR1
-  writer.appendUint32(0x31524150)
-  return writer.getBuffer()
+  return schema
 }
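After this refactor, parquetWrite is a thin wrapper: it builds a schema from the column data via schemaFromColumnData (column types auto-detected by getSchemaElementForValues unless an explicit type is provided, and mismatched column lengths rejected up front), then delegates to ParquetWriter for a single write() followed by finish(). A call sketch; the 'DOUBLE' type string is an assumption about the accepted ColumnData.type values:

import { parquetWrite } from './write.js'

const buffer = parquetWrite({
  columnData: [
    { name: 'name', data: ['alice', 'bob', 'carol'] },        // type auto-detected
    { name: 'score', data: [1.5, 2.5, 3.5], type: 'DOUBLE' }, // explicit type (assumed value)
  ],
  rowGroupSize: 100000,
})
// buffer is an ArrayBuffer containing the complete parquet file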