mirror of
https://github.com/asadbek064/hyparquet.git
synced 2026-01-10 04:56:38 +00:00
Split out convert function
This commit is contained in:
parent
c70b3b2227
commit
ca971ccc01
@ -28,7 +28,7 @@
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "20.11.20",
|
||||
"@typescript-eslint/eslint-plugin": "7.0.2",
|
||||
"@typescript-eslint/eslint-plugin": "7.1.0",
|
||||
"@vitest/coverage-v8": "1.3.1",
|
||||
"eslint": "8.57.0",
|
||||
"eslint-plugin-import": "2.29.1",
|
||||
|
||||
@ -1,18 +1,16 @@
|
||||
import { Encoding, PageType } from './constants.js'
|
||||
import { convert } from './convert.js'
|
||||
import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
|
||||
import { parquetHeader } from './header.js'
|
||||
import { getMaxDefinitionLevel, isRequired, schemaElement } from './schema.js'
|
||||
import { snappyUncompress } from './snappy.js'
|
||||
|
||||
/**
|
||||
* @typedef {import('./types.js').ArrayBufferLike} ArrayBufferLike
|
||||
* @typedef {import('./types.js').SchemaElement} SchemaElement
|
||||
* @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
|
||||
* @typedef {import('./types.js').RowGroup} RowGroup
|
||||
*/
|
||||
|
||||
const dayMillis = 86400000000000 // 1 day in milliseconds
|
||||
|
||||
/**
|
||||
* Parse column data from a buffer.
|
||||
*
|
||||
@ -133,63 +131,6 @@ export function getColumnOffset(columnMetadata) {
|
||||
return Number(columnOffset)
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert known types from primitive to rich.
|
||||
*
|
||||
* @param {any[]} data series of primitive types
|
||||
* @param {SchemaElement} schemaElement schema element for the data
|
||||
* @returns {any[]} series of rich types
|
||||
*/
|
||||
function convert(data, schemaElement) {
|
||||
const ctype = schemaElement.converted_type
|
||||
if (ctype === undefined) return data
|
||||
if (ctype === 'UTF8') {
|
||||
const decoder = new TextDecoder()
|
||||
return data.map(v => decoder.decode(v))
|
||||
}
|
||||
if (ctype === 'DECIMAL') {
|
||||
const scaleFactor = Math.pow(10, schemaElement.scale || 0)
|
||||
if (typeof data[0] === 'number') {
|
||||
return scaleFactor === 1 ? data : data.map(v => v * scaleFactor)
|
||||
} else if (typeof data[0] === 'bigint') {
|
||||
return scaleFactor === 1 ? data : data.map(v => Number(v) * scaleFactor)
|
||||
} else {
|
||||
return data.map(v => parseDecimal(v) * scaleFactor)
|
||||
}
|
||||
}
|
||||
if (ctype === 'DATE') {
|
||||
return data.map(v => new Date(v * dayMillis))
|
||||
}
|
||||
if (ctype === 'TIME_MILLIS') {
|
||||
return data.map(v => new Date(v))
|
||||
}
|
||||
if (ctype === 'JSON') {
|
||||
return data.map(v => JSON.parse(v))
|
||||
}
|
||||
if (ctype === 'BSON') {
|
||||
throw new Error('parquet bson not supported')
|
||||
}
|
||||
if (ctype === 'INTERVAL') {
|
||||
throw new Error('parquet interval not supported')
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse decimal from byte array.
|
||||
*
|
||||
* @param {Uint8Array} bytes
|
||||
* @returns {number}
|
||||
*/
|
||||
function parseDecimal(bytes) {
|
||||
// TODO: handle signed
|
||||
let value = 0
|
||||
for (const byte of bytes) {
|
||||
value = value << 8 | byte
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {import('./types.js').PageHeader} PageHeader
|
||||
* @typedef {import('./types.js').CompressionCodec} CompressionCodec
|
||||
|
||||
61
src/convert.js
Normal file
61
src/convert.js
Normal file
@ -0,0 +1,61 @@
|
||||
/**
|
||||
* @typedef {import('./types.js').SchemaElement} SchemaElement
|
||||
*/
|
||||
|
||||
const dayMillis = 86400000000000 // 1 day in milliseconds
|
||||
|
||||
/**
|
||||
* Convert known types from primitive to rich.
|
||||
*
|
||||
* @param {any[]} data series of primitive types
|
||||
* @param {SchemaElement} schemaElement schema element for the data
|
||||
* @returns {any[]} series of rich types
|
||||
*/
|
||||
export function convert(data, schemaElement) {
|
||||
const ctype = schemaElement.converted_type
|
||||
if (ctype === 'UTF8') {
|
||||
const decoder = new TextDecoder()
|
||||
return data.map(v => decoder.decode(v))
|
||||
}
|
||||
if (ctype === 'DECIMAL') {
|
||||
const scaleFactor = schemaElement.scale ? Math.pow(10, schemaElement.scale) : 1
|
||||
if (typeof data[0] === 'number') {
|
||||
return scaleFactor === 1 ? data : data.map(v => v * scaleFactor)
|
||||
} else if (typeof data[0] === 'bigint') {
|
||||
return scaleFactor === 1 ? data : data.map(v => v * BigInt(scaleFactor))
|
||||
} else {
|
||||
return data.map(v => parseDecimal(v) * scaleFactor)
|
||||
}
|
||||
}
|
||||
if (ctype === 'DATE') {
|
||||
return data.map(v => new Date(v * dayMillis))
|
||||
}
|
||||
if (ctype === 'TIME_MILLIS') {
|
||||
return data.map(v => new Date(v))
|
||||
}
|
||||
if (ctype === 'JSON') {
|
||||
return data.map(v => JSON.parse(v))
|
||||
}
|
||||
if (ctype === 'BSON') {
|
||||
throw new Error('parquet bson not supported')
|
||||
}
|
||||
if (ctype === 'INTERVAL') {
|
||||
throw new Error('parquet interval not supported')
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse decimal from byte array.
|
||||
*
|
||||
* @param {Uint8Array} bytes
|
||||
* @returns {number}
|
||||
*/
|
||||
function parseDecimal(bytes) {
|
||||
// TODO: handle signed
|
||||
let value = 0
|
||||
for (const byte of bytes) {
|
||||
value = value << 8 | byte
|
||||
}
|
||||
return value
|
||||
}
|
||||
@ -11,7 +11,6 @@ import { deserializeTCompactProtocol } from './thrift.js'
|
||||
/**
|
||||
* Read parquet header from a buffer.
|
||||
*
|
||||
* @typedef {import("./types.d.ts").ArrayBufferLike} ArrayBufferLike
|
||||
* @typedef {import("./types.d.ts").PageHeader} PageHeader
|
||||
* @param {ArrayBuffer} arrayBuffer parquet file contents
|
||||
* @param {number} offset offset to start reading from
|
||||
|
||||
8
src/types.d.ts
vendored
8
src/types.d.ts
vendored
@ -6,14 +6,6 @@ export interface AsyncBuffer {
|
||||
slice(start: number, end?: number): Promise<ArrayBuffer>
|
||||
}
|
||||
|
||||
/**
|
||||
* Just like an ArrayBuffer, but an interface
|
||||
*/
|
||||
export interface ArrayBufferLike {
|
||||
byteLength: number
|
||||
slice(start: number, end?: number): ArrayBuffer
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a decoded value, and includes the number of bytes read.
|
||||
* This is used to read data from the file and advance a virtual file pointer.
|
||||
|
||||
95
test/convert.test.js
Normal file
95
test/convert.test.js
Normal file
@ -0,0 +1,95 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { convert } from '../src/convert.js'
|
||||
|
||||
/**
|
||||
* @typedef {import('../src/types.js').SchemaElement} SchemaElement
|
||||
*/
|
||||
|
||||
describe('convert function', () => {
|
||||
const name = 'name'
|
||||
it('returns the same data if converted_type is undefined', () => {
|
||||
const data = [1, 2, 3]
|
||||
const schemaElement = { name }
|
||||
expect(convert(data, schemaElement)).toEqual(data)
|
||||
})
|
||||
|
||||
it('converts byte arrays to UTF8 strings', () => {
|
||||
const data = [new TextEncoder().encode('test'), new TextEncoder().encode('vitest')]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'UTF8' }
|
||||
expect(convert(data, schemaElement)).toEqual(['test', 'vitest'])
|
||||
})
|
||||
|
||||
it('converts numbers to DECIMAL', () => {
|
||||
const data = [100, 200]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL' }
|
||||
expect(convert(data, schemaElement)).toEqual([100, 200])
|
||||
})
|
||||
|
||||
it('converts numbers to DECIMAL with scale', () => {
|
||||
const data = [100, 200]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL', scale: 2 }
|
||||
expect(convert(data, schemaElement)).toEqual([10000, 20000])
|
||||
})
|
||||
|
||||
it('converts bigint to DECIMAL', () => {
|
||||
const data = [BigInt(1000), BigInt(2000)]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL' }
|
||||
expect(convert(data, schemaElement)).toEqual([1000n, 2000n])
|
||||
})
|
||||
|
||||
it('converts bigint to DECIMAL with scale', () => {
|
||||
const data = [BigInt(1000), BigInt(2000)]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL', scale: 3 }
|
||||
expect(convert(data, schemaElement)).toEqual([1000000n, 2000000n])
|
||||
})
|
||||
|
||||
it('converts byte arrays to DECIMAL', () => {
|
||||
const data = [new Uint8Array([0, 0, 0, 100]), new Uint8Array([0, 0, 0, 200])]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL', scale: 0 }
|
||||
expect(convert(data, schemaElement)).toEqual([100, 200])
|
||||
})
|
||||
|
||||
it('converts epoch time to DATE', () => {
|
||||
const data = [1, 2] // days since epoch
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DATE' }
|
||||
expect(convert(data, schemaElement)).toEqual([new Date(86400000000000), new Date(86400000000000 * 2)])
|
||||
})
|
||||
|
||||
it('converts milliseconds to TIME_MILLIS', () => {
|
||||
const now = Date.now()
|
||||
const data = [now]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'TIME_MILLIS' }
|
||||
expect(convert(data, schemaElement)).toEqual([new Date(now)])
|
||||
})
|
||||
|
||||
it('parses strings to JSON', () => {
|
||||
const data = ['{"key": true}', '{"quay": 314}']
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'JSON' }
|
||||
expect(convert(data, schemaElement)).toEqual([{ key: true }, { quay: 314 }])
|
||||
})
|
||||
|
||||
it('throws error for BSON conversion', () => {
|
||||
const data = [{}]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'BSON' }
|
||||
expect(() => convert(data, schemaElement))
|
||||
.toThrow('parquet bson not supported')
|
||||
})
|
||||
|
||||
it('throws error for INTERVAL conversion', () => {
|
||||
const data = [{}]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'INTERVAL' }
|
||||
expect(() => convert(data, schemaElement))
|
||||
.toThrow('parquet interval not supported')
|
||||
})
|
||||
})
|
||||
Loading…
Reference in New Issue
Block a user