Skip to content

Commit fd63810

Browse files
committed
Add capability and option flag to treat INT96 as a timestamp
1 parent b2d9890 commit fd63810

File tree

6 files changed

+132
-9
lines changed

6 files changed

+132
-9
lines changed

lib/bufferReader.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export interface BufferReaderOptions {
99
default_dictionary_size?: number;
1010
metadata?: FileMetaDataExt;
1111
rawStatistics?: Statistics;
12+
treatInt96AsTimestamp?: boolean;
1213
}
1314

1415
interface BufferReaderQueueRow {

lib/codec/plain.ts

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -146,25 +146,57 @@ function encodeValues_INT96(values: number[]) {
146146
return buf;
147147
}
148148

149-
function decodeValues_INT96(cursor: Cursor, count: number) {
149+
function decodeValues_INT96(cursor: Cursor, count: number, opts?: Options) {
150150
const values = [];
151+
// Default to false for backward compatibility
152+
const treatAsTimestamp = opts?.treatInt96AsTimestamp === true;
151153

152154
for (let i = 0; i < count; ++i) {
153-
const low = INT53.readInt64LE(cursor.buffer, cursor.offset);
154-
const high = cursor.buffer.readUInt32LE(cursor.offset + 8);
155-
156-
if (high === 0xffffffff) {
157-
values.push(~-low + 1); // truncate to 64 actual precision
155+
const nanosSinceMidnight = INT53.readInt64LE(cursor.buffer, cursor.offset);
156+
const julianDay = cursor.buffer.readUInt32LE(cursor.offset + 8);
157+
158+
if (treatAsTimestamp) {
159+
// Convert Julian day and nanoseconds to a timestamp
160+
values.push(convertInt96ToTimestamp(julianDay, nanosSinceMidnight));
158161
} else {
159-
values.push(low); // truncate to 64 actual precision
162+
// For non-timestamp INT96 values, maintain existing behavior
163+
if (julianDay === 0xffffffff) {
164+
values.push(~-nanosSinceMidnight + 1); // negative value
165+
} else {
166+
values.push(nanosSinceMidnight); // positive value
167+
}
160168
}
161-
169+
162170
cursor.offset += 12;
163171
}
164172

165173
return values;
166174
}
167175

176+
/**
 * Convert a Parquet INT96 timestamp into a JavaScript Date.
 *
 * In the Parquet format, INT96 timestamps are stored as:
 * - The first 8 bytes (low) represent nanoseconds within the day
 * - The last 4 bytes (high) represent the Julian day
 *
 * A Date only has millisecond resolution, so any sub-millisecond part of
 * `nanosSinceMidnight` is truncated (toward zero), not rounded.
 *
 * @param julianDay Julian day number
 * @param nanosSinceMidnight Nanoseconds since midnight. A fractional `number`
 *   is truncated to whole nanoseconds before conversion.
 * @returns JavaScript Date object (UTC)
 * @throws RangeError if `nanosSinceMidnight` is a `number` that is NaN or infinite
 */
function convertInt96ToTimestamp(julianDay: number, nanosSinceMidnight: number | bigint): Date {
  // Julian day 2440588 corresponds to 1970-01-01 (Unix epoch)
  const JULIAN_DAY_OF_UNIX_EPOCH = 2440588;
  const MILLIS_PER_DAY = 86400000;
  const NANOS_PER_MILLI = 1000000n;

  // BigInt() rejects non-integer numbers with a RangeError, so drop any
  // fractional nanoseconds first; bigint inputs are used as-is.
  const wholeNanos =
    typeof nanosSinceMidnight === 'bigint' ? nanosSinceMidnight : BigInt(Math.trunc(nanosSinceMidnight));

  const millisAtMidnight = (julianDay - JULIAN_DAY_OF_UNIX_EPOCH) * MILLIS_PER_DAY;

  // Integer division in BigInt space keeps the conversion exact for the full int64 range.
  const millisWithinDay = Number(wholeNanos / NANOS_PER_MILLI);

  // Date interprets a millisecond count as UTC
  return new Date(millisAtMidnight + millisWithinDay);
}
199+
168200
function encodeValues_FLOAT(values: number[]) {
169201
const buf = Buffer.alloc(4 * values.length);
170202
for (let i = 0; i < values.length; i++) {
@@ -322,7 +354,7 @@ export const decodeValues = function (type: ValidValueTypes | string, cursor: Cu
322354
return decodeValues_INT64(cursor, count, opts);
323355

324356
case 'INT96':
325-
return decodeValues_INT96(cursor, count);
357+
return decodeValues_INT96(cursor, count, opts);
326358

327359
case 'FLOAT':
328360
return decodeValues_FLOAT(cursor, count);

lib/codec/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export interface Options {
2222
name?: string;
2323
precision?: number;
2424
scale?: number;
25+
treatInt96AsTimestamp?: boolean;
2526
}
2627

2728
export interface Cursor {

lib/declare.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,17 @@ export class NewPageHeader extends parquet_thrift.PageHeader {
212212
headerSize?: number;
213213
}
214214

215+
/**
 * Options accepted when opening a Parquet file/buffer/URL for reading.
 *
 * What the visible reader code establishes:
 * - `default_dictionary_size`: ParquetEnvelopeReader falls back to 10000000
 *   when this is unset (or otherwise falsy).
 * - `maxLength` / `maxSpan` / `queueWait`: when any of these is truthy,
 *   ParquetEnvelopeReader routes its reads through a BufferReader.
 * - `treatInt96AsTimestamp`: only the exact value `true` enables it; INT96
 *   values are then decoded to Date objects instead of numbers.
 *
 * NOTE(review): lib/bufferReader.ts declares an interface with the same name
 * whose `rawStatistics` is typed `Statistics`, while this one uses `boolean`
 * - confirm which declaration is authoritative and reconcile the two.
 */
export interface BufferReaderOptions {
216+
default_dictionary_size?: number;
217+
maxLength?: number;
218+
maxSpan?: number;
219+
queueWait?: number;
220+
metadata?: FileMetaDataExt;
221+
cache?: boolean;
222+
rawStatistics?: boolean;
223+
treatInt96AsTimestamp?: boolean; // Default to false for backward compatibility
224+
}
225+
215226
export interface WriterOptions {
216227
pageIndex?: boolean;
217228
pageSize?: number;

lib/reader.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ export class ParquetReader {
124124
envelopeReader: ParquetEnvelopeReader | null;
125125
metadata: FileMetaDataExt | null;
126126
schema: parquet_schema.ParquetSchema;
127+
treatInt96AsTimestamp: boolean;
127128

128129
/**
129130
* Open the parquet file pointed to by the specified path and return a new
@@ -198,6 +199,9 @@ export class ParquetReader {
198199
if (!PARQUET_VERSIONS.includes(metadata.version)) {
199200
throw 'invalid parquet version';
200201
}
202+
203+
// Default to false for backward compatibility
204+
this.treatInt96AsTimestamp = opts.treatInt96AsTimestamp === true;
201205

202206
// If metadata is a json file then we need to convert INT64 and CTIME
203207
if (metadata.json) {
@@ -411,6 +415,7 @@ export class ParquetEnvelopeReader {
411415
default_dictionary_size: number;
412416
metadata?: FileMetaDataExt;
413417
schema?: parquet_schema.ParquetSchema;
418+
treatInt96AsTimestamp?: boolean;
414419

415420
static async openFile(filePath: string | Buffer | URL, options?: BufferReaderOptions) {
416421
const fileStat = await parquet_util.fstat(filePath);
@@ -564,6 +569,7 @@ export class ParquetEnvelopeReader {
564569
this.fileSize = fileSize;
565570
this.default_dictionary_size = options.default_dictionary_size || 10000000;
566571
this.metadata = metadata;
572+
this.treatInt96AsTimestamp = options.treatInt96AsTimestamp === true;
567573
if (options.maxLength || options.maxSpan || options.queueWait) {
568574
const bufferReader = new BufferReader(this, options);
569575
this.read = (offset, length) => bufferReader.read(offset, length);
@@ -755,6 +761,7 @@ export class ParquetEnvelopeReader {
755761
compression: compression,
756762
column: field,
757763
num_values: metadata.num_values,
764+
treatInt96AsTimestamp: this.treatInt96AsTimestamp
758765
});
759766

760767
// If this exists and is greater than zero then we need to have an offset
@@ -1046,6 +1053,7 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader,
10461053
precision: opts.column!.precision,
10471054
scale: opts.column!.scale,
10481055
name: opts.column!.name,
1056+
treatInt96AsTimestamp: opts.treatInt96AsTimestamp,
10491057
});
10501058

10511059
cursor.offset = cursorEnd;
@@ -1109,6 +1117,7 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade
11091117

11101118
const values = decodeValues(opts.type!, valueEncoding as ParquetCodec, valuesBufCursor, valueCountNonNull, {
11111119
bitWidth: opts.column!.typeLength!,
1120+
treatInt96AsTimestamp: opts.treatInt96AsTimestamp,
11121121
...opts.column!,
11131122
});
11141123

test/int96_timestamp.test.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
'use strict';
2+
const chai = require('chai');
3+
const assert = chai.assert;
4+
const { ParquetReader } = require('../lib/reader');
5+
6+
describe('INT96 timestamp handling', function () {
  this.timeout(30000); // Increase timeout for URL fetching

  const testUrl = 'https://aws-public-blockchain.s3.us-east-2.amazonaws.com/v1.0/eth/traces/date%3D2016-05-22/part-00000-54f4b70c-db10-479c-a117-e3cc760a7e26-c000.snappy.parquet';

  /**
   * Opens the test file, reads its first row and runs `check(row, fieldName)`
   * for every INT96 column in the schema.
   *
   * The reader is always closed, even when an assertion throws. If the file
   * has no INT96 column the test is marked as skipped instead of passing
   * without having verified anything.
   *
   * @param testContext mocha test context (`this` inside the `it` callback)
   * @param readerOptions options forwarded to ParquetReader.openUrl, or undefined
   * @param check assertion callback invoked once per INT96 column
   */
  async function checkInt96Columns(testContext, readerOptions, check) {
    const reader = await ParquetReader.openUrl(testUrl, readerOptions);
    try {
      const cursor = reader.getCursor();

      // Read the first row
      const row = await cursor.next();

      const fields = reader.getSchema().fields;
      const int96Fields = Object.keys(fields).filter(
        (fieldName) => fields[fieldName].primitiveType === 'INT96'
      );

      if (int96Fields.length === 0) {
        console.log('No INT96 columns found in the test file');
        // Report the test as pending rather than as a (vacuous) pass.
        testContext.skip();
      }

      for (const fieldName of int96Fields) {
        check(row, fieldName);
      }
    } finally {
      await reader.close();
    }
  }

  it('should handle INT96 values as numbers by default', async function () {
    await checkInt96Columns(this, undefined, (row, fieldName) => {
      assert.isNumber(row[fieldName], `Expected ${fieldName} to be a number`);
    });
  });

  it('should convert INT96 values to timestamps when option is enabled', async function () {
    await checkInt96Columns(this, { treatInt96AsTimestamp: true }, (row, fieldName) => {
      assert.instanceOf(row[fieldName], Date, `Expected ${fieldName} to be a Date object`);
      console.log(`${fieldName} timestamp:`, row[fieldName]);
    });
  });
});

0 commit comments

Comments
 (0)