This commit is contained in:
Kenny Daniel 2024-05-13 09:22:55 -07:00
parent 7ae1f88047
commit c83aa2ea5b
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
8 changed files with 153 additions and 8 deletions

@ -45,6 +45,10 @@ export function convert(data, schemaElement) {
if (ctype === 'INTERVAL') {
throw new Error('parquet interval not supported')
}
const logicalType = schemaElement.logical_type?.type
if (logicalType === 'FLOAT16') {
return Array.from(data).map(parseFloat16)
}
return data
}
@ -71,3 +75,18 @@ function parseInt96Date(value) {
const millis = days * dayMillis + nano
return new Date(millis)
}
/**
 * Decode a two-byte little-endian IEEE 754 half-precision (float16) value.
 *
 * Handles all binary16 encodings: signed zero, subnormals, normals,
 * infinities, and NaN.
 *
 * @param {Uint8Array | undefined} bytes
 * @returns {number | undefined}
 */
export function parseFloat16(bytes) {
  if (!bytes) return undefined
  // assemble the 16-bit pattern from little-endian byte order
  const bits = bytes[0] | (bytes[1] << 8)
  const sign = bits & 0x8000 ? -1 : 1
  const exponent = (bits >> 10) & 0x1f
  const mantissa = (bits & 0x3ff) / 1024
  // exponent of all ones encodes infinities (mantissa 0) and NaN
  if (exponent === 0x1f) return mantissa ? NaN : sign * Infinity
  // exponent of zero encodes signed zero and subnormal numbers
  if (exponent === 0) return sign * Math.pow(2, -14) * mantissa
  return sign * Math.pow(2, exponent - 15) * (1 + mantissa)
}

