-
Notifications
You must be signed in to change notification settings - Fork 27
Logical type : TIME #143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Logical type : TIME #143
Changes from all commits
fb4b59a
1ca98b8
8fcafe2
9949fd4
679c96c
d3a1382
e22fbe4
ddfcc0e
05e2414
38a1d57
dddb09a
9401f67
13007a5
ae58c96
42d72ad
510f402
bc9e11c
44f64aa
97b1475
6485753
6927f31
bd1cb5d
abf7f0b
0a5040f
119bc70
6958dab
6ec10b8
96ec9f7
06b7441
8ca323a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,8 @@ | |
import { JSONSchema4 } from 'json-schema'; | ||
import { FieldDefinition, SchemaDefinition } from './declare'; | ||
import * as fields from './fields'; | ||
import { TimeUnit } from '../gen-nodejs/parquet_types'; | ||
import { TimeType } from '../gen-nodejs/parquet_types'; | ||
|
||
type SupportedJSONSchema4 = Omit< | ||
JSONSchema4, | ||
|
@@ -70,18 +72,52 @@ const fromJsonSchemaArray = (fieldValue: SupportedJSONSchema4, optionalFieldList | |
|
||
switch (fieldValue.items.type) { | ||
case 'string': | ||
if (fieldValue.items.format && fieldValue.items.format == 'date-time') { | ||
if (fieldValue.items.format && fieldValue.items.format === 'date-time') { | ||
return fields.createListField('TIMESTAMP_MILLIS', optionalFieldList); | ||
} | ||
return fields.createListField('UTF8', optionalFieldList); | ||
|
||
case 'integer': | ||
return fields.createListField('INT64', optionalFieldList); | ||
|
||
case 'number': | ||
return fields.createListField('DOUBLE', optionalFieldList); | ||
|
||
case 'boolean': | ||
return fields.createListField('BOOLEAN', optionalFieldList); | ||
|
||
case 'object': | ||
// Handle array of time fields | ||
if ( | ||
fieldValue.items.properties && | ||
fieldValue.items.properties.unit && | ||
fieldValue.items.properties.isAdjustedToUTC | ||
) { | ||
if (!fieldValue.items.properties.unit.enum) { | ||
throw new UnsupportedJsonSchemaError('Unit enum is not defined'); | ||
} | ||
const unit = fieldValue.items.properties.unit.default || fieldValue.items.properties.unit.enum[0]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A question here, how will units be defined in schema, an enum? or plain string. if enum do we initialize all unit types as separate fields? |
||
const isAdjustedToUTC = !!fieldValue.items.properties.isAdjustedToUTC.default; | ||
let timeUnit: TimeUnit; | ||
|
||
switch (unit) { | ||
case 'MICROS': | ||
timeUnit = new TimeUnit({ MICROS: true }); | ||
break; | ||
case 'NANOS': | ||
timeUnit = new TimeUnit({ NANOS: true }); | ||
break; | ||
default: | ||
timeUnit = new TimeUnit({ MILLIS: true }); | ||
break; | ||
} | ||
|
||
const timeLogicalType = new TimeType({ isAdjustedToUTC, unit: timeUnit }); | ||
return fields.createTimeField(timeLogicalType, optionalFieldList); | ||
} | ||
|
||
return fields.createStructListField(fromJsonSchema(fieldValue.items), optionalFieldList); | ||
|
||
default: | ||
throw new UnsupportedJsonSchemaError(`Array field type ${JSON.stringify(fieldValue.items)} is unsupported.`); | ||
} | ||
|
@@ -100,20 +136,49 @@ const fromJsonSchemaField = | |
|
||
switch (fieldValue.type) { | ||
case 'string': | ||
if (fieldValue.format && fieldValue.format == 'date-time') { | ||
if (fieldValue.format && fieldValue.format === 'date-time') { | ||
shannonwells marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return fields.createTimestampField(optional); | ||
} | ||
return fields.createStringField(optional); | ||
|
||
case 'integer': | ||
return fields.createIntField(64, optional); | ||
|
||
case 'number': | ||
return fields.createDoubleField(optional); | ||
|
||
case 'boolean': | ||
return fields.createBooleanField(optional); | ||
|
||
case 'array': | ||
return fromJsonSchemaArray(fieldValue, optional); | ||
|
||
case 'object': | ||
if (fieldValue.properties && fieldValue.properties.unit && fieldValue.properties.isAdjustedToUTC) { | ||
if (!fieldValue.properties.unit.enum) { | ||
throw new UnsupportedJsonSchemaError('Unit enum is not defined'); | ||
} | ||
const unit = fieldValue.properties.unit.default || fieldValue.properties.unit.enum[0]; | ||
const isAdjustedToUTC = !!fieldValue.properties.isAdjustedToUTC.default; | ||
let timeUnit: TimeUnit; | ||
switch (unit) { | ||
case 'MICROS': | ||
timeUnit = new TimeUnit({ MICROS: true }); | ||
break; | ||
case 'NANOS': | ||
timeUnit = new TimeUnit({ NANOS: true }); | ||
break; | ||
default: | ||
timeUnit = new TimeUnit({ MILLIS: true }); | ||
break; | ||
} | ||
|
||
const timeLogicalType = new TimeType({ isAdjustedToUTC, unit: timeUnit }); | ||
return fields.createTimeField(timeLogicalType, optional); | ||
} | ||
|
||
return fields.createStructField(fromJsonSchema(fieldValue), optional); | ||
|
||
default: | ||
throw new UnsupportedJsonSchemaError( | ||
`Unable to convert "${fieldName}" with JSON Schema type "${fieldValue.type}" to a Parquet Schema.` | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,23 +21,28 @@ interface INTERVAL { | |
milliseconds: number; | ||
} | ||
|
||
interface TIME { | ||
value: string | bigint | number; | ||
unit: 'MILLIS' | 'MICROS' | 'NANOS'; | ||
isAdjustedToUTC: boolean; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Type for handling incoming time type and handle accordingly |
||
|
||
export function getParquetTypeDataObject( | ||
type: ParquetType, | ||
field?: ParquetField | Options | FieldDefinition | ||
): ParquetTypeDataObject { | ||
if (type === 'DECIMAL') { | ||
if (field?.typeLength !== undefined && field?.typeLength !== null) { | ||
if (field?.typeLength !== undefined) { | ||
return { | ||
primitiveType: 'FIXED_LEN_BYTE_ARRAY', | ||
originalType: 'DECIMAL', | ||
typeLength: field.typeLength, | ||
toPrimitive: toPrimitive_FIXED_LEN_BYTE_ARRAY_DECIMAL, | ||
}; | ||
} else if (field?.precision !== undefined && field?.precision !== null && field.precision > 18) { | ||
} else if (field?.precision && field.precision > 18) { | ||
saraswatpuneet marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return { | ||
primitiveType: 'BYTE_ARRAY', | ||
originalType: 'DECIMAL', | ||
typeLength: field.typeLength, | ||
toPrimitive: toPrimitive_BYTE_ARRAY_DECIMAL, | ||
}; | ||
} else { | ||
|
@@ -47,6 +52,29 @@ export function getParquetTypeDataObject( | |
toPrimitive: toPrimitive_INT64, | ||
}; | ||
} | ||
} else if (field?.logicalType?.TIME) { | ||
const unit = field.logicalType.TIME.unit; | ||
if (unit.MILLIS) { | ||
return { | ||
originalType: 'TIME_MILLIS', | ||
primitiveType: 'INT32', | ||
toPrimitive: toPrimitive_TIME, | ||
}; | ||
} | ||
if (unit.MICROS) { | ||
return { | ||
originalType: 'TIME_MICROS', | ||
primitiveType: 'INT64', | ||
toPrimitive: toPrimitive_TIME, | ||
}; | ||
} | ||
if (unit.NANOS) { | ||
return { | ||
primitiveType: 'INT64', | ||
toPrimitive: toPrimitive_TIME, | ||
}; | ||
} | ||
throw new Error('TIME type must have a valid unit (MILLIS, MICROS, NANOS).'); | ||
} else { | ||
return PARQUET_LOGICAL_TYPE_DATA[type]; | ||
} | ||
|
@@ -560,3 +588,41 @@ function checkValidValue(lowerRange: number | bigint, upperRange: number | bigin | |
throw 'invalid value'; | ||
} | ||
} | ||
|
||
function toPrimitive_TIME(time: TIME): bigint | number { | ||
const { value, unit, isAdjustedToUTC } = time; | ||
|
||
const timeValue = typeof value === 'string' ? BigInt(value) : BigInt(value); | ||
|
||
if (isAdjustedToUTC) { | ||
return unit === 'MILLIS' ? Number(timeValue) : timeValue; | ||
} else { | ||
switch (unit) { | ||
case 'MILLIS': | ||
return Number(adjustToLocalTimestamp(timeValue, { MILLIS: true })); | ||
case 'MICROS': | ||
return adjustToLocalTimestamp(timeValue, { MICROS: true }); | ||
case 'NANOS': | ||
return adjustToLocalTimestamp(timeValue, { NANOS: true }); | ||
default: | ||
throw new Error(`Unsupported time unit: ${unit}`); | ||
} | ||
} | ||
} | ||
|
||
function adjustToLocalTimestamp( | ||
timestamp: bigint, | ||
unit: { MILLIS?: boolean; MICROS?: boolean; NANOS?: boolean } | ||
): bigint { | ||
const localOffset = BigInt(new Date().getTimezoneOffset()) * 60n * 1000n; // Offset in milliseconds | ||
|
||
if (unit.MILLIS) { | ||
return timestamp - localOffset; | ||
} else if (unit.MICROS) { | ||
return timestamp - localOffset * 1000n; | ||
} else if (unit.NANOS) { | ||
return timestamp - localOffset * 1000000n; | ||
} | ||
|
||
throw new Error('Unsupported time unit'); | ||
} | ||
saraswatpuneet marked this conversation as resolved.
Show resolved
Hide resolved
|
Uh oh!
There was an error while loading. Please reload this page.