From 8609192b23c16d4e1660033b73f7f31389304b26 Mon Sep 17 00:00:00 2001
From: LiraNuna <205628+LiraNuna@users.noreply.github.com>
Date: Mon, 9 Jun 2025 20:02:31 -0500
Subject: [PATCH] Introduce 'custom parsers' option for decoding dates (#87)

---
 src/convert.js       |  62 +++++++++++++++++-------
 src/index.js         |   2 +
 src/indexes.js       |  12 +++--
 src/metadata.js      |  46 ++++++++++--------
 src/rowgroup.js      |   6 ++-
 src/types.d.ts       |  19 ++++++++
 test/column.test.js  |   3 ++
 test/convert.test.js | 109 ++++++++++++++++++++++++++++++++++---------
 8 files changed, 195 insertions(+), 64 deletions(-)
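
Usage sketch for the new option (assumes an AsyncBuffer `file` as hyparquet
already expects; asyncBufferFromFile lives in src/node.js, and the exact
import path may vary by packaging):

    import { parquetReadObjects } from 'hyparquet'
    import { asyncBufferFromFile } from 'hyparquet/src/node.js'

    const file = await asyncBufferFromFile('data.parquet')
    const rows = await parquetReadObjects({
      file,
      parsers: {
        // return raw epoch milliseconds instead of Date objects
        timestampFromMilliseconds: millis => Number(millis),
      },
    })

Any parser left out is backfilled from DEFAULT_PARSERS at read time (see
src/rowgroup.js below), though the ParquetParsers type declares all four
methods.
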
diff --git a/src/convert.js b/src/convert.js
index 5378f1e..be5d897 100644
--- a/src/convert.js
+++ b/src/convert.js
@@ -1,4 +1,26 @@
-const dayMillis = 86400000 // 1 day in milliseconds
+/**
+ * @import {ColumnDecoder, DecodedArray, Encoding, ParquetParsers, SchemaElement} from '../src/types.d.ts'
+ */
+
+/**
+ * Default type parsers when no custom ones are given
+ * @type {ParquetParsers}
+ */
+export const DEFAULT_PARSERS = {
+  timestampFromMilliseconds(millis) {
+    return new Date(Number(millis))
+  },
+  timestampFromMicroseconds(micros) {
+    return new Date(Number(micros / 1000n))
+  },
+  timestampFromNanoseconds(nanos) {
+    return new Date(Number(nanos / 1000000n))
+  },
+  dateFromDays(days) {
+    const dayInMillis = 86400000
+    return new Date(days * dayInMillis)
+  },
+}
 
 /**
  * Convert known types from primitive to rich, and dereference dictionary.
@@ -29,11 +51,11 @@ export function convertWithDictionary(data, dictionary, encoding, columnDecoder)
  * Convert known types from primitive to rich.
  *
  * @param {DecodedArray} data series of primitive types
- * @param {Pick<ColumnDecoder, 'element' | 'utf8'>} columnDecoder
+ * @param {Pick<ColumnDecoder, 'element' | 'parsers' | 'utf8'>} columnDecoder
  * @returns {DecodedArray} series of rich types
  */
 export function convert(data, columnDecoder) {
-  const { element, utf8 = true } = columnDecoder
+  const { element, parsers, utf8 = true } = columnDecoder
   const { type, converted_type: ctype, logical_type: ltype } = element
   if (ctype === 'DECIMAL') {
     const scale = element.scale || 0
@@ -49,26 +71,30 @@ export function convert(data, columnDecoder) {
     return arr
   }
   if (!ctype && type === 'INT96') {
-    return Array.from(data).map(parseInt96Date)
+    const arr = new Array(data.length)
+    for (let i = 0; i < arr.length; i++) {
+      arr[i] = parsers.timestampFromNanoseconds(parseInt96Nanos(data[i]))
+    }
+    return arr
   }
   if (ctype === 'DATE') {
     const arr = new Array(data.length)
     for (let i = 0; i < arr.length; i++) {
-      arr[i] = new Date(data[i] * dayMillis)
+      arr[i] = parsers.dateFromDays(data[i])
     }
     return arr
   }
   if (ctype === 'TIMESTAMP_MILLIS') {
     const arr = new Array(data.length)
     for (let i = 0; i < arr.length; i++) {
-      arr[i] = new Date(Number(data[i]))
+      arr[i] = parsers.timestampFromMilliseconds(data[i])
     }
     return arr
   }
   if (ctype === 'TIMESTAMP_MICROS') {
     const arr = new Array(data.length)
     for (let i = 0; i < arr.length; i++) {
-      arr[i] = new Date(Number(data[i] / 1000n))
+      arr[i] = parsers.timestampFromMicroseconds(data[i])
     }
     return arr
   }
@@ -111,12 +137,13 @@ export function convert(data, columnDecoder) {
   }
   if (ltype?.type === 'TIMESTAMP') {
     const { unit } = ltype
-    let factor = 1n
-    if (unit === 'MICROS') factor = 1000n
-    if (unit === 'NANOS') factor = 1000000n
+    /** @type {ParquetParsers[keyof ParquetParsers]} */
+    let parser = parsers.timestampFromMilliseconds
+    if (unit === 'MICROS') parser = parsers.timestampFromMicroseconds
+    if (unit === 'NANOS') parser = parsers.timestampFromNanoseconds
     const arr = new Array(data.length)
     for (let i = 0; i < arr.length; i++) {
-      arr[i] = new Date(Number(data[i] / factor))
+      arr[i] = parser(data[i])
     }
     return arr
   }
@@ -143,15 +170,14 @@ export function parseDecimal(bytes) {
 }
 
 /**
- * @import {ColumnDecoder, DecodedArray, Encoding, SchemaElement} from '../src/types.d.ts'
+ * Converts INT96 date format (hi 32bit days, lo 64bit nanos) to nanos since epoch
  * @param {bigint} value
- * @returns {Date}
+ * @returns {bigint}
  */
-function parseInt96Date(value) {
-  const days = Number((value >> 64n) - 2440588n)
-  const nano = Number((value & 0xffffffffffffffffn) / 1000000n)
-  const millis = days * dayMillis + nano
-  return new Date(millis)
+function parseInt96Nanos(value) {
+  const days = (value >> 64n) - 2440588n
+  const nano = value & 0xffffffffffffffffn
+  return days * 86400000000000n + nano
}
 
 /**
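
A worked example of the INT96 split (high 32 bits hold a Julian day, low 64
bits hold nanoseconds within the day; 2440588 is the Julian day of the Unix
epoch), using the first value from alltypes_plain.parquet as in the tests:

    const v = 45284764452596988585705472n
    const days = (v >> 64n) - 2440588n               // 14304n days since epoch
    const nanos = days * 86400000000000n + (v & 0xffffffffffffffffn)
    new Date(Number(nanos / 1000000n))               // 2009-03-01T00:00:00.000Z

Deferring the millisecond division to the parser is the point of the
parseInt96Date -> parseInt96Nanos change: timestampFromNanoseconds now sees
full nanosecond precision instead of a pre-truncated Date.
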
diff --git a/src/index.js b/src/index.js
index 3ca6400..610ee6f 100644
--- a/src/index.js
+++ b/src/index.js
@@ -60,4 +60,6 @@ export function parquetReadObjects(options) {
  * @typedef {import('../src/types.d.ts').BoundaryOrder} BoundaryOrder
  * @typedef {import('../src/types.d.ts').ColumnData} ColumnData
  * @typedef {import('../src/types.d.ts').ParquetReadOptions} ParquetReadOptions
+ * @typedef {import('../src/types.d.ts').MetadataOptions} MetadataOptions
+ * @typedef {import('../src/types.d.ts').ParquetParsers} ParquetParsers
  */
diff --git a/src/indexes.js b/src/indexes.js
index 8118bf1..e6f0933 100644
--- a/src/indexes.js
+++ b/src/indexes.js
@@ -1,18 +1,22 @@
 import { BoundaryOrder } from './constants.js'
+import { DEFAULT_PARSERS } from './convert.js'
 import { convertMetadata } from './metadata.js'
 import { deserializeTCompactProtocol } from './thrift.js'
 
 /**
  * @param {DataReader} reader
  * @param {SchemaElement} schema
+ * @param {ParquetParsers | undefined} parsers
  * @returns {ColumnIndex}
  */
-export function readColumnIndex(reader, schema) {
+export function readColumnIndex(reader, schema, parsers = undefined) {
+  parsers = { ...DEFAULT_PARSERS, ...parsers }
+
   const thrift = deserializeTCompactProtocol(reader)
   return {
     null_pages: thrift.field_1,
-    min_values: thrift.field_2.map((/** @type {any} */ m) => convertMetadata(m, schema)),
-    max_values: thrift.field_3.map((/** @type {any} */ m) => convertMetadata(m, schema)),
+    min_values: thrift.field_2.map((/** @type {any} */ m) => convertMetadata(m, schema, parsers)),
+    max_values: thrift.field_3.map((/** @type {any} */ m) => convertMetadata(m, schema, parsers)),
     boundary_order: BoundaryOrder[thrift.field_4],
     null_counts: thrift.field_5,
     repetition_level_histograms: thrift.field_6,
@@ -33,7 +37,7 @@ export function readOffsetIndex(reader) {
 }
 
 /**
- * @import {ColumnIndex, DataReader, OffsetIndex, PageLocation, SchemaElement} from '../src/types.d.ts'
+ * @import {ColumnIndex, DataReader, OffsetIndex, PageLocation, ParquetParsers, SchemaElement} from '../src/types.d.ts'
  * @param {any} loc
  * @returns {PageLocation}
  */
diff --git a/src/metadata.js b/src/metadata.js
index 4baac4e..55cd6f3 100644
--- a/src/metadata.js
+++ b/src/metadata.js
@@ -1,5 +1,5 @@
 import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from './constants.js'
-import { parseDecimal, parseFloat16 } from './convert.js'
+import { DEFAULT_PARSERS, parseDecimal, parseFloat16 } from './convert.js'
 import { getSchemaPath } from './schema.js'
 import { deserializeTCompactProtocol } from './thrift.js'
 
@@ -26,10 +26,10 @@ export const defaultInitialFetchSize = 1 << 19 // 512kb
  * or a second request for up to the metadata size.
  *
  * @param {AsyncBuffer} asyncBuffer parquet file contents
- * @param {number} initialFetchSize initial fetch size in bytes (default 512kb)
+ * @param {MetadataOptions & { initialFetchSize?: number }} options metadata parsing options and initial fetch size in bytes (default 512kb)
  * @returns {Promise<FileMetaData>} parquet metadata object
 */
-export async function parquetMetadataAsync(asyncBuffer, initialFetchSize = defaultInitialFetchSize) {
+export async function parquetMetadataAsync(asyncBuffer, { parsers, initialFetchSize = defaultInitialFetchSize } = {}) {
   if (!asyncBuffer || !(asyncBuffer.byteLength >= 0)) throw new Error('parquet expected AsyncBuffer')
 
   // fetch last bytes (footer) of the file
@@ -59,10 +59,10 @@ export async function parquetMetadataAsync(asyncBuffer, initialFetchSize = defau
     const combinedView = new Uint8Array(combinedBuffer)
     combinedView.set(new Uint8Array(metadataBuffer))
     combinedView.set(new Uint8Array(footerBuffer), footerOffset - metadataOffset)
-    return parquetMetadata(combinedBuffer)
+    return parquetMetadata(combinedBuffer, { parsers })
   } else {
     // parse metadata from the footer
-    return parquetMetadata(footerBuffer)
+    return parquetMetadata(footerBuffer, { parsers })
   }
 }
 
@@ -70,12 +70,16 @@ export async function parquetMetadataAsync(asyncBuffer, initialFetchSize = defau
  * Read parquet metadata from a buffer synchronously.
  *
  * @param {ArrayBuffer} arrayBuffer parquet file footer
+ * @param {MetadataOptions} options metadata parsing options
  * @returns {FileMetaData} parquet metadata object
  */
-export function parquetMetadata(arrayBuffer) {
+export function parquetMetadata(arrayBuffer, { parsers } = {}) {
   if (!(arrayBuffer instanceof ArrayBuffer)) throw new Error('parquet expected ArrayBuffer')
   const view = new DataView(arrayBuffer)
 
+  // Use default parsers if not given
+  parsers = { ...DEFAULT_PARSERS, ...parsers }
+
   // Validate footer magic number "PAR1"
   if (view.byteLength < 8) {
     throw new Error('parquet file is too short')
@@ -135,7 +139,7 @@ export function parquetMetadata(arrayBuffer) {
         data_page_offset: column.field_3.field_9,
         index_page_offset: column.field_3.field_10,
         dictionary_page_offset: column.field_3.field_11,
-        statistics: convertStats(column.field_3.field_12, columnSchema[columnIndex]),
+        statistics: convertStats(column.field_3.field_12, columnSchema[columnIndex], parsers),
         encoding_stats: column.field_3.field_13?.map((/** @type {any} */ encodingStat) => ({
           page_type: PageType[encodingStat.field_1],
           encoding: Encoding[encodingStat.field_2],
@@ -246,19 +250,20 @@ function timeUnit(unit) {
 /**
  * Convert column statistics based on column type.
  *
- * @import {AsyncBuffer, FileMetaData, LogicalType, MinMaxType, SchemaElement, SchemaTree, Statistics, TimeUnit} from '../src/types.d.ts'
+ * @import {AsyncBuffer, FileMetaData, LogicalType, MetadataOptions, MinMaxType, ParquetParsers, SchemaElement, SchemaTree, Statistics, TimeUnit} from '../src/types.d.ts'
  * @param {any} stats
  * @param {SchemaElement} schema
+ * @param {ParquetParsers} parsers
  * @returns {Statistics}
  */
-function convertStats(stats, schema) {
+function convertStats(stats, schema, parsers) {
   return stats && {
-    max: convertMetadata(stats.field_1, schema),
-    min: convertMetadata(stats.field_2, schema),
+    max: convertMetadata(stats.field_1, schema, parsers),
+    min: convertMetadata(stats.field_2, schema, parsers),
     null_count: stats.field_3,
     distinct_count: stats.field_4,
-    max_value: convertMetadata(stats.field_5, schema),
-    min_value: convertMetadata(stats.field_6, schema),
+    max_value: convertMetadata(stats.field_5, schema, parsers),
+    min_value: convertMetadata(stats.field_6, schema, parsers),
     is_max_value_exact: stats.field_7,
     is_min_value_exact: stats.field_8,
   }
@@ -267,9 +272,10 @@ function convertStats(stats, schema) {
 /**
  * @param {Uint8Array | undefined} value
  * @param {SchemaElement} schema
+ * @param {ParquetParsers} parsers
  * @returns {MinMaxType | undefined}
  */
-export function convertMetadata(value, schema) {
+export function convertMetadata(value, schema, parsers) {
   const { type, converted_type, logical_type } = schema
   if (value === undefined) return value
   if (type === 'BOOLEAN') return value[0] === 1
@@ -276,12 +282,12 @@ export function convertMetadata(value, schema) {
   const view = new DataView(value.buffer, value.byteOffset, value.byteLength)
   if (type === 'FLOAT' && view.byteLength === 4) return view.getFloat32(0, true)
   if (type === 'DOUBLE' && view.byteLength === 8) return view.getFloat64(0, true)
-  if (type === 'INT32' && converted_type === 'DATE') return new Date(view.getInt32(0, true) * 86400000)
-  if (type === 'INT64' && converted_type === 'TIMESTAMP_MICROS') return new Date(Number(view.getBigInt64(0, true) / 1000n))
-  if (type === 'INT64' && converted_type === 'TIMESTAMP_MILLIS') return new Date(Number(view.getBigInt64(0, true)))
-  if (type === 'INT64' && logical_type?.type === 'TIMESTAMP' && logical_type?.unit === 'NANOS') return new Date(Number(view.getBigInt64(0, true) / 1000000n))
-  if (type === 'INT64' && logical_type?.type === 'TIMESTAMP' && logical_type?.unit === 'MICROS') return new Date(Number(view.getBigInt64(0, true) / 1000n))
-  if (type === 'INT64' && logical_type?.type === 'TIMESTAMP') return new Date(Number(view.getBigInt64(0, true)))
+  if (type === 'INT32' && converted_type === 'DATE') return parsers.dateFromDays(view.getInt32(0, true))
+  if (type === 'INT64' && converted_type === 'TIMESTAMP_MILLIS') return parsers.timestampFromMilliseconds(view.getBigInt64(0, true))
+  if (type === 'INT64' && converted_type === 'TIMESTAMP_MICROS') return parsers.timestampFromMicroseconds(view.getBigInt64(0, true))
+  if (type === 'INT64' && logical_type?.type === 'TIMESTAMP' && logical_type?.unit === 'NANOS') return parsers.timestampFromNanoseconds(view.getBigInt64(0, true))
+  if (type === 'INT64' && logical_type?.type === 'TIMESTAMP' && logical_type?.unit === 'MICROS') return parsers.timestampFromMicroseconds(view.getBigInt64(0, true))
+  if (type === 'INT64' && logical_type?.type === 'TIMESTAMP') return parsers.timestampFromMilliseconds(view.getBigInt64(0, true))
   if (type === 'INT32' && view.byteLength === 4) return view.getInt32(0, true)
   if (type === 'INT64' && view.byteLength === 8) return view.getBigInt64(0, true)
   if (converted_type === 'DECIMAL') return parseDecimal(value) * 10 ** -(schema.scale || 0)
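
Footer statistics flow through the same parsers, so a metadata-only read can
opt out of Date construction as well. A sketch (same import-path assumptions
as above); note that initialFetchSize moved from a positional argument into
the options object:

    import { parquetMetadataAsync } from 'hyparquet'
    import { asyncBufferFromFile } from 'hyparquet/src/node.js'

    const file = await asyncBufferFromFile('data.parquet')
    const metadata = await parquetMetadataAsync(file, {
      parsers: {
        // keep min/max statistics as raw bigint microseconds
        timestampFromMicroseconds: micros => micros,
      },
      initialFetchSize: 1 << 19, // optional, same 512kb default
    })
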
diff --git a/src/rowgroup.js b/src/rowgroup.js
index 12fdcae..916f556 100644
--- a/src/rowgroup.js
+++ b/src/rowgroup.js
@@ -1,11 +1,12 @@
 import { assembleNested } from './assemble.js'
 import { readColumn } from './column.js'
+import { DEFAULT_PARSERS } from './convert.js'
 import { getColumnRange } from './plan.js'
 import { getSchemaPath } from './schema.js'
 import { flatten } from './utils.js'
 
 /**
- * @import {AsyncColumn, AsyncRowGroup, DecodedArray, GroupPlan, ParquetReadOptions, QueryPlan, RowGroup, SchemaTree} from './types.js'
+ * @import {AsyncColumn, AsyncRowGroup, DecodedArray, GroupPlan, ParquetParsers, ParquetReadOptions, QueryPlan, RowGroup, SchemaTree} from './types.js'
  */
 /**
  * Read a row group from a file-like object.
@@ -20,6 +21,8 @@ export function readRowGroup(options, { metadata, columns }, groupPlan) {
   /** @type {AsyncColumn[]} */
   const asyncColumns = []
+  /** @type {ParquetParsers} */
+  const parsers = { ...DEFAULT_PARSERS, ...options.parsers }
 
   // read column data
   for (const { file_path, meta_data } of groupPlan.rowGroup.columns) {
@@ -58,6 +61,7 @@ export function readRowGroup(options, { metadata, columns }, groupPlan) {
           element: schemaPath[schemaPath.length - 1].element,
           schemaPath,
           codec: meta_data.codec,
+          parsers,
           compressors,
           utf8,
         }
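
The spread merge gives override-one, inherit-the-rest semantics; a minimal
check against DEFAULT_PARSERS (import path as in this repo):

    import { DEFAULT_PARSERS } from './src/convert.js'

    const parsers = { ...DEFAULT_PARSERS, dateFromDays: days => days }
    parsers.dateFromDays(1)               // 1 (overridden)
    parsers.timestampFromMilliseconds(0n) // 1970-01-01T00:00:00.000Z (default)
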
diff --git a/src/types.d.ts b/src/types.d.ts
index d110367..9f8c2af 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -1,3 +1,20 @@
+/**
+ * Custom parsers for columns
+ */
+export interface ParquetParsers {
+  timestampFromMilliseconds(millis: bigint): any;
+  timestampFromMicroseconds(micros: bigint): any;
+  timestampFromNanoseconds(nanos: bigint): any;
+  dateFromDays(days: number): any;
+}
+
+/**
+ * Parquet metadata parsing options
+ */
+export interface MetadataOptions {
+  parsers?: ParquetParsers // custom parsers to decode advanced types
+}
+
 /**
  * Parquet query options for reading data
  */
@@ -13,6 +30,7 @@ export interface ParquetReadOptions {
   onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
   compressors?: Compressors // custom decompressors
   utf8?: boolean // decode byte arrays as utf8 strings (default true)
+  parsers?: ParquetParsers // custom parsers to decode advanced types
 }
 
 /**
@@ -409,6 +427,7 @@ export interface ColumnDecoder {
   element: SchemaElement
   schemaPath: SchemaTree[]
   codec: CompressionCodec
+  parsers: ParquetParsers
   compressors?: Compressors
   utf8?: boolean
 }
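
Since every ParquetParsers method returns `any`, an implementation may pick
whatever representation suits the application. A sketch returning plain epoch
numbers throughout (the typedef is re-exported from src/index.js above, so
the JSDoc type import should resolve):

    /** @type {import('hyparquet').ParquetParsers} */
    const numericParsers = {
      timestampFromMilliseconds: millis => Number(millis),
      timestampFromMicroseconds: micros => Number(micros / 1000n),
      timestampFromNanoseconds: nanos => Number(nanos / 1000000n),
      dateFromDays: days => days * 86400000,
    }
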
diff --git a/test/column.test.js b/test/column.test.js
index 00d3f7a..ce7b2ea 100644
--- a/test/column.test.js
+++ b/test/column.test.js
@@ -1,5 +1,6 @@
 import { describe, expect, it } from 'vitest'
 import { readColumn } from '../src/column.js'
+import { DEFAULT_PARSERS } from '../src/convert.js'
 import { parquetMetadata } from '../src/index.js'
 import { asyncBufferFromFile } from '../src/node.js'
 import { getColumnRange } from '../src/plan.js'
@@ -29,6 +30,7 @@ describe('readColumn', () => {
       type: column.meta_data.type,
       element: schemaPath[schemaPath.length - 1].element,
       schemaPath,
+      parsers: DEFAULT_PARSERS,
       codec: column.meta_data.codec,
     }
     const rowGroupSelect = {
@@ -59,6 +61,7 @@ describe('readColumn', () => {
       type: column.meta_data.type,
       element: schemaPath[schemaPath.length - 1].element,
       schemaPath,
+      parsers: DEFAULT_PARSERS,
       codec: column.meta_data.codec,
     }
     const rowGroupSelect = {
diff --git a/test/convert.test.js b/test/convert.test.js
index 2332ea2..b2b3437 100644
--- a/test/convert.test.js
+++ b/test/convert.test.js
@@ -1,5 +1,5 @@
 import { describe, expect, it } from 'vitest'
-import { convert, parseDecimal, parseFloat16 } from '../src/convert.js'
+import { DEFAULT_PARSERS, convert, parseDecimal, parseFloat16 } from '../src/convert.js'
 
 /**
  * @import {ColumnDecoder, SchemaElement} from '../src/types.js'
@@ -7,31 +7,33 @@ import { convert, parseDecimal, parseFloat16 } from '../src/convert.js'
 describe('convert function', () => {
   const name = 'name'
+  const parsers = DEFAULT_PARSERS
+
   it('returns the same data if converted_type is undefined', () => {
     const data = [1, 2, 3]
     const element = { name }
-    expect(convert(data, { element })).toEqual(data)
+    expect(convert(data, { element, parsers })).toEqual(data)
   })
 
   it('converts byte arrays to utf8', () => {
     const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'UTF8' }
-    expect(convert(data, { element })).toEqual(['foo', 'bar'])
+    expect(convert(data, { element, parsers })).toEqual(['foo', 'bar'])
   })
 
   it('converts byte arrays to utf8 default true', () => {
     const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
     /** @type {SchemaElement} */
     const element = { name, type: 'BYTE_ARRAY' }
-    expect(convert(data, { element })).toEqual(['foo', 'bar'])
+    expect(convert(data, { element, parsers })).toEqual(['foo', 'bar'])
   })
 
   it('preserves byte arrays utf8=false', () => {
     const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
     /** @type {SchemaElement} */
     const element = { name, type: 'BYTE_ARRAY' }
-    expect(convert(data, { element, utf8: false })).toEqual([
+    expect(convert(data, { element, parsers, utf8: false })).toEqual([
       new Uint8Array([102, 111, 111]),
       new Uint8Array([98, 97, 114]),
     ])
   })
@@ -40,49 +42,49 @@ describe('convert function', () => {
     const data = [100, 200]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'DECIMAL' }
-    expect(convert(data, { element })).toEqual([100, 200])
+    expect(convert(data, { element, parsers })).toEqual([100, 200])
   })
 
   it('converts numbers to DECIMAL with scale', () => {
     const data = [100, 200]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'DECIMAL', scale: 2 }
-    expect(convert(data, { element })).toEqual([1, 2])
+    expect(convert(data, { element, parsers })).toEqual([1, 2])
   })
 
   it('converts bigint to DECIMAL', () => {
     const data = [1000n, 2000n]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'DECIMAL' }
-    expect(convert(data, { element })).toEqual([1000, 2000])
+    expect(convert(data, { element, parsers })).toEqual([1000, 2000])
   })
 
   it('converts bigint to DECIMAL with scale', () => {
     const data = [10n, 20n]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'DECIMAL', scale: 2 }
-    expect(convert(data, { element })).toEqual([0.1, 0.2])
+    expect(convert(data, { element, parsers })).toEqual([0.1, 0.2])
   })
 
   it('converts byte arrays to DECIMAL', () => {
     const data = [new Uint8Array([0, 0, 0, 100]), new Uint8Array([0, 0, 0, 200])]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'DECIMAL', scale: 0 }
-    expect(convert(data, { element })).toEqual([100, 200])
+    expect(convert(data, { element, parsers })).toEqual([100, 200])
   })
 
   it('converts byte array from issue #59 to DECIMAL', () => {
     const data = [new Uint8Array([18, 83, 137, 151, 156, 0])]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'DECIMAL', scale: 10, precision: 14 }
-    expect(convert(data, { element })).toEqual([2015])
+    expect(convert(data, { element, parsers })).toEqual([2015])
   })
 
   it('converts epoch time to DATE', () => {
     const data = [1, 2] // days since epoch
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'DATE' }
-    expect(convert(data, { element })).toEqual([new Date(86400000), new Date(86400000 * 2)])
+    expect(convert(data, { element, parsers })).toEqual([new Date(86400000), new Date(86400000 * 2)])
   })
 
   it('converts INT96 to DATE', () => {
@@ -90,14 +92,14 @@ describe('convert function', () => {
     // from alltypes_plain.parquet
     const data = [45284764452596988585705472n, 45284764452597048585705472n]
     /** @type {SchemaElement} */
     const element = { name, type: 'INT96' }
-    expect(convert(data, { element })).toEqual([new Date('2009-03-01T00:00:00.000Z'), new Date('2009-03-01T00:01:00.000Z')])
+    expect(convert(data, { element, parsers })).toEqual([new Date('2009-03-01T00:00:00.000Z'), new Date('2009-03-01T00:01:00.000Z')])
   })
 
   it('converts epoch time to TIMESTAMP_MILLIS', () => {
     const data = [1716506900000n, 1716507000000n]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'TIMESTAMP_MILLIS' }
-    expect(convert(data, { element })).toEqual([
+    expect(convert(data, { element, parsers })).toEqual([
       new Date('2024-05-23T23:28:20.000Z'),
       new Date('2024-05-23T23:30:00.000Z'),
     ])
   })
@@ -106,7 +108,7 @@ describe('convert function', () => {
     const data = [1716506900000000n, 1716507000000000n]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'TIMESTAMP_MICROS' }
-    expect(convert(data, { element })).toEqual([
+    expect(convert(data, { element, parsers })).toEqual([
       new Date('2024-05-23T23:28:20.000Z'),
       new Date('2024-05-23T23:30:00.000Z'),
     ])
   })
@@ -117,28 +119,28 @@ describe('convert function', () => {
       .map(str => encoder.encode(str))
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'JSON' }
-    expect(convert(data, { element })).toEqual([{ key: true }, { quay: 314 }])
+    expect(convert(data, { element, parsers })).toEqual([{ key: true }, { quay: 314 }])
   })
 
   it('converts uint64', () => {
     const data = [100n, -100n]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'UINT_64' }
-    expect(convert(data, { element })).toEqual(new BigUint64Array([100n, 18446744073709551516n]))
+    expect(convert(data, { element, parsers })).toEqual(new BigUint64Array([100n, 18446744073709551516n]))
   })
 
   it('converts to float16', () => {
     const data = [new Uint8Array([0x00, 0x3c]), new Uint8Array([0x00, 0x40])]
     /** @type {SchemaElement} */
     const element = { name, logical_type: { type: 'FLOAT16' } }
-    expect(convert(data, { element })).toEqual([1, 2])
+    expect(convert(data, { element, parsers })).toEqual([1, 2])
   })
 
   it('converts timestamp with units', () => {
     const data = [1716506900000000n, 1716507000000000n]
     /** @type {SchemaElement} */
     const element = { name, logical_type: { type: 'TIMESTAMP', isAdjustedToUTC: true, unit: 'MICROS' } }
-    expect(convert(data, { element })).toEqual([
+    expect(convert(data, { element, parsers })).toEqual([
       new Date('2024-05-23T23:28:20.000Z'),
       new Date('2024-05-23T23:30:00.000Z'),
     ])
   })
@@ -147,7 +149,7 @@ describe('convert function', () => {
     const data = [{}]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'BSON' }
-    expect(() => convert(data, { element }))
+    expect(() => convert(data, { element, parsers }))
       .toThrow('parquet bson not supported')
   })
 
@@ -155,9 +157,74 @@ describe('convert function', () => {
     const data = [{}]
     /** @type {SchemaElement} */
     const element = { name, converted_type: 'INTERVAL' }
-    expect(() => convert(data, { element }))
+    expect(() => convert(data, { element, parsers }))
       .toThrow('parquet interval not supported')
   })
+
+  it('respects custom parsers - dateFromDays', () => {
+    const data = [1, 2] // days since epoch
+    /** @type {SchemaElement} */
+    const element = { name, converted_type: 'DATE' }
+    /** @type {Pick<ColumnDecoder, 'element' | 'parsers'>} */
+    const columnParser = {
+      element,
+      parsers: {
+        ...parsers,
+        dateFromDays: days => days,
+      },
+    }
+
+    expect(convert(data, columnParser)).toEqual([ 1, 2 ])
+  })
+
+  it('respects custom parsers - timestampFromMilliseconds', () => {
+    const data = [1716506900000n, 1716507000000n]
+    /** @type {SchemaElement} */
+    const element = { name, converted_type: 'TIMESTAMP_MILLIS' }
+    /** @type {Pick<ColumnDecoder, 'element' | 'parsers'>} */
+    const columnParser = {
+      element,
+      parsers: {
+        ...parsers,
+        timestampFromMilliseconds: millis => Number(millis / 100000n),
+      },
+    }
+
+    expect(convert(data, columnParser)).toEqual([ 17165069, 17165070 ])
+  })
+
+  it('respects custom parsers - timestampFromMicroseconds', () => {
+    const data = [1716506900000000n, 1716507000000000n]
+    /** @type {SchemaElement} */
+    const element = { name, logical_type: { type: 'TIMESTAMP', isAdjustedToUTC: true, unit: 'MICROS' } }
+    /** @type {Pick<ColumnDecoder, 'element' | 'parsers'>} */
+    const columnParser = {
+      element,
+      parsers: {
+        ...parsers,
+        timestampFromMicroseconds: micros => Number(micros / 100000000n),
+      },
+    }
+
+    expect(convert(data, columnParser)).toEqual([ 17165069, 17165070 ])
+  })
+
+  it('respects custom parsers - timestampFromNanoseconds', () => {
+    // from alltypes_plain.parquet
+    const data = [45284764452596988585705472n, 45284764452597048585705472n]
+    /** @type {SchemaElement} */
+    const element = { name, type: 'INT96' }
+    /** @type {Pick<ColumnDecoder, 'element' | 'parsers'>} */
+    const columnParser = {
+      element,
+      parsers: {
+        ...parsers,
+        timestampFromNanoseconds: nanos => Number(nanos / 100000000000n),
+      },
+    }
+
+    expect(convert(data, columnParser)).toEqual([ 12358656, 12358656 ])
+  })
 })
 
 describe('parseFloat16', () => {