@ -1,4 +1,5 @@
import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, ParquetType } from './constants.js'
import { parseFloat16 } from './convert.js'
import { getSchemaPath } from './schema.js'
import { deserializeTCompactProtocol } from './thrift.js'
@ -24,6 +25,7 @@ import { deserializeTCompactProtocol } from './thrift.js'
*
* @typedef {import("./types.d.ts").AsyncBuffer} AsyncBuffer
* @typedef {import("./types.d.ts").FileMetaData} FileMetaData
* @typedef {import("./types.d.ts").SchemaElement} SchemaElement
* @param {AsyncBuffer} asyncBuffer parquet file contents
* @param {number} initialFetchSize initial fetch size in bytes
* @returns {Promise<FileMetaData>} parquet metadata object
@ -103,6 +105,7 @@ export function parquetMetadata(arrayBuffer) {
// Parse metadata from thrift data
const version = metadata.field_1
/** @type {SchemaElement[]} */
const schema = metadata.field_2.map((/** @type {any} */ field) => ({
type: ParquetType[field.field_1],
type_length: field.field_2,
@ -115,8 +118,8 @@ export function parquetMetadata(arrayBuffer) {
field_id: field.field_9,
logical_type: logicalType(field.field_10),
}))
// @ts-expect-error get types by column index
const columnTypes = schema.map(e => e.type).filter(e => e)
// schema element per column index
const columnSchema = schema.filter(e => e.type)
const num_rows = metadata.field_3
const row_groups = metadata.field_4.map((/** @type {any} */ rowGroup) => ({
columns: rowGroup.field_1.map((/** @type {any} */ column, /** @type {number} */ columnIndex) => ({
@ -134,7 +137,7 @@ export function parquetMetadata(arrayBuffer) {
data_page_offset: column.field_3.field_9,
index_page_offset: column.field_3.field_10,
dictionary_page_offset: column.field_3.field_11,
statistics: columnStats(column.field_3.field_12, columnTypes[columnIndex]),
statistics: columnStats(column.field_3.field_12, columnSchema[columnIndex]),
encoding_stats: column.field_3.field_13?.map((/** @type {any} */ encodingStat) => ({
page_type: encodingStat.field_1,
encoding: Encoding[encodingStat.field_2],
@ -235,10 +238,11 @@ function logicalType(logicalType) {
* Convert column statistics based on column type.
*
* @param {any} stats
* @param {import("./types.d.ts").ParquetType} type
* @param {SchemaElement} schema
* @returns {import("./types.d.ts").Statistics}
*/
function columnStats(stats, type) {
function columnStats(stats, schema) {
const { type, logical_type } = schema
function convert(/** @type {Uint8Array} */ value) {
if (value === undefined) return value
if (type === 'BOOLEAN') return value[0] === 1
@ -259,6 +263,9 @@ function columnStats(stats, type) {
const view = new DataView(value.buffer, value.byteOffset, value.byteLength)
return view.getFloat64(0, true)
}
if (logical_type?.type === 'FLOAT16') {
return parseFloat16(value)
}
return value
}
return stats && {

2
src/types.d.ts vendored

@ -44,7 +44,7 @@ export interface SchemaElement {
scale?: number
precision?: number
field_id?: number
logicalType?: LogicalType
logical_type?: LogicalType
}
export type ParquetType =

@ -1,5 +1,5 @@
import { describe, expect, it } from 'vitest'
import { convert } from '../src/convert.js'
import { convert, parseFloat16 } from '../src/convert.js'
/**
* @typedef {import('../src/types.js').SchemaElement} SchemaElement
@ -101,3 +101,33 @@ describe('convert function', () => {
.toThrow('parquet interval not supported')
})
})
describe('parseFloat16', () => {
  it('should convert numbers', () => {
    // [little-endian bytes, expected decoded value]
    const vectors = [
      [[0x00, 0xbc], -1],
      [[0x00, 0x00], 0],
      [[0x00, 0x38], 0.5],
      [[0x00, 0x3c], 1],
      [[0x00, 0x40], 2],
    ]
    for (const [bytes, expected] of vectors) {
      expect(parseFloat16(new Uint8Array(bytes))).toBe(expected)
    }
  })

  it('should convert -0', () => {
    const negativeZero = parseFloat16(new Uint8Array([0x00, 0x80]))
    expect(negativeZero).toBe(-0)
    expect(negativeZero).not.toBe(0)
  })

  it('should convert Infinity', () => {
    expect(parseFloat16(new Uint8Array([0x00, 0x7c]))).toBe(Infinity)
    expect(parseFloat16(new Uint8Array([0x00, 0xfc]))).toBe(-Infinity)
  })

  it('should convert NaN', () => {
    // quiet and signaling NaN payloads both decode to NaN
    expect(parseFloat16(new Uint8Array([0x00, 0x7e]))).toBeNaN()
    expect(parseFloat16(new Uint8Array([0x01, 0x7e]))).toBeNaN()
  })

  it('should convert a subnormal number', () => {
    // largest subnormal: all mantissa bits set, exponent zero
    expect(parseFloat16(new Uint8Array([0xff, 0x03])))
      .toBeCloseTo(Math.pow(2, -14) * (1023 / 1024), 5)
  })
})

@ -0,0 +1,10 @@
[
[null],
[1],
[-2],
[null],
[0],
[-1],
[0],
[2]
]

@ -0,0 +1,78 @@
{
"version": 2,
"schema": [
{
"repetition_type": "REQUIRED",
"name": "schema",
"num_children": 1
},
{
"type": "FIXED_LEN_BYTE_ARRAY",
"type_length": 2,
"repetition_type": "OPTIONAL",
"name": "x",
"logical_type": {
"type": "FLOAT16"
}
}
],
"num_rows": 8,
"row_groups": [
{
"columns": [
{
"file_offset": 80,
"meta_data": {
"type": "FIXED_LEN_BYTE_ARRAY",
"encodings": [
"PLAIN",
"RLE",
"RLE_DICTIONARY"
],
"path_in_schema": [
"x"
],
"codec": "UNCOMPRESSED",
"num_values": 8,
"total_uncompressed_size": 76,
"total_compressed_size": 76,
"data_page_offset": 32,
"dictionary_page_offset": 4,
"statistics": {
"max": 2,
"min": -2,
"null_count": 1,
"max_value": 2,
"min_value": -2
},
"encoding_stats": [
{
"page_type": 2,
"encoding": "PLAIN",
"count": 1
},
{
"page_type": 0,
"encoding": "RLE_DICTIONARY",
"count": 1
}
]
}
}
],
"total_byte_size": 76,
"num_rows": 8,
"file_offset": 4,
"total_compressed_size": 76,
"ordinal": 0
}
],
"key_value_metadata": [
{
"key": "ARROW:schema",
"value": "/////3AAAAAQAAAAAAAKAAwABgAFAAgACgAAAAABBAAMAAAACAAIAAAABAAIAAAABAAAAAEAAAAUAAAAEAAUAAgABgAHAAwAAAAQABAAAAAAAAEDEAAAABgAAAAEAAAAAAAAAAEAAAB4AAAABAAEAAQAAAAAAAAA"
}
],
"created_by": "parquet-cpp-arrow version 15.0.0-SNAPSHOT",
"metadata_length": 346
}

Binary file not shown.

@ -27,7 +27,8 @@ describe('parquetRead test files', () => {
onComplete: (rows) => {
const base = filename.replace('.parquet', '')
const expected = fileToJson(`test/files/${base}.json`)
expect(toJson(rows)).toEqual(expected)
// stringify and parse to make legal json
expect(JSON.parse(JSON.stringify(toJson(rows)))).toEqual(expected)
},
})
})