From ca971ccc01308309f702112d9a39313184de7413 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Mon, 26 Feb 2024 12:20:48 -0800 Subject: [PATCH] Split out convert function --- package.json | 2 +- src/column.js | 61 +--------------------------- src/convert.js | 61 ++++++++++++++++++++++++++++ src/header.js | 1 - src/types.d.ts | 8 ---- test/convert.test.js | 95 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 158 insertions(+), 70 deletions(-) create mode 100644 src/convert.js create mode 100644 test/convert.test.js diff --git a/package.json b/package.json index 3c4b5eb..f2e9d92 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,7 @@ }, "devDependencies": { "@types/node": "20.11.20", - "@typescript-eslint/eslint-plugin": "7.0.2", + "@typescript-eslint/eslint-plugin": "7.1.0", "@vitest/coverage-v8": "1.3.1", "eslint": "8.57.0", "eslint-plugin-import": "2.29.1", diff --git a/src/column.js b/src/column.js index 98c5d19..b2129ea 100644 --- a/src/column.js +++ b/src/column.js @@ -1,18 +1,16 @@ import { Encoding, PageType } from './constants.js' +import { convert } from './convert.js' import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js' import { parquetHeader } from './header.js' import { getMaxDefinitionLevel, isRequired, schemaElement } from './schema.js' import { snappyUncompress } from './snappy.js' /** - * @typedef {import('./types.js').ArrayBufferLike} ArrayBufferLike * @typedef {import('./types.js').SchemaElement} SchemaElement * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData * @typedef {import('./types.js').RowGroup} RowGroup */ -const dayMillis = 86400000000000 // 1 day in milliseconds - /** * Parse column data from a buffer. * @@ -133,63 +131,6 @@ export function getColumnOffset(columnMetadata) { return Number(columnOffset) } -/** - * Convert known types from primitive to rich. - * - * @param {any[]} data series of primitive types - * @param {SchemaElement} schemaElement schema element for the data - * @returns {any[]} series of rich types - */ -function convert(data, schemaElement) { - const ctype = schemaElement.converted_type - if (ctype === undefined) return data - if (ctype === 'UTF8') { - const decoder = new TextDecoder() - return data.map(v => decoder.decode(v)) - } - if (ctype === 'DECIMAL') { - const scaleFactor = Math.pow(10, schemaElement.scale || 0) - if (typeof data[0] === 'number') { - return scaleFactor === 1 ? data : data.map(v => v * scaleFactor) - } else if (typeof data[0] === 'bigint') { - return scaleFactor === 1 ? data : data.map(v => Number(v) * scaleFactor) - } else { - return data.map(v => parseDecimal(v) * scaleFactor) - } - } - if (ctype === 'DATE') { - return data.map(v => new Date(v * dayMillis)) - } - if (ctype === 'TIME_MILLIS') { - return data.map(v => new Date(v)) - } - if (ctype === 'JSON') { - return data.map(v => JSON.parse(v)) - } - if (ctype === 'BSON') { - throw new Error('parquet bson not supported') - } - if (ctype === 'INTERVAL') { - throw new Error('parquet interval not supported') - } - return data -} - -/** - * Parse decimal from byte array. - * - * @param {Uint8Array} bytes - * @returns {number} - */ -function parseDecimal(bytes) { - // TODO: handle signed - let value = 0 - for (const byte of bytes) { - value = value << 8 | byte - } - return value -} - /** * @typedef {import('./types.js').PageHeader} PageHeader * @typedef {import('./types.js').CompressionCodec} CompressionCodec diff --git a/src/convert.js b/src/convert.js new file mode 100644 index 0000000..1808829 --- /dev/null +++ b/src/convert.js @@ -0,0 +1,61 @@ +/** + * @typedef {import('./types.js').SchemaElement} SchemaElement + */ + +const dayMillis = 86400000000000 // 1 day in milliseconds + +/** + * Convert known types from primitive to rich. + * + * @param {any[]} data series of primitive types + * @param {SchemaElement} schemaElement schema element for the data + * @returns {any[]} series of rich types + */ +export function convert(data, schemaElement) { + const ctype = schemaElement.converted_type + if (ctype === 'UTF8') { + const decoder = new TextDecoder() + return data.map(v => decoder.decode(v)) + } + if (ctype === 'DECIMAL') { + const scaleFactor = schemaElement.scale ? Math.pow(10, schemaElement.scale) : 1 + if (typeof data[0] === 'number') { + return scaleFactor === 1 ? data : data.map(v => v * scaleFactor) + } else if (typeof data[0] === 'bigint') { + return scaleFactor === 1 ? data : data.map(v => v * BigInt(scaleFactor)) + } else { + return data.map(v => parseDecimal(v) * scaleFactor) + } + } + if (ctype === 'DATE') { + return data.map(v => new Date(v * dayMillis)) + } + if (ctype === 'TIME_MILLIS') { + return data.map(v => new Date(v)) + } + if (ctype === 'JSON') { + return data.map(v => JSON.parse(v)) + } + if (ctype === 'BSON') { + throw new Error('parquet bson not supported') + } + if (ctype === 'INTERVAL') { + throw new Error('parquet interval not supported') + } + return data +} + +/** + * Parse decimal from byte array. + * + * @param {Uint8Array} bytes + * @returns {number} + */ +function parseDecimal(bytes) { + // TODO: handle signed + let value = 0 + for (const byte of bytes) { + value = value << 8 | byte + } + return value +} diff --git a/src/header.js b/src/header.js index d0642d1..171017f 100644 --- a/src/header.js +++ b/src/header.js @@ -11,7 +11,6 @@ import { deserializeTCompactProtocol } from './thrift.js' /** * Read parquet header from a buffer. * - * @typedef {import("./types.d.ts").ArrayBufferLike} ArrayBufferLike * @typedef {import("./types.d.ts").PageHeader} PageHeader * @param {ArrayBuffer} arrayBuffer parquet file contents * @param {number} offset offset to start reading from diff --git a/src/types.d.ts b/src/types.d.ts index c4afa44..db057b0 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -6,14 +6,6 @@ export interface AsyncBuffer { slice(start: number, end?: number): Promise } -/** - * Just like an ArrayBuffer, but an interface - */ -export interface ArrayBufferLike { - byteLength: number - slice(start: number, end?: number): ArrayBuffer -} - /** * Represents a decoded value, and includes the number of bytes read. * This is used to read data from the file and advance a virtual file pointer. diff --git a/test/convert.test.js b/test/convert.test.js new file mode 100644 index 0000000..c4af85f --- /dev/null +++ b/test/convert.test.js @@ -0,0 +1,95 @@ +import { describe, expect, it } from 'vitest' +import { convert } from '../src/convert.js' + +/** + * @typedef {import('../src/types.js').SchemaElement} SchemaElement + */ + +describe('convert function', () => { + const name = 'name' + it('returns the same data if converted_type is undefined', () => { + const data = [1, 2, 3] + const schemaElement = { name } + expect(convert(data, schemaElement)).toEqual(data) + }) + + it('converts byte arrays to UTF8 strings', () => { + const data = [new TextEncoder().encode('test'), new TextEncoder().encode('vitest')] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'UTF8' } + expect(convert(data, schemaElement)).toEqual(['test', 'vitest']) + }) + + it('converts numbers to DECIMAL', () => { + const data = [100, 200] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'DECIMAL' } + expect(convert(data, schemaElement)).toEqual([100, 200]) + }) + + it('converts numbers to DECIMAL with scale', () => { + const data = [100, 200] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'DECIMAL', scale: 2 } + expect(convert(data, schemaElement)).toEqual([10000, 20000]) + }) + + it('converts bigint to DECIMAL', () => { + const data = [BigInt(1000), BigInt(2000)] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'DECIMAL' } + expect(convert(data, schemaElement)).toEqual([1000n, 2000n]) + }) + + it('converts bigint to DECIMAL with scale', () => { + const data = [BigInt(1000), BigInt(2000)] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'DECIMAL', scale: 3 } + expect(convert(data, schemaElement)).toEqual([1000000n, 2000000n]) + }) + + it('converts byte arrays to DECIMAL', () => { + const data = [new Uint8Array([0, 0, 0, 100]), new Uint8Array([0, 0, 0, 200])] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'DECIMAL', scale: 0 } + expect(convert(data, schemaElement)).toEqual([100, 200]) + }) + + it('converts epoch time to DATE', () => { + const data = [1, 2] // days since epoch + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'DATE' } + expect(convert(data, schemaElement)).toEqual([new Date(86400000000000), new Date(86400000000000 * 2)]) + }) + + it('converts milliseconds to TIME_MILLIS', () => { + const now = Date.now() + const data = [now] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'TIME_MILLIS' } + expect(convert(data, schemaElement)).toEqual([new Date(now)]) + }) + + it('parses strings to JSON', () => { + const data = ['{"key": true}', '{"quay": 314}'] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'JSON' } + expect(convert(data, schemaElement)).toEqual([{ key: true }, { quay: 314 }]) + }) + + it('throws error for BSON conversion', () => { + const data = [{}] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'BSON' } + expect(() => convert(data, schemaElement)) + .toThrow('parquet bson not supported') + }) + + it('throws error for INTERVAL conversion', () => { + const data = [{}] + /** @type {SchemaElement} */ + const schemaElement = { name, converted_type: 'INTERVAL' } + expect(() => convert(data, schemaElement)) + .toThrow('parquet interval not supported') + }) +})