Mirror of https://github.com/asadbek064/hyparquet-writer.git (synced 2025-12-05 23:31:54 +00:00)
Handle byte array vs string, and change parquetWrite column api
This commit is contained in:
parent abe825ded4
commit c40e7c4320
3 .gitignore (vendored)
@@ -1,5 +1,6 @@
coverage
node_modules
package-lock.json
dist
*.tgz
.DS_Store
/types
16 README.md
@@ -1,17 +1,21 @@
# Hyparquet Writer

[](https://github.com/hyparam/hyparquet-writer/actions)
[](https://opensource.org/licenses/MIT)

[](https://www.npmjs.com/package/hyparquet?activeTab=dependencies)

## Usage

```javascript
import { writeParquet } from 'hyparquet-writer'
Call `parquetWrite` with a list of columns; each column is an object with a `name` and `data` field. The `data` field should be an array of same-type values.

const arrayBuffer = writeParquet({
  name: ['Alice', 'Bob', 'Charlie'],
  age: [25, 30, 35],
})
```javascript
import { parquetWrite } from 'hyparquet-writer'

const arrayBuffer = parquetWrite([
  { name: 'name', data: ['Alice', 'Bob', 'Charlie'] },
  { name: 'age', data: [25, 30, 35] },
])
```

## References

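(Illustrative, not part of the diff.) A minimal sketch of the byte array vs string distinction this commit introduces, using the new column API; the column names below are made up for the example:

```javascript
import { parquetWrite } from 'hyparquet-writer'

// 'label' holds strings: auto-detected as BYTE_ARRAY with converted_type UTF8
// 'payload' holds raw Uint8Array values: stays plain BYTE_ARRAY
const arrayBuffer = parquetWrite([
  { name: 'label', data: ['a', 'b', 'c'] },
  { name: 'payload', data: [Uint8Array.of(1, 2), Uint8Array.of(3, 4), Uint8Array.of(5, 6)] },
])
```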
18 package.json
@@ -6,7 +6,8 @@
  "homepage": "https://hyperparam.app",
  "keywords": [
    "hyparquet",
    "parquet"
    "parquet",
    "thrift"
  ],
  "license": "MIT",
  "repository": {
@@ -15,14 +16,27 @@
  },
  "main": "src/index.js",
  "files": [
    "src"
    "src",
    "types"
  ],
  "type": "module",
  "types": "src/index.d.ts",
  "exports": {
    ".": {
      "import": "./src/index.js",
      "types": "./types/index.d.ts"
    },
    "./src/*.js": {
      "import": "./src/*.js",
      "types": "./types/*.d.ts"
    }
  },
  "scripts": {
    "build:types": "tsc -p ./tsconfig.build.json",
    "coverage": "vitest run --coverage",
    "lint": "eslint",
    "lint:fix": "eslint --fix",
    "prepare": "npm run build:types",
    "test": "vitest run"
  },
  "devDependencies": {
@@ -13,9 +13,14 @@ export function unconvert(schemaElement, values) {
    return values.map(v => v.getTime())
  }
  if (ctype === 'JSON') {
    const encoder = new TextEncoder()
    if (!Array.isArray(values)) throw new Error('JSON must be an array')
    const encoder = new TextEncoder()
    return values.map(v => encoder.encode(JSON.stringify(v)))
  }
  if (ctype === 'UTF8') {
    if (!Array.isArray(values)) throw new Error('strings must be an array')
    const encoder = new TextEncoder()
    return values.map(v => encoder.encode(v))
  }
  return values
}
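(Illustrative.) A standalone sketch of what the conversion above produces for UTF8 and JSON converted types, using only the platform TextEncoder; this is not a call into the library:

```javascript
// plain JS sketch of the unconvert behavior shown above, not library API
const encoder = new TextEncoder()

// UTF8 converted type: each string becomes a Uint8Array of UTF-8 bytes
const utf8Bytes = ['a', 'b'].map(v => encoder.encode(v))

// JSON converted type: values are stringified first, then encoded
const jsonBytes = [{ a: 1 }, [2, 3]].map(v => encoder.encode(JSON.stringify(v)))
```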
@@ -84,10 +84,9 @@ function writePlainDouble(writer, values) {
 * @param {DecodedArray} values
 */
function writePlainByteArray(writer, values) {
  const encoder = new TextEncoder()
  for (const value of values) {
    const bytes = typeof value === 'string' ? encoder.encode(value) : value
    writer.appendUint32(bytes.length)
    writer.appendBytes(bytes)
    if (!(value instanceof Uint8Array)) throw new Error('BYTE_ARRAY must be Uint8Array')
    writer.appendUint32(value.length)
    writer.appendBytes(value)
  }
}
@@ -29,7 +29,13 @@ export function getSchemaElementForValues(name, values) {
    else if (typeof value === 'bigint') valueType = 'INT64'
    else if (Number.isInteger(value)) valueType = 'INT32'
    else if (typeof value === 'number') valueType = 'DOUBLE'
    else if (typeof value === 'string') valueType = 'BYTE_ARRAY'
    else if (value instanceof Uint8Array) valueType = 'BYTE_ARRAY'
    else if (typeof value === 'string') {
      valueType = 'BYTE_ARRAY'
      // make sure they are all strings
      if (type && !converted_type) throw new Error('mixed types not supported')
      converted_type = 'UTF8'
    }
    else if (value instanceof Date) {
      valueType = 'INT64'
      // make sure they are all dates
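(Illustrative.) The detection rules above map JS values to parquet physical types roughly as follows. This helper is a hypothetical distillation for the example; the real getSchemaElementForValues also tracks converted_type, repetition, and mixed-type errors:

```javascript
// hypothetical distillation of the detection rules shown in the hunk above
function detectPhysicalType(value) {
  if (typeof value === 'bigint') return 'INT64'
  if (Number.isInteger(value)) return 'INT32'
  if (typeof value === 'number') return 'DOUBLE'
  if (value instanceof Uint8Array) return 'BYTE_ARRAY' // raw byte array
  if (typeof value === 'string') return 'BYTE_ARRAY' // plus converted_type UTF8
  if (value instanceof Date) return 'INT64' // stored via getTime()
  return undefined
}
```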
@@ -21,7 +21,7 @@ const CompactType = {
 *
 * Expects keys named like "field_1", "field_2", etc. in ascending order.
 *
 * @import {Writer} from './types.js'
 * @import {Writer} from '../src/types.js'
 * @param {Writer} writer
 * @param {Record<string, any>} data
 */
6 src/types.d.ts (vendored)
@@ -1,3 +1,9 @@
import { DecodedArray, ParquetType } from "hyparquet"

export interface ColumnData {
  name: string
  data: DecodedArray
}

export interface Writer {
  buffer: ArrayBuffer
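(Illustrative.) For JSDoc-typed JavaScript, the new ColumnData shape can be referenced like this; the import path is an assumption for the example (the library's own code imports it from '../src/types.js'):

```javascript
// path is illustrative; point it at wherever types.js resolves in your setup
/** @type {import('hyparquet-writer/src/types.js').ColumnData[]} */
const columns = [
  { name: 'name', data: ['Alice', 'Bob', 'Charlie'] },
  { name: 'age', data: [25, 30, 35] },
]
```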
20 src/write.js
@@ -7,17 +7,17 @@ import { getSchemaElementForValues } from './schema.js'
 * Write data as parquet to an ArrayBuffer
 *
 * @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement, SchemaTree} from 'hyparquet'
 * @param {Record<string, DecodedArray>} columnData
 * @import {ColumnData} from '../src/types.js'
 * @param {ColumnData[]} columnData
 * @returns {ArrayBuffer}
 */
export function parquetWrite(columnData) {
  const writer = new Writer()

  // Check if all columns have the same length
  const columnNames = Object.keys(columnData)
  const num_rows = columnNames.length ? BigInt(columnData[columnNames[0]].length) : 0n
  for (const name of columnNames) {
    if (BigInt(columnData[name].length) !== num_rows) {
  const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
  for (const { data } of columnData) {
    if (BigInt(data.length) !== num_rows) {
      throw new Error('parquetWrite: all columns must have the same length')
    }
  }
@@ -29,7 +29,7 @@ export function parquetWrite(columnData) {
  /** @type {SchemaElement[]} */
  const schema = [{
    name: 'root',
    num_children: columnNames.length,
    num_children: columnData.length,
  }]

  // row group columns
@@ -37,9 +37,9 @@ export function parquetWrite(columnData) {
  const columns = []

  // Write columns
  for (const name of columnNames) {
    const values = columnData[name]
    const schemaElement = getSchemaElementForValues(name, values)
  for (const { name, data } of columnData) {
    // auto-detect type
    const schemaElement = getSchemaElementForValues(name, data)
    if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`)
    const file_offset = BigInt(writer.offset)
    /** @type {SchemaElement[]} */
@@ -47,7 +47,7 @@ export function parquetWrite(columnData) {
      schema[0],
      schemaElement,
    ]
    const meta_data = writeColumn(writer, schemaPath, values)
    const meta_data = writeColumn(writer, schemaPath, data)

    // save metadata
    schema.push(schemaElement)
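(Illustrative.) End-to-end usage of the changed parquetWrite signature, following the round-trip pattern used in the test changes below; reading back uses hyparquet, and top-level await assumes an ESM context:

```javascript
import { parquetWrite } from 'hyparquet-writer'
import { parquetMetadata, parquetReadObjects } from 'hyparquet'

// write columns with the new array-of-{ name, data } API
const file = parquetWrite([
  { name: 'name', data: ['Alice', 'Bob', 'Charlie'] },
  { name: 'age', data: [25, 30, 35] },
])

// read back with hyparquet; the tests also pass utf8: false so that plain
// BYTE_ARRAY columns come back as raw Uint8Array rather than decoded strings
const metadata = parquetMetadata(file)
const rows = await parquetReadObjects({ file })
```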
@@ -2,8 +2,7 @@
/**
 * Self-expanding buffer view
 *
 * @import {Writer} from './types.js'
 * @returns {Writer}
 * @returns {import('../src/types.js').Writer}
 */
export function Writer() {
  this.buffer = new ArrayBuffer(1024)
@@ -16,7 +16,7 @@ export const exampleMetadata = {
    { name: 'int', type: 'INT32', repetition_type: 'REQUIRED' },
    { name: 'bigint', type: 'INT64', repetition_type: 'REQUIRED' },
    { name: 'double', type: 'DOUBLE', repetition_type: 'REQUIRED' },
    { name: 'string', type: 'BYTE_ARRAY', repetition_type: 'REQUIRED' },
    { name: 'string', type: 'BYTE_ARRAY', repetition_type: 'REQUIRED', converted_type: 'UTF8' },
    { name: 'nullable', type: 'BOOLEAN', repetition_type: 'OPTIONAL' },
  ],
  num_rows: 4n,
@@ -110,7 +110,7 @@ export const exampleMetadata = {
    total_byte_size: 236n,
    num_rows: 4n,
  }],
  metadata_length: 336,
  metadata_length: 338,
}

describe('writeMetadata', () => {
@@ -62,7 +62,10 @@ describe('writePlain', () => {
  it('writes BYTE_ARRAY', () => {
    const writer = new Writer()
    const strings = ['a', 'b', 'c', 'd']
    writePlain(writer, strings, 'BYTE_ARRAY')
    // strings must be pre-converted to Uint8Array
    const encoder = new TextEncoder()
    const bytes = strings.map(s => encoder.encode(s))
    writePlain(writer, bytes, 'BYTE_ARRAY')

    let offset = 0
    for (const s of strings) {
@@ -6,32 +6,33 @@ import { exampleMetadata } from './metadata.test.js'
/**
 * Utility to encode a parquet file and then read it back into a JS object.
 *
 * @param {Record<string, any[]>} columnData
 * @import {ColumnData} from '../src/types.js'
 * @param {ColumnData[]} columnData
 * @returns {Promise<Record<string, any>>}
 */
async function roundTripDeserialize(columnData) {
  const file = parquetWrite(columnData)
  return await parquetReadObjects({ file })
  return await parquetReadObjects({ file, utf8: false })
}

const data = {
  bool: [true, false, true, false], // BOOLEAN
  int: [0, 127, 0x7fff, 0x7fffffff], // INT32
  bigint: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn], // INT64
  double: [0, 0.0001, 123.456, 1e100], // DOUBLE
  string: ['a', 'b', 'c', 'd'], // BYTE_ARRAY
  nullable: [true, false, null, null], // BOOLEAN nullable
}
const basicData = [
  { name: 'bool', data: [true, false, true, false] },
  { name: 'int', data: [0, 127, 0x7fff, 0x7fffffff] },
  { name: 'bigint', data: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn] },
  { name: 'double', data: [0, 0.0001, 123.456, 1e100] },
  { name: 'string', data: ['a', 'b', 'c', 'd'] },
  { name: 'nullable', data: [true, false, null, null] },
]

describe('parquetWrite', () => {
  it('writes expected metadata', () => {
    const file = parquetWrite(data)
    const file = parquetWrite(basicData)
    const metadata = parquetMetadata(file)
    expect(metadata).toEqual(exampleMetadata)
  })

  it('serializes basic types', async () => {
    const result = await roundTripDeserialize(data)
    const result = await roundTripDeserialize(basicData)
    expect(result).toEqual([
      { bool: true, int: 0, bigint: 0n, double: 0, string: 'a', nullable: true },
      { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b', nullable: false },
@@ -46,16 +47,17 @@ describe('parquetWrite', () => {
    bool[100] = false
    bool[500] = true
    bool[9999] = false
    const buffer = parquetWrite({ bool })
    const buffer = parquetWrite([{ name: 'bool', data: bool }])
    expect(buffer.byteLength).toBe(1399)
    const metadata = parquetMetadata(buffer)
    expect(metadata.metadata_length).toBe(89)
  })

  it('serializes list types', async () => {
    const result = await roundTripDeserialize({
      list: [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]],
    })
    const result = await roundTripDeserialize([{
      name: 'list',
      data: [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]],
    }])
    expect(result).toEqual([
      { list: [1, 2, 3] },
      { list: [4, 5, 6] },
@@ -65,9 +67,10 @@ describe('parquetWrite', () => {
  })

  it('serializes object types', async () => {
    const result = await roundTripDeserialize({
      obj: [{ a: 1, b: 2 }, { a: 3, b: 4 }, { a: 5, b: 6 }, { a: 7, b: 8 }],
    })
    const result = await roundTripDeserialize([{
      name: 'obj',
      data: [{ a: 1, b: 2 }, { a: 3, b: 4 }, { a: 5, b: 6 }, { a: 7, b: 8 }],
    }])
    expect(result).toEqual([
      { obj: { a: 1, b: 2 } },
      { obj: { a: 3, b: 4 } },
@@ -77,9 +80,10 @@ describe('parquetWrite', () => {
  })

  it('serializes date types', async () => {
    const result = await roundTripDeserialize({
      date: [new Date(0), new Date(100000), new Date(200000), new Date(300000)],
    })
    const result = await roundTripDeserialize([{
      name: 'date',
      data: [new Date(0), new Date(100000), new Date(200000), new Date(300000)],
    }])
    expect(result).toEqual([
      { date: new Date(0) },
      { date: new Date(100000) },
@@ -88,8 +92,21 @@ describe('parquetWrite', () => {
    ])
  })

  it('serializes byte array types', async () => {
    const result = await roundTripDeserialize([{
      name: 'bytes',
      data: [Uint8Array.of(1, 2, 3), Uint8Array.of(4, 5, 6), Uint8Array.of(7, 8, 9), Uint8Array.of(10, 11, 12)],
    }])
    expect(result).toEqual([
      { bytes: Uint8Array.of(1, 2, 3) },
      { bytes: Uint8Array.of(4, 5, 6) },
      { bytes: Uint8Array.of(7, 8, 9) },
      { bytes: Uint8Array.of(10, 11, 12) },
    ])
  })

  it('throws for mixed types', () => {
    expect(() => parquetWrite({ mixed: [1, 2, 3, 'boom'] }))
      .toThrow('parquet cannot write mixed types: INT32 and BYTE_ARRAY')
    expect(() => parquetWrite([{ name: 'mixed', data: [1, 2, 3, 'boom'] }]))
      .toThrow('mixed types not supported')
  })
})
11 tsconfig.build.json (Normal file)
@@ -0,0 +1,11 @@
{
  "extends": "./tsconfig.json",
  "compilerOptions": {
    "noEmit": false,
    "declaration": true,
    "emitDeclarationOnly": true,
    "outDir": "types",
    "declarationMap": true
  },
  "include": ["src"]
}