/**
 * Replace bigint, date, etc with legal JSON types.
 * When parsing parquet files, bigints are used to represent 64-bit integers.
 * However, JSON does not support bigints, so it's helpful to convert to numbers.
 *
 * @param {any} obj object to convert
 * @returns {unknown} converted object
 */
export function toJson(obj) {
  if (obj === undefined) return null
  if (typeof obj === 'bigint') return Number(obj)
  if (Array.isArray(obj)) return obj.map(toJson)
  if (obj instanceof Uint8Array) return Array.from(obj)
  if (obj instanceof Date) return obj.toISOString()
  if (obj instanceof Object) {
    /** @type {Record<string, unknown>} */
    const newObj = {}
    for (const key of Object.keys(obj)) {
      if (obj[key] === undefined) continue
      newObj[key] = toJson(obj[key])
    }
    return newObj
  }
  return obj
}

/**
 * Concatenate two arrays fast.
 *
 * @param {any[]} aaa first array
 * @param {DecodedArray} bbb second array
 */
export function concat(aaa, bbb) {
  const chunk = 10000
  for (let i = 0; i < bbb.length; i += chunk) {
    aaa.push(...bbb.slice(i, i + chunk))
  }
}

/**
 * Get the byte length of a URL using a HEAD request.
 * If requestInit is provided, it will be passed to fetch.
 *
 * @param {string} url
 * @param {RequestInit} [requestInit] fetch options
 * @returns {Promise<number>}
 */
export async function byteLengthFromUrl(url, requestInit) {
  return await fetch(url, { ...requestInit, method: 'HEAD' })
    .then(res => {
      if (!res.ok) throw new Error(`fetch head failed ${res.status}`)
      const length = res.headers.get('Content-Length')
      if (!length) throw new Error('missing content length')
      return parseInt(length)
    })
}

/**
 * Construct an AsyncBuffer for a URL.
 * If byteLength is not provided, will make a HEAD request to get the file size.
 * If requestInit is provided, it will be passed to fetch.
 *
 * @param {object} options
 * @param {string} options.url
 * @param {number} [options.byteLength]
 * @param {RequestInit} [options.requestInit]
 * @returns {Promise<AsyncBuffer>}
 */
export async function asyncBufferFromUrl({ url, byteLength, requestInit }) {
  if (!url) throw new Error('missing url')
  // byte length from HEAD request
  byteLength ||= await byteLengthFromUrl(url, requestInit)
  const init = requestInit || {}
  return {
    byteLength,
    async slice(start, end) {
      // fetch byte range from url
      const headers = new Headers(init.headers)
      const endStr = end === undefined ? '' : end - 1
      headers.set('Range', `bytes=${start}-${endStr}`)
      const res = await fetch(url, { ...init, headers })
      if (!res.ok || !res.body) throw new Error(`fetch failed ${res.status}`)
      return res.arrayBuffer()
    },
  }
}

/**
 * Construct an AsyncBuffer for a local file using node fs package.
 *
 * @param {string} filename
 * @returns {Promise<AsyncBuffer>}
 */
export async function asyncBufferFromFile(filename) {
  const fsPackage = 'fs' // webpack no include
  const fs = await import(fsPackage)
  const stat = await fs.promises.stat(filename)
  return {
    byteLength: stat.size,
    async slice(start, end) {
      // read file slice
      const readStream = fs.createReadStream(filename, { start, end })
      return await readStreamToArrayBuffer(readStream)
    },
  }
}

/**
 * Convert a node ReadStream to ArrayBuffer.
 *
 * @param {import('stream').Readable} input
 * @returns {Promise<ArrayBuffer>}
 */
function readStreamToArrayBuffer(input) {
  return new Promise((resolve, reject) => {
    /** @type {Buffer[]} */
    const chunks = []
    input.on('data', chunk => chunks.push(chunk))
    input.on('end', () => {
      const buffer = Buffer.concat(chunks)
      resolve(buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength))
    })
    input.on('error', reject)
  })
}

/**
 * Returns a cached layer on top of an AsyncBuffer.
 * For caching slices of a file that are read multiple times, possibly over a network.
 *
 * @param {AsyncBuffer} file file-like object to cache
 * @returns {AsyncBuffer} cached file-like object
 */
export function cachedAsyncBuffer({ byteLength, slice }) {
  const cache = new Map()
  return {
    byteLength,
    /**
     * @param {number} start
     * @param {number} [end]
     * @returns {Awaitable<ArrayBuffer>}
     */
    slice(start, end) {
      const key = cacheKey(start, end, byteLength)
      const cached = cache.get(key)
      if (cached) return cached
      // cache miss, read from file
      const promise = slice(start, end)
      cache.set(key, promise)
      return promise
    },
  }
}

/**
 * Returns canonical cache key for a byte range 'start,end'.
 * Normalize int-range and suffix-range requests to the same key.
 *
 * @import {AsyncBuffer, Awaitable, DecodedArray} from '../src/types.d.ts'
 * @param {number} start start byte of range
 * @param {number} [end] end byte of range, or undefined for suffix range
 * @param {number} [size] size of file, or undefined for suffix range
 * @returns {string}
 */
function cacheKey(start, end, size) {
  if (start < 0) {
    if (end !== undefined) throw new Error(`invalid suffix range [${start}, ${end}]`)
    if (size === undefined) return `${start},`
    return `${size + start},${size}`
  } else if (end !== undefined) {
    if (start > end) throw new Error(`invalid empty range [${start}, ${end}]`)
    return `${start},${end}`
  } else if (size === undefined) {
    return `${start},`
  } else {
    return `${start},${size}`
  }
}
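
// Usage sketch for the conversion helpers (not part of the module; values are
// illustrative only). toJson makes parsed parquet values safe to pass to
// JSON.stringify, and concat appends a decoded column chunk to a plain array
// in 10000-element batches to avoid spread-argument limits.
//
//   toJson({ id: 123n, created: new Date(0), raw: new Uint8Array([1, 2]) })
//   // -> { id: 123, created: '1970-01-01T00:00:00.000Z', raw: [1, 2] }
//
//   const rows = []
//   concat(rows, new BigInt64Array([1n, 2n, 3n]))
//   // rows is now [1n, 2n, 3n]; run it through toJson before serializing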
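
// Usage sketch for the AsyncBuffer constructors (not part of the module; the
// URL is a placeholder and the import path assumes this file is utils.js).
// Requires a runtime with the fetch API, e.g. modern browsers or Node 18+.
// Wrapping with cachedAsyncBuffer means repeated reads of the same byte range
// reuse the in-flight or completed fetch instead of issuing a new request.
//
//   import { asyncBufferFromUrl, cachedAsyncBuffer } from './utils.js'
//
//   const remote = await asyncBufferFromUrl({ url: 'https://example.com/data.parquet' })
//   const file = cachedAsyncBuffer(remote)
//   const header = await file.slice(0, 1024) // fetches Range: bytes=0-1023
//   const again = await file.slice(0, 1024)  // served from the cache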