Skip to content

Commit 41e3a9e

Browse files
authored
add capability and option flag to treat int96 as a timestamp (#162)
# Problem There's currently no way to handle INT96 timestamps. Closes #158 Solution ======== add a flag to enable treating all INT96 fields in a file as timestamps.
1 parent b2d9890 commit 41e3a9e

File tree

6 files changed

+131
-5
lines changed

6 files changed

+131
-5
lines changed

lib/bufferReader.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export interface BufferReaderOptions {
99
default_dictionary_size?: number;
1010
metadata?: FileMetaDataExt;
1111
rawStatistics?: Statistics;
12+
treatInt96AsTimestamp?: boolean;
1213
}
1314

1415
interface BufferReaderQueueRow {

lib/codec/plain.ts

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,17 +146,27 @@ function encodeValues_INT96(values: number[]) {
146146
return buf;
147147
}
148148

149-
function decodeValues_INT96(cursor: Cursor, count: number) {
149+
function decodeValues_INT96(cursor: Cursor, count: number, opts?: Options) {
150150
const values = [];
151+
// Default to false for backward compatibility
152+
const treatAsTimestamp = opts?.treatInt96AsTimestamp === true;
151153

152154
for (let i = 0; i < count; ++i) {
155+
// when treatAsTimestamp is true, low is nanoseconds since midnight
153156
const low = INT53.readInt64LE(cursor.buffer, cursor.offset);
157+
// when treatAsTimestamp is true, high is Julian day
154158
const high = cursor.buffer.readUInt32LE(cursor.offset + 8);
155159

156-
if (high === 0xffffffff) {
157-
values.push(~-low + 1); // truncate to 64 actual precision
160+
if (treatAsTimestamp) {
161+
// Convert Julian day and nanoseconds to a timestamp
162+
values.push(convertInt96ToTimestamp(high, low));
158163
} else {
159-
values.push(low); // truncate to 64 actual precision
164+
// For non-timestamp INT96 values, maintain existing behavior
165+
if (high === 0xffffffff) {
166+
values.push(~-low + 1); // negative value
167+
} else {
168+
values.push(low); // positive value
169+
}
160170
}
161171

162172
cursor.offset += 12;
@@ -165,6 +175,30 @@ function decodeValues_INT96(cursor: Cursor, count: number) {
165175
return values;
166176
}
167177

178+
/**
179+
* Convert INT96 to timestamp
180+
* In the Parquet format, INT96 timestamps are stored as:
181+
* - The first 8 bytes (low) represent nanoseconds within the day
182+
* - The last 4 bytes (high) represent the Julian day
183+
*
184+
* @param julianDay Julian day number
185+
* @param nanosSinceMidnight Nanoseconds since midnight
186+
* @returns JavaScript Date object (UTC)
187+
*/
188+
function convertInt96ToTimestamp(julianDay: number, nanosSinceMidnight: number | bigint): Date {
189+
// Julian day 2440588 corresponds to 1970-01-01 (Unix epoch)
190+
const daysSinceEpoch = julianDay - 2440588;
191+
192+
// Convert days to milliseconds (86,400,000 ms per day)
193+
const millisSinceEpoch = daysSinceEpoch * 86400000;
194+
195+
// Convert nanoseconds to milliseconds
196+
const nanosInMillis = Number(BigInt(nanosSinceMidnight) / 1000000n);
197+
198+
// Create a UTC Date
199+
return new Date(millisSinceEpoch + nanosInMillis);
200+
}
201+
168202
function encodeValues_FLOAT(values: number[]) {
169203
const buf = Buffer.alloc(4 * values.length);
170204
for (let i = 0; i < values.length; i++) {
@@ -322,7 +356,7 @@ export const decodeValues = function (type: ValidValueTypes | string, cursor: Cu
322356
return decodeValues_INT64(cursor, count, opts);
323357

324358
case 'INT96':
325-
return decodeValues_INT96(cursor, count);
359+
return decodeValues_INT96(cursor, count, opts);
326360

327361
case 'FLOAT':
328362
return decodeValues_FLOAT(cursor, count);

lib/codec/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export interface Options {
2222
name?: string;
2323
precision?: number;
2424
scale?: number;
25+
treatInt96AsTimestamp?: boolean;
2526
}
2627

2728
export interface Cursor {

lib/declare.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,17 @@ export class NewPageHeader extends parquet_thrift.PageHeader {
212212
headerSize?: number;
213213
}
214214

215+
export interface BufferReaderOptions {
216+
default_dictionary_size?: number;
217+
maxLength?: number;
218+
maxSpan?: number;
219+
queueWait?: number;
220+
metadata?: FileMetaDataExt;
221+
cache?: boolean;
222+
rawStatistics?: boolean;
223+
treatInt96AsTimestamp?: boolean; // Default to false for backward compatibility
224+
}
225+
215226
export interface WriterOptions {
216227
pageIndex?: boolean;
217228
pageSize?: number;

lib/reader.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ export class ParquetReader {
124124
envelopeReader: ParquetEnvelopeReader | null;
125125
metadata: FileMetaDataExt | null;
126126
schema: parquet_schema.ParquetSchema;
127+
treatInt96AsTimestamp: boolean;
127128

128129
/**
129130
* Open the parquet file pointed to by the specified path and return a new
@@ -199,6 +200,9 @@ export class ParquetReader {
199200
throw 'invalid parquet version';
200201
}
201202

203+
// Default to false for backward compatibility
204+
this.treatInt96AsTimestamp = opts.treatInt96AsTimestamp === true;
205+
202206
// If metadata is a json file then we need to convert INT64 and CTIME
203207
if (metadata.json) {
204208
const convert = (o: Record<string, any>) => {
@@ -411,6 +415,7 @@ export class ParquetEnvelopeReader {
411415
default_dictionary_size: number;
412416
metadata?: FileMetaDataExt;
413417
schema?: parquet_schema.ParquetSchema;
418+
treatInt96AsTimestamp?: boolean;
414419

415420
static async openFile(filePath: string | Buffer | URL, options?: BufferReaderOptions) {
416421
const fileStat = await parquet_util.fstat(filePath);
@@ -564,6 +569,7 @@ export class ParquetEnvelopeReader {
564569
this.fileSize = fileSize;
565570
this.default_dictionary_size = options.default_dictionary_size || 10000000;
566571
this.metadata = metadata;
572+
this.treatInt96AsTimestamp = options.treatInt96AsTimestamp === true;
567573
if (options.maxLength || options.maxSpan || options.queueWait) {
568574
const bufferReader = new BufferReader(this, options);
569575
this.read = (offset, length) => bufferReader.read(offset, length);
@@ -755,6 +761,7 @@ export class ParquetEnvelopeReader {
755761
compression: compression,
756762
column: field,
757763
num_values: metadata.num_values,
764+
treatInt96AsTimestamp: this.treatInt96AsTimestamp,
758765
});
759766

760767
// If this exists and is greater than zero then we need to have an offset
@@ -1046,6 +1053,7 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader,
10461053
precision: opts.column!.precision,
10471054
scale: opts.column!.scale,
10481055
name: opts.column!.name,
1056+
treatInt96AsTimestamp: opts.treatInt96AsTimestamp,
10491057
});
10501058

10511059
cursor.offset = cursorEnd;
@@ -1109,6 +1117,7 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade
11091117

11101118
const values = decodeValues(opts.type!, valueEncoding as ParquetCodec, valuesBufCursor, valueCountNonNull, {
11111119
bitWidth: opts.column!.typeLength!,
1120+
treatInt96AsTimestamp: opts.treatInt96AsTimestamp,
11121121
...opts.column!,
11131122
});
11141123

test/int96_timestamp.test.js

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
'use strict';
2+
const chai = require('chai');
3+
const assert = chai.assert;
4+
const { ParquetReader } = require('../lib/reader');
5+
6+
describe('INT96 timestamp handling', function () {
7+
this.timeout(30000); // Increase timeout for URL fetching
8+
9+
const testUrl =
10+
'https://aws-public-blockchain.s3.us-east-2.amazonaws.com/v1.0/eth/traces/date%3D2016-05-22/part-00000-54f4b70c-db10-479c-a117-e3cc760a7e26-c000.snappy.parquet';
11+
12+
it('should handle INT96 values as numbers by default', async function () {
13+
const reader = await ParquetReader.openUrl(testUrl);
14+
const cursor = reader.getCursor();
15+
16+
// Read the first row
17+
const row = await cursor.next();
18+
19+
// Find INT96 columns (if any)
20+
const schema = reader.getSchema();
21+
const fields = schema.fields;
22+
23+
// Check if there are any INT96 columns and verify they're numbers
24+
let foundInt96 = false;
25+
for (const fieldName in fields) {
26+
const field = fields[fieldName];
27+
if (field.primitiveType === 'INT96') {
28+
foundInt96 = true;
29+
assert.isNumber(row[fieldName], `Expected ${fieldName} to be a number`);
30+
}
31+
}
32+
33+
// If no INT96 columns were found, log a message
34+
if (!foundInt96) {
35+
console.log('No INT96 columns found in the test file');
36+
}
37+
38+
await reader.close();
39+
});
40+
41+
it('should convert INT96 values to timestamps when option is enabled', async function () {
42+
const reader = await ParquetReader.openUrl(testUrl, { treatInt96AsTimestamp: true });
43+
const cursor = reader.getCursor();
44+
45+
// Read the first row
46+
const row = await cursor.next();
47+
48+
// Find INT96 columns (if any)
49+
const schema = reader.getSchema();
50+
const fields = schema.fields;
51+
52+
// Check if there are any INT96 columns and verify they're Date objects
53+
let foundInt96 = false;
54+
for (const fieldName in fields) {
55+
const field = fields[fieldName];
56+
if (field.primitiveType === 'INT96') {
57+
foundInt96 = true;
58+
assert.instanceOf(row[fieldName], Date, `Expected ${fieldName} to be a Date object`);
59+
console.log(`${fieldName} timestamp:`, row[fieldName]);
60+
}
61+
}
62+
63+
// If no INT96 columns were found, log a message
64+
if (!foundInt96) {
65+
console.log('No INT96 columns found in the test file');
66+
}
67+
68+
await reader.close();
69+
});
70+
});

0 commit comments

Comments
 (0)