Skip to content

Commit 6b7bea9

Browse files
Logical type : TIME (#143)
# Problem

Part of #99

![image](https://github.com/user-attachments/assets/3526d69a-b89b-4513-b02c-39ff03e91af3)

Add support for logical types in parquetjs, starting with `TIME`.

# Solution

The implementation follows the Parquet [spec](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype).

## Change summary:

- Added a logical type to support Parquet types
- Implemented type conversions from the logical type `TIME` to its primitive type according to the spec
- Implemented field, schema, and file tests to ensure everything works together

## Steps to Verify:

1. Run `npm run test`
1 parent 6797c99 commit 6b7bea9

19 files changed

+956
-11
lines changed

lib/codec/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import { PrimitiveType } from '../declare';
22
import { ParquetCodec, OriginalType, ParquetField } from '../declare';
3-
import { Statistics } from '../../gen-nodejs/parquet_types';
3+
import { LogicalType, Statistics } from '../../gen-nodejs/parquet_types';
44

55
export interface Options {
66
typeLength: number;
77
bitWidth: number;
88
disableEnvelope?: boolean;
99
primitiveType?: PrimitiveType;
1010
originalType?: OriginalType;
11+
logicalType?: LogicalType;
1112
encoding?: ParquetCodec;
1213
compression?: string;
1314
column?: ParquetField;

lib/declare.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Thanks to https://github.com/kbajalc/parquets
22

3-
import parquet_thrift from '../gen-nodejs/parquet_types';
3+
import parquet_thrift, { LogicalType } from '../gen-nodejs/parquet_types';
44
import {
55
Statistics,
66
OffsetIndex,
@@ -61,6 +61,7 @@ export type SchemaDefinition = Record<string, FieldDefinition>;
6161
export interface FieldDefinition {
6262
type?: ParquetType;
6363
typeLength?: number;
64+
logicalType?: LogicalType;
6465
encoding?: ParquetCodec;
6566
compression?: ParquetCompression;
6667
optional?: boolean;
@@ -80,6 +81,7 @@ export interface ParquetField {
8081
primitiveType?: PrimitiveType;
8182
originalType?: OriginalType;
8283
repetitionType: RepetitionType;
84+
logicalType?: LogicalType;
8385
typeLength?: number;
8486
encoding?: ParquetCodec;
8587
compression?: ParquetCompression;

lib/fields.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Helper functions for creating fields
22

3-
import { FieldDefinition, ParquetType, SchemaDefinition } from './declare';
3+
import { LogicalType, TimeType } from '../gen-nodejs/parquet_types';
4+
import { FieldDefinition, ParquetType, PrimitiveType, SchemaDefinition } from './declare';
45

56
export function createStringField(optional = true, fieldOptions: FieldDefinition = {}): FieldDefinition {
67
return { ...fieldOptions, optional, type: 'UTF8' };
@@ -80,3 +81,24 @@ export function createListField(
8081
},
8182
};
8283
}
84+
85+
/**
 * Builds a field definition for a TIME logical-type column.
 *
 * The primitive storage type is chosen from the unit flags on `logicalType.unit`:
 * a truthy `MILLIS` selects 'INT32'; a truthy `MICROS` or `NANOS` selects 'INT64'.
 * `MILLIS` is checked first, so it wins if more than one flag is set.
 *
 * @param logicalType - TIME descriptor whose `unit` is inspected and which is
 *   wrapped as `new LogicalType({ TIME: logicalType })` on the returned field.
 * @param optional - Copied onto the returned definition; defaults to `true`.
 * @param fieldOptions - Extra definition properties spread into the result first,
 *   so `optional`, `type` and `logicalType` set here override same-named keys.
 * @returns The combined field definition.
 * @throws Error when none of `MILLIS`, `MICROS`, `NANOS` is truthy on the unit.
 */
export function createTimeField(
86+
logicalType: TimeType,
87+
optional = true,
88+
fieldOptions: FieldDefinition = {}
89+
): FieldDefinition {
90+
let primitiveType: PrimitiveType;
91+
if (logicalType.unit.MILLIS) {
92+
primitiveType = 'INT32'; // TIME_MILLIS uses INT32
93+
} else if (logicalType.unit.MICROS || logicalType.unit.NANOS) {
94+
primitiveType = 'INT64'; // TIME_MICROS and TIME_NANOS use INT64
95+
} else {
96+
throw new Error('Unsupported time unit in logicalType');
97+
}
98+
return {
99+
...fieldOptions,
100+
optional,
101+
type: primitiveType,
102+
logicalType: new LogicalType({ TIME: logicalType }),
103+
};
104+
}

lib/jsonSchema.ts

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import { JSONSchema4 } from 'json-schema';
33
import { FieldDefinition, SchemaDefinition } from './declare';
44
import * as fields from './fields';
5+
import { TimeUnit } from '../gen-nodejs/parquet_types';
6+
import { TimeType } from '../gen-nodejs/parquet_types';
57

68
type SupportedJSONSchema4 = Omit<
79
JSONSchema4,
@@ -70,18 +72,52 @@ const fromJsonSchemaArray = (fieldValue: SupportedJSONSchema4, optionalFieldList
7072

7173
switch (fieldValue.items.type) {
7274
case 'string':
73-
if (fieldValue.items.format && fieldValue.items.format == 'date-time') {
75+
if (fieldValue.items.format && fieldValue.items.format === 'date-time') {
7476
return fields.createListField('TIMESTAMP_MILLIS', optionalFieldList);
7577
}
7678
return fields.createListField('UTF8', optionalFieldList);
79+
7780
case 'integer':
7881
return fields.createListField('INT64', optionalFieldList);
82+
7983
case 'number':
8084
return fields.createListField('DOUBLE', optionalFieldList);
85+
8186
case 'boolean':
8287
return fields.createListField('BOOLEAN', optionalFieldList);
88+
8389
case 'object':
90+
// Handle array of time fields
91+
if (
92+
fieldValue.items.properties &&
93+
fieldValue.items.properties.unit &&
94+
fieldValue.items.properties.isAdjustedToUTC
95+
) {
96+
if (!fieldValue.items.properties.unit.enum) {
97+
throw new UnsupportedJsonSchemaError('Unit enum is not defined');
98+
}
99+
const unit = fieldValue.items.properties.unit.default || fieldValue.items.properties.unit.enum[0];
100+
const isAdjustedToUTC = !!fieldValue.items.properties.isAdjustedToUTC.default;
101+
let timeUnit: TimeUnit;
102+
103+
switch (unit) {
104+
case 'MICROS':
105+
timeUnit = new TimeUnit({ MICROS: true });
106+
break;
107+
case 'NANOS':
108+
timeUnit = new TimeUnit({ NANOS: true });
109+
break;
110+
default:
111+
timeUnit = new TimeUnit({ MILLIS: true });
112+
break;
113+
}
114+
115+
const timeLogicalType = new TimeType({ isAdjustedToUTC, unit: timeUnit });
116+
return fields.createTimeField(timeLogicalType, optionalFieldList);
117+
}
118+
84119
return fields.createStructListField(fromJsonSchema(fieldValue.items), optionalFieldList);
120+
85121
default:
86122
throw new UnsupportedJsonSchemaError(`Array field type ${JSON.stringify(fieldValue.items)} is unsupported.`);
87123
}
@@ -100,20 +136,49 @@ const fromJsonSchemaField =
100136

101137
switch (fieldValue.type) {
102138
case 'string':
103-
if (fieldValue.format && fieldValue.format == 'date-time') {
139+
if (fieldValue.format && fieldValue.format === 'date-time') {
104140
return fields.createTimestampField(optional);
105141
}
106142
return fields.createStringField(optional);
143+
107144
case 'integer':
108145
return fields.createIntField(64, optional);
146+
109147
case 'number':
110148
return fields.createDoubleField(optional);
149+
111150
case 'boolean':
112151
return fields.createBooleanField(optional);
152+
113153
case 'array':
114154
return fromJsonSchemaArray(fieldValue, optional);
155+
115156
case 'object':
157+
if (fieldValue.properties && fieldValue.properties.unit && fieldValue.properties.isAdjustedToUTC) {
158+
if (!fieldValue.properties.unit.enum) {
159+
throw new UnsupportedJsonSchemaError('Unit enum is not defined');
160+
}
161+
const unit = fieldValue.properties.unit.default || fieldValue.properties.unit.enum[0];
162+
const isAdjustedToUTC = !!fieldValue.properties.isAdjustedToUTC.default;
163+
let timeUnit: TimeUnit;
164+
switch (unit) {
165+
case 'MICROS':
166+
timeUnit = new TimeUnit({ MICROS: true });
167+
break;
168+
case 'NANOS':
169+
timeUnit = new TimeUnit({ NANOS: true });
170+
break;
171+
default:
172+
timeUnit = new TimeUnit({ MILLIS: true });
173+
break;
174+
}
175+
176+
const timeLogicalType = new TimeType({ isAdjustedToUTC, unit: timeUnit });
177+
return fields.createTimeField(timeLogicalType, optional);
178+
}
179+
116180
return fields.createStructField(fromJsonSchema(fieldValue), optional);
181+
117182
default:
118183
throw new UnsupportedJsonSchemaError(
119184
`Unable to convert "${fieldName}" with JSON Schema type "${fieldValue.type}" to a Parquet Schema.`

lib/schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
126126
statistics: opts.statistics,
127127
fieldCount: Object.keys(opts.fields).length,
128128
fields: buildFields(opts.fields, rLevelMax, dLevelMax, path.concat(name)),
129+
logicalType: opts.logicalType,
129130
};
130131

131132
if (opts.type == 'LIST' || opts.type == 'MAP') fieldList[name].originalType = opts.type;
@@ -174,6 +175,7 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
174175
name: name,
175176
primitiveType: typeDef.primitiveType,
176177
originalType: typeDef.originalType,
178+
logicalType: opts.logicalType,
177179
path: path.concat([name]),
178180
repetitionType: repetitionType,
179181
encoding: opts.encoding,

lib/types.ts

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,28 @@ interface INTERVAL {
2121
milliseconds: number;
2222
}
2323

24+
/**
 * Shape of the value accepted by `toPrimitive_TIME`.
 *
 * - `value`: the raw time amount; converted with `BigInt(...)` by the consumer.
 *   Presumably a count of `unit`s since midnight - TODO confirm against the writer.
 * - `unit`: resolution of `value`.
 * - `isAdjustedToUTC`: when false, the consumer shifts `value` by the host's
 *   timezone offset before returning it.
 */
interface TIME {
25+
value: string | bigint | number;
26+
unit: 'MILLIS' | 'MICROS' | 'NANOS';
27+
isAdjustedToUTC: boolean;
28+
}
29+
2430
export function getParquetTypeDataObject(
2531
type: ParquetType,
2632
field?: ParquetField | Options | FieldDefinition
2733
): ParquetTypeDataObject {
2834
if (type === 'DECIMAL') {
29-
if (field?.typeLength !== undefined && field?.typeLength !== null) {
35+
if (field?.typeLength !== undefined) {
3036
return {
3137
primitiveType: 'FIXED_LEN_BYTE_ARRAY',
3238
originalType: 'DECIMAL',
3339
typeLength: field.typeLength,
3440
toPrimitive: toPrimitive_FIXED_LEN_BYTE_ARRAY_DECIMAL,
3541
};
36-
} else if (field?.precision !== undefined && field?.precision !== null && field.precision > 18) {
42+
} else if (field?.precision && field.precision > 18) {
3743
return {
3844
primitiveType: 'BYTE_ARRAY',
3945
originalType: 'DECIMAL',
40-
typeLength: field.typeLength,
4146
toPrimitive: toPrimitive_BYTE_ARRAY_DECIMAL,
4247
};
4348
} else {
@@ -47,6 +52,29 @@ export function getParquetTypeDataObject(
4752
toPrimitive: toPrimitive_INT64,
4853
};
4954
}
55+
} else if (field?.logicalType?.TIME) {
56+
const unit = field.logicalType.TIME.unit;
57+
if (unit.MILLIS) {
58+
return {
59+
originalType: 'TIME_MILLIS',
60+
primitiveType: 'INT32',
61+
toPrimitive: toPrimitive_TIME,
62+
};
63+
}
64+
if (unit.MICROS) {
65+
return {
66+
originalType: 'TIME_MICROS',
67+
primitiveType: 'INT64',
68+
toPrimitive: toPrimitive_TIME,
69+
};
70+
}
71+
if (unit.NANOS) {
72+
return {
73+
primitiveType: 'INT64',
74+
toPrimitive: toPrimitive_TIME,
75+
};
76+
}
77+
throw new Error('TIME type must have a valid unit (MILLIS, MICROS, NANOS).');
5078
} else {
5179
return PARQUET_LOGICAL_TYPE_DATA[type];
5280
}
@@ -560,3 +588,41 @@ function checkValidValue(lowerRange: number | bigint, upperRange: number | bigin
560588
throw 'invalid value';
561589
}
562590
}
591+
592+
/**
 * Converts a TIME value object into the primitive that gets stored.
 *
 * The raw `value` is first converted to a bigint. When `isAdjustedToUTC` is true
 * the value is returned unchanged; otherwise it is passed through
 * `adjustToLocalTimestamp` for the given unit. In both paths a 'MILLIS' result is
 * converted to `number`, while 'MICROS' and 'NANOS' results stay `bigint`.
 *
 * @param time - Object carrying `value`, `unit` and `isAdjustedToUTC`.
 * @returns `number` for 'MILLIS', `bigint` for 'MICROS' / 'NANOS'.
 * @throws Error with message `Unsupported time unit: <unit>` when
 *   `isAdjustedToUTC` is false and `unit` is not one of the three known units.
 *   `BigInt(value)` itself throws for non-integer numbers or non-numeric strings.
 *
 * NOTE(review): when `isAdjustedToUTC` is true an unknown `unit` is not rejected;
 * the bigint is returned as-is.
 * NOTE(review): the non-UTC path depends on the timezone of the machine running
 * the code - confirm this is the intended reading of the Parquet TIME spec.
 */
function toPrimitive_TIME(time: TIME): bigint | number {
593+
const { value, unit, isAdjustedToUTC } = time;
594+
595+
// NOTE(review): both branches of this ternary perform the same BigInt(value) call.
const timeValue = typeof value === 'string' ? BigInt(value) : BigInt(value);
596+
597+
if (isAdjustedToUTC) {
598+
return unit === 'MILLIS' ? Number(timeValue) : timeValue;
599+
} else {
600+
switch (unit) {
601+
case 'MILLIS':
602+
return Number(adjustToLocalTimestamp(timeValue, { MILLIS: true }));
603+
case 'MICROS':
604+
return adjustToLocalTimestamp(timeValue, { MICROS: true });
605+
case 'NANOS':
606+
return adjustToLocalTimestamp(timeValue, { NANOS: true });
607+
default:
608+
throw new Error(`Unsupported time unit: ${unit}`);
609+
}
610+
}
611+
}
612+
613+
/**
 * Subtracts the host's current timezone offset from `timestamp`, scaled to the
 * requested unit.
 *
 * The offset comes from `new Date().getTimezoneOffset()` (minutes) and is
 * converted to milliseconds, then multiplied by 1000 for MICROS or 1000000 for
 * NANOS. The unit flags are checked in the order MILLIS, MICROS, NANOS and the
 * first truthy one decides the scale.
 *
 * @param timestamp - Value to shift, expressed in the unit selected by `unit`.
 * @param unit - Flag object; exactly one of the flags is expected to be truthy.
 * @returns `timestamp` minus the scaled offset.
 * @throws Error('Unsupported time unit') when no flag is truthy.
 *
 * NOTE(review): the result depends on the timezone (and DST state) of the
 * machine at call time, and is not wrapped into a single-day range, so it can be
 * negative or exceed one day - confirm this is acceptable to callers.
 */
function adjustToLocalTimestamp(
614+
timestamp: bigint,
615+
unit: { MILLIS?: boolean; MICROS?: boolean; NANOS?: boolean }
616+
): bigint {
617+
const localOffset = BigInt(new Date().getTimezoneOffset()) * 60n * 1000n; // Offset in milliseconds
618+
619+
if (unit.MILLIS) {
620+
return timestamp - localOffset;
621+
} else if (unit.MICROS) {
622+
return timestamp - localOffset * 1000n;
623+
} else if (unit.NANOS) {
624+
return timestamp - localOffset * 1000000n;
625+
}
626+
627+
throw new Error('Unsupported time unit');
628+
}

test/decodeSchema.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ describe('ParquetSchema', function () {
120120
dLevelMax: 0,
121121
isNested: true,
122122
fieldCount: 2,
123+
logicalType: undefined,
123124
fields: {
124125
b: {
125126
name: 'b',
@@ -130,6 +131,7 @@ describe('ParquetSchema', function () {
130131
dLevelMax: 0,
131132
isNested: true,
132133
fieldCount: 2,
134+
logicalType: undefined,
133135
fields: {
134136
c: {
135137
name: 'c',
@@ -140,6 +142,7 @@ describe('ParquetSchema', function () {
140142
dLevelMax: 0,
141143
isNested: true,
142144
fieldCount: 1,
145+
logicalType: undefined,
143146
fields: {
144147
d: {
145148
name: 'd',
@@ -150,6 +153,7 @@ describe('ParquetSchema', function () {
150153
statistics: undefined,
151154
typeLength: undefined,
152155
encoding: 'PLAIN',
156+
logicalType: undefined,
153157
compression: 'UNCOMPRESSED',
154158
rLevelMax: 0,
155159
dLevelMax: 0,
@@ -167,6 +171,7 @@ describe('ParquetSchema', function () {
167171
dLevelMax: 0,
168172
isNested: true,
169173
fieldCount: 2,
174+
logicalType: undefined,
170175
fields: {
171176
f: {
172177
name: 'f',
@@ -177,6 +182,7 @@ describe('ParquetSchema', function () {
177182
statistics: undefined,
178183
typeLength: undefined,
179184
encoding: 'PLAIN',
185+
logicalType: undefined,
180186
compression: 'UNCOMPRESSED',
181187
rLevelMax: 0,
182188
dLevelMax: 0,
@@ -192,6 +198,7 @@ describe('ParquetSchema', function () {
192198
statistics: undefined,
193199
typeLength: undefined,
194200
encoding: 'PLAIN',
201+
logicalType: undefined,
195202
compression: 'UNCOMPRESSED',
196203
rLevelMax: 0,
197204
dLevelMax: 0,
@@ -211,6 +218,7 @@ describe('ParquetSchema', function () {
211218
statistics: undefined,
212219
typeLength: undefined,
213220
encoding: 'PLAIN',
221+
logicalType: undefined,
214222
compression: 'UNCOMPRESSED',
215223
rLevelMax: 0,
216224
dLevelMax: 0,

0 commit comments

Comments
 (0)