mirror of
https://github.com/asadbek064/hyparquet.git
synced 2026-01-07 03:26:38 +00:00
Plumb ColumnDecoder into convert (#86)
This commit is contained in:
parent
54a9df0acd
commit
67ab9d5e1a
@ -16,7 +16,7 @@ import { deserializeTCompactProtocol } from './thrift.js'
|
||||
* @returns {DecodedArray[]}
|
||||
*/
|
||||
export function readColumn(reader, { groupStart, selectStart, selectEnd }, columnDecoder, onPage) {
|
||||
const { columnName, element, utf8 } = columnDecoder
|
||||
const { columnName } = columnDecoder
|
||||
/** @type {DecodedArray[]} */
|
||||
const chunks = []
|
||||
/** @type {DecodedArray | undefined} */
|
||||
@ -42,7 +42,7 @@ export function readColumn(reader, { groupStart, selectStart, selectEnd }, colum
|
||||
if (header.type === 'DICTIONARY_PAGE') {
|
||||
// assert(!dictionary)
|
||||
dictionary = readPage(reader, header, columnDecoder, dictionary, undefined, 0)
|
||||
dictionary = convert(dictionary, element, utf8)
|
||||
dictionary = convert(dictionary, columnDecoder)
|
||||
} else {
|
||||
const lastChunkLength = lastChunk?.length || 0
|
||||
const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, selectStart - rowCount)
|
||||
@ -78,7 +78,7 @@ export function readColumn(reader, { groupStart, selectStart, selectEnd }, colum
|
||||
* @returns {DecodedArray}
|
||||
*/
|
||||
export function readPage(reader, header, columnDecoder, dictionary, previousChunk, pageStart) {
|
||||
const { type, element, schemaPath, codec, compressors, utf8 } = columnDecoder
|
||||
const { type, element, schemaPath, codec, compressors } = columnDecoder
|
||||
// read compressed_page_size bytes
|
||||
const compressedBytes = new Uint8Array(
|
||||
reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
|
||||
@ -100,7 +100,7 @@ export function readPage(reader, header, columnDecoder, dictionary, previousChun
|
||||
// assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
|
||||
|
||||
// convert types, dereference dictionary, and assemble lists
|
||||
let values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
|
||||
let values = convertWithDictionary(dataPage, dictionary, daph.encoding, columnDecoder)
|
||||
if (repetitionLevels.length || definitionLevels?.length) {
|
||||
const output = Array.isArray(previousChunk) ? previousChunk : []
|
||||
return assembleLists(output, definitionLevels, repetitionLevels, values, schemaPath)
|
||||
@ -126,7 +126,7 @@ export function readPage(reader, header, columnDecoder, dictionary, previousChun
|
||||
readDataPageV2(compressedBytes, header, columnDecoder)
|
||||
|
||||
// convert types, dereference dictionary, and assemble lists
|
||||
const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
|
||||
const values = convertWithDictionary(dataPage, dictionary, daph2.encoding, columnDecoder)
|
||||
const output = Array.isArray(previousChunk) ? previousChunk : []
|
||||
return assembleLists(output, definitionLevels, repetitionLevels, values, schemaPath)
|
||||
} else if (header.type === 'DICTIONARY_PAGE') {
|
||||
|
||||
@ -5,12 +5,11 @@ const dayMillis = 86400000 // 1 day in milliseconds
|
||||
*
|
||||
* @param {DecodedArray} data series of primitive types
|
||||
* @param {DecodedArray | undefined} dictionary
|
||||
* @param {SchemaElement} schemaElement
|
||||
* @param {Encoding} encoding
|
||||
* @param {boolean} [utf8] decode bytes as utf8?
|
||||
* @param {ColumnDecoder} columnDecoder
|
||||
* @returns {DecodedArray} series of rich types
|
||||
*/
|
||||
export function convertWithDictionary(data, dictionary, schemaElement, encoding, utf8) {
|
||||
export function convertWithDictionary(data, dictionary, encoding, columnDecoder) {
|
||||
if (dictionary && encoding.endsWith('_DICTIONARY')) {
|
||||
let output = data
|
||||
if (data instanceof Uint8Array && !(dictionary instanceof Uint8Array)) {
|
||||
@ -22,7 +21,7 @@ export function convertWithDictionary(data, dictionary, schemaElement, encoding,
|
||||
}
|
||||
return output
|
||||
} else {
|
||||
return convert(data, schemaElement, utf8)
|
||||
return convert(data, columnDecoder)
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,14 +29,14 @@ export function convertWithDictionary(data, dictionary, schemaElement, encoding,
|
||||
* Convert known types from primitive to rich.
|
||||
*
|
||||
* @param {DecodedArray} data series of primitive types
|
||||
* @param {SchemaElement} schemaElement
|
||||
* @param {boolean} [utf8] decode bytes as utf8?
|
||||
* @param {Pick<ColumnDecoder, "element" | "utf8">} columnDecoder
|
||||
* @returns {DecodedArray} series of rich types
|
||||
*/
|
||||
export function convert(data, schemaElement, utf8 = true) {
|
||||
const { type, converted_type: ctype, logical_type: ltype } = schemaElement
|
||||
export function convert(data, columnDecoder) {
|
||||
const { element, utf8 = true } = columnDecoder
|
||||
const { type, converted_type: ctype, logical_type: ltype } = element
|
||||
if (ctype === 'DECIMAL') {
|
||||
const scale = schemaElement.scale || 0
|
||||
const scale = element.scale || 0
|
||||
const factor = 10 ** -scale
|
||||
const arr = new Array(data.length)
|
||||
for (let i = 0; i < arr.length; i++) {
|
||||
@ -144,7 +143,7 @@ export function parseDecimal(bytes) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @import {DecodedArray, Encoding, SchemaElement} from '../src/types.d.ts'
|
||||
* @import {ColumnDecoder, DecodedArray, Encoding, SchemaElement} from '../src/types.d.ts'
|
||||
* @param {bigint} value
|
||||
* @returns {Date}
|
||||
*/
|
||||
|
||||
@ -2,36 +2,36 @@ import { describe, expect, it } from 'vitest'
|
||||
import { convert, parseDecimal, parseFloat16 } from '../src/convert.js'
|
||||
|
||||
/**
|
||||
* @import {SchemaElement} from '../src/types.js'
|
||||
* @import {ColumnDecoder, SchemaElement} from '../src/types.js'
|
||||
*/
|
||||
|
||||
describe('convert function', () => {
|
||||
const name = 'name'
|
||||
it('returns the same data if converted_type is undefined', () => {
|
||||
const data = [1, 2, 3]
|
||||
const schemaElement = { name }
|
||||
expect(convert(data, schemaElement)).toEqual(data)
|
||||
const element = { name }
|
||||
expect(convert(data, { element })).toEqual(data)
|
||||
})
|
||||
|
||||
it('converts byte arrays to utf8', () => {
|
||||
const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'UTF8' }
|
||||
expect(convert(data, schemaElement)).toEqual(['foo', 'bar'])
|
||||
const element = { name, converted_type: 'UTF8' }
|
||||
expect(convert(data, { element })).toEqual(['foo', 'bar'])
|
||||
})
|
||||
|
||||
it('converts byte arrays to utf8 default true', () => {
|
||||
const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, type: 'BYTE_ARRAY' }
|
||||
expect(convert(data, schemaElement)).toEqual(['foo', 'bar'])
|
||||
const element = { name, type: 'BYTE_ARRAY' }
|
||||
expect(convert(data, { element })).toEqual(['foo', 'bar'])
|
||||
})
|
||||
|
||||
it('preserves byte arrays utf8=false', () => {
|
||||
const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, type: 'BYTE_ARRAY' }
|
||||
expect(convert(data, schemaElement, false)).toEqual([
|
||||
const element = { name, type: 'BYTE_ARRAY' }
|
||||
expect(convert(data, { element, utf8: false })).toEqual([
|
||||
new Uint8Array([102, 111, 111]), new Uint8Array([98, 97, 114]),
|
||||
])
|
||||
})
|
||||
@ -39,65 +39,65 @@ describe('convert function', () => {
|
||||
it('converts numbers to DECIMAL', () => {
|
||||
const data = [100, 200]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL' }
|
||||
expect(convert(data, schemaElement)).toEqual([100, 200])
|
||||
const element = { name, converted_type: 'DECIMAL' }
|
||||
expect(convert(data, { element })).toEqual([100, 200])
|
||||
})
|
||||
|
||||
it('converts numbers to DECIMAL with scale', () => {
|
||||
const data = [100, 200]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL', scale: 2 }
|
||||
expect(convert(data, schemaElement)).toEqual([1, 2])
|
||||
const element = { name, converted_type: 'DECIMAL', scale: 2 }
|
||||
expect(convert(data, { element })).toEqual([1, 2])
|
||||
})
|
||||
|
||||
it('converts bigint to DECIMAL', () => {
|
||||
const data = [1000n, 2000n]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL' }
|
||||
expect(convert(data, schemaElement)).toEqual([1000, 2000])
|
||||
const element = { name, converted_type: 'DECIMAL' }
|
||||
expect(convert(data, { element })).toEqual([1000, 2000])
|
||||
})
|
||||
|
||||
it('converts bigint to DECIMAL with scale', () => {
|
||||
const data = [10n, 20n]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL', scale: 2 }
|
||||
expect(convert(data, schemaElement)).toEqual([0.1, 0.2])
|
||||
const element = { name, converted_type: 'DECIMAL', scale: 2 }
|
||||
expect(convert(data, { element })).toEqual([0.1, 0.2])
|
||||
})
|
||||
|
||||
it('converts byte arrays to DECIMAL', () => {
|
||||
const data = [new Uint8Array([0, 0, 0, 100]), new Uint8Array([0, 0, 0, 200])]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL', scale: 0 }
|
||||
expect(convert(data, schemaElement)).toEqual([100, 200])
|
||||
const element = { name, converted_type: 'DECIMAL', scale: 0 }
|
||||
expect(convert(data, { element })).toEqual([100, 200])
|
||||
})
|
||||
|
||||
it('converts byte array from issue #59 to DECIMAL', () => {
|
||||
const data = [new Uint8Array([18, 83, 137, 151, 156, 0])]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DECIMAL', scale: 10, precision: 14 }
|
||||
expect(convert(data, schemaElement)).toEqual([2015])
|
||||
const element = { name, converted_type: 'DECIMAL', scale: 10, precision: 14 }
|
||||
expect(convert(data, { element })).toEqual([2015])
|
||||
})
|
||||
|
||||
it('converts epoch time to DATE', () => {
|
||||
const data = [1, 2] // days since epoch
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'DATE' }
|
||||
expect(convert(data, schemaElement)).toEqual([new Date(86400000), new Date(86400000 * 2)])
|
||||
const element = { name, converted_type: 'DATE' }
|
||||
expect(convert(data, { element })).toEqual([new Date(86400000), new Date(86400000 * 2)])
|
||||
})
|
||||
|
||||
it('converts INT96 to DATE', () => {
|
||||
// from alltypes_plain.parquet
|
||||
const data = [45284764452596988585705472n, 45284764452597048585705472n]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, type: 'INT96' }
|
||||
expect(convert(data, schemaElement)).toEqual([new Date('2009-03-01T00:00:00.000Z'), new Date('2009-03-01T00:01:00.000Z')])
|
||||
const element = { name, type: 'INT96' }
|
||||
expect(convert(data, { element })).toEqual([new Date('2009-03-01T00:00:00.000Z'), new Date('2009-03-01T00:01:00.000Z')])
|
||||
})
|
||||
|
||||
it('converts epoch time to TIMESTAMP_MILLIS', () => {
|
||||
const data = [1716506900000n, 1716507000000n]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'TIMESTAMP_MILLIS' }
|
||||
expect(convert(data, schemaElement)).toEqual([
|
||||
const element = { name, converted_type: 'TIMESTAMP_MILLIS' }
|
||||
expect(convert(data, { element })).toEqual([
|
||||
new Date('2024-05-23T23:28:20.000Z'), new Date('2024-05-23T23:30:00.000Z'),
|
||||
])
|
||||
})
|
||||
@ -105,8 +105,8 @@ describe('convert function', () => {
|
||||
it('converts epoch time to TIMESTAMP_MICROS', () => {
|
||||
const data = [1716506900000000n, 1716507000000000n]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'TIMESTAMP_MICROS' }
|
||||
expect(convert(data, schemaElement)).toEqual([
|
||||
const element = { name, converted_type: 'TIMESTAMP_MICROS' }
|
||||
expect(convert(data, { element })).toEqual([
|
||||
new Date('2024-05-23T23:28:20.000Z'), new Date('2024-05-23T23:30:00.000Z'),
|
||||
])
|
||||
})
|
||||
@ -116,29 +116,29 @@ describe('convert function', () => {
|
||||
const data = ['{"key": true}', '{"quay": 314}']
|
||||
.map(str => encoder.encode(str))
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'JSON' }
|
||||
expect(convert(data, schemaElement)).toEqual([{ key: true }, { quay: 314 }])
|
||||
const element = { name, converted_type: 'JSON' }
|
||||
expect(convert(data, { element })).toEqual([{ key: true }, { quay: 314 }])
|
||||
})
|
||||
|
||||
it('converts uint64', () => {
|
||||
const data = [100n, -100n]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'UINT_64' }
|
||||
expect(convert(data, schemaElement)).toEqual(new BigUint64Array([100n, 18446744073709551516n]))
|
||||
const element = { name, converted_type: 'UINT_64' }
|
||||
expect(convert(data, { element })).toEqual(new BigUint64Array([100n, 18446744073709551516n]))
|
||||
})
|
||||
|
||||
it('converts to float16', () => {
|
||||
const data = [new Uint8Array([0x00, 0x3c]), new Uint8Array([0x00, 0x40])]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, logical_type: { type: 'FLOAT16' } }
|
||||
expect(convert(data, schemaElement)).toEqual([1, 2])
|
||||
const element = { name, logical_type: { type: 'FLOAT16' } }
|
||||
expect(convert(data, { element })).toEqual([1, 2])
|
||||
})
|
||||
|
||||
it('converts timestamp with units', () => {
|
||||
const data = [1716506900000000n, 1716507000000000n]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, logical_type: { type: 'TIMESTAMP', isAdjustedToUTC: true, unit: 'MICROS' } }
|
||||
expect(convert(data, schemaElement)).toEqual([
|
||||
const element = { name, logical_type: { type: 'TIMESTAMP', isAdjustedToUTC: true, unit: 'MICROS' } }
|
||||
expect(convert(data, { element })).toEqual([
|
||||
new Date('2024-05-23T23:28:20.000Z'), new Date('2024-05-23T23:30:00.000Z'),
|
||||
])
|
||||
})
|
||||
@ -146,16 +146,16 @@ describe('convert function', () => {
|
||||
it('throws error for BSON conversion', () => {
|
||||
const data = [{}]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'BSON' }
|
||||
expect(() => convert(data, schemaElement))
|
||||
const element = { name, converted_type: 'BSON' }
|
||||
expect(() => convert(data, { element }))
|
||||
.toThrow('parquet bson not supported')
|
||||
})
|
||||
|
||||
it('throws error for INTERVAL conversion', () => {
|
||||
const data = [{}]
|
||||
/** @type {SchemaElement} */
|
||||
const schemaElement = { name, converted_type: 'INTERVAL' }
|
||||
expect(() => convert(data, schemaElement))
|
||||
const element = { name, converted_type: 'INTERVAL' }
|
||||
expect(() => convert(data, { element }))
|
||||
.toThrow('parquet interval not supported')
|
||||
})
|
||||
})
|
||||
|
||||
Loading…
Reference in New Issue
Block a user