Skip to content

Commit d5c2bba

Browse files
sahilpalviapfifer
authored andcommitted
Introducing Enhanced Fan-Out support (#31)
* Introducing support for KCL 2.x Changes for EFO support * Introducing input objects for ruby * Introducing kclrb v2 module which supports interfaces from KCL 2.x The sample no longer uses an external file, but instead emits records to stderror. Updating all KCL dependencies to 2.1.1
1 parent fc670d5 commit d5c2bba

File tree

9 files changed

+319
-107
lines changed

9 files changed

+319
-107
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.1
1+
2.0.0

aws-kclrb.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Gem::Specification.new do |spec|
88
spec.files = Dir['lib/**/*.rb']
99
spec.files += Dir['spec/**/*.rb']
1010
spec.files += ['README.md', 'LICENSE.txt', 'VERSION', 'NOTICE.txt', '.yardopts', '.rspec']
11-
spec.license = 'Amazon Software License'
11+
spec.licenses = ['Amazon Software License']
1212
spec.platform = Gem::Platform::RUBY
1313
spec.homepage = 'http://github.com/aws/amazon-kinesis-client-ruby'
1414
spec.require_paths = ['lib']

lib/aws/kclrb.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
# permissions and limitations under the License.
1414

1515
require 'aws/kclrb/record_processor'
16+
require 'aws/kclrb/messages'
1617
require 'aws/kclrb/kcl_process'

lib/aws/kclrb/kcl_process.rb

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414

1515
require 'aws/kclrb/io_proxy'
1616
require 'aws/kclrb/checkpointer'
17+
require 'aws/kclrb/messages'
18+
require 'aws/kclrb/record_processor'
1719

1820
module Aws
1921
module KCLrb
2022
# Error raised if the {KCLProcess} received an input action that it
2123
# could not parse or it could not handle.
22-
class MalformedAction < RuntimeError; end
23-
24+
class MalformedAction < RuntimeError;
25+
end
26+
2427
# Entry point for a KCL application in Ruby.
2528
#
2629
# Implementers of KCL applications in Ruby should instantiate this
@@ -31,12 +34,16 @@ class KCLProcess
3134
# @param input [IO] An `IO`-like object to read input lines from.
3235
# @param output [IO] An `IO`-like object to write output lines to.
3336
# @param error [IO] An `IO`-like object to write error lines to.
34-
def initialize(processor, input=$stdin, output=$stdout, error=$stderr)
35-
@processor = processor
37+
def initialize(processor, input = $stdin, output = $stdout, error = $stderr)
38+
if processor.version == 1
39+
@processor = Aws::KCLrb::V2::V2ToV1Adapter.new(processor)
40+
else
41+
@processor = processor
42+
end
3643
@io_proxy = IOProxy.new(input, output, error)
3744
@checkpointer = CheckpointerImpl.new(@io_proxy)
3845
end
39-
46+
4047
# Starts this KCL processor's main loop.
4148
def run
4249
action = @io_proxy.read_action
@@ -45,9 +52,9 @@ def run
4552
action = @io_proxy.read_action
4653
end
4754
end
48-
55+
4956
private
50-
57+
5158
# @api private
5259
# Parses an input action and invokes the appropriate method of the
5360
# record processor.
@@ -63,21 +70,28 @@ def process_action(action)
6370
action_name = action.fetch('action')
6471
case action_name
6572
when 'initialize'
66-
dispatch_to_processor(:init_processor, action.fetch('shardId'))
73+
dispatch_to_processor(:init_processor,
74+
Aws::KCLrb::V2::InitializeInput.new(action.fetch('shardId'),
75+
action.fetch('sequenceNumber')))
6776
when 'processRecords'
68-
dispatch_to_processor(:process_records, action.fetch('records'), @checkpointer)
69-
when 'shutdown'
70-
dispatch_to_processor(:shutdown, @checkpointer, action.fetch('reason'))
77+
dispatch_to_processor(:process_records,
78+
Aws::KCLrb::V2::ProcessRecordsInput.new(action.fetch('records'),
79+
action.fetch('millisBehindLatest'),
80+
@checkpointer))
81+
when 'leaseLost'
82+
dispatch_to_processor(:lease_lost, Aws::KCLrb::V2::LeaseLostInput.new)
83+
when 'shardEnded'
84+
dispatch_to_processor(:shard_ended, Aws::KCLrb::V2::ShardEndedInput.new(@checkpointer))
7185
when 'shutdownRequested'
72-
dispatch_to_processor(:shutdown_requested, @checkpointer)
86+
dispatch_to_processor(:shutdown_requested, Aws::KCLrb::V2::ShutdownRequestedInput.new(@checkpointer))
7387
else
7488
raise MalformedAction.new("Received an action which couldn't be understood. Action was '#{action}'")
7589
end
7690
@io_proxy.write_action('status', {'responseFor' => action_name})
7791
rescue KeyError => ke
7892
raise MalformedAction.new("Action '#{action}': #{ke.message}")
7993
end
80-
94+
8195
# @api private
8296
# Calls the specified method on the record processor, and handles
8397
# any resulting exceptions by writing to the error stream.
@@ -91,7 +105,7 @@ def dispatch_to_processor(method, *args)
91105
# of issue.
92106
@io_proxy.write_error(processor_error)
93107
end
94-
108+
95109
end
96110
end
97111
end

lib/aws/kclrb/messages.rb

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
module Aws
2+
module KCLrb
3+
module V2
4+
# @abstract
5+
# Input object for RecordProcessorBase#init_processor method.
6+
class InitializeInput
7+
attr_reader :shard_id, :sequence_number
8+
9+
def initialize(shard_id, sequence_number)
10+
@shard_id = shard_id
11+
@sequence_number = sequence_number
12+
end
13+
end
14+
15+
# @abstract
16+
# Input object for RecordProcessorBase#process_records method.
17+
class ProcessRecordsInput
18+
attr_reader :records, :millis_behind_latest, :checkpointer
19+
20+
def initialize(records, millis_behind_latest, checkpointer = nil)
21+
@records = records
22+
@millis_behind_latest = millis_behind_latest
23+
@checkpointer = checkpointer
24+
end
25+
end
26+
27+
# @abstract
28+
# Input object for RecordProcessorBase#lease_lost method.
29+
class LeaseLostInput
30+
end
31+
32+
# @abstract
33+
# Input object forRecordProcessorBase#shard_ended method.
34+
class ShardEndedInput
35+
attr_reader :checkpointer
36+
37+
def initialize(checkpointer = nil)
38+
@checkpointer = checkpointer
39+
end
40+
end
41+
42+
# @abstract
43+
# Input object for RecordProcessorBase#shutdown_requested method.
44+
class ShutdownRequestedInput
45+
attr_reader :checkpointer
46+
47+
def initialize(checkpointer = nil)
48+
@checkpointer = checkpointer
49+
end
50+
end
51+
end
52+
end
53+
end

lib/aws/kclrb/record_processor.rb

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
#
44
# Licensed under the Amazon Software License (the "License").
55
# You may not use this file except in compliance with the License.
@@ -32,7 +32,7 @@ class RecordProcessorBase
3232
def init_processor(shard_id)
3333
fail NotImplementedError.new
3434
end
35-
35+
3636
# @abstract
3737
# Called by a KCLProcess with a list of records to be processed and a checkpointer
3838
# which accepts sequence numbers from the records to indicate where in the stream
@@ -52,7 +52,7 @@ def init_processor(shard_id)
5252
def process_records(records, checkpointer)
5353
fail NotImplementedError.new
5454
end
55-
55+
5656
# @abstract
5757
# Called by a KCLProcess instance to indicate that this record processor
5858
# should shutdown. After this is called, there will be no more calls to
@@ -81,6 +81,109 @@ def shutdown(checkpointer, reason)
8181
# number or no parameters.
8282
def shutdown_requested(checkpointer)
8383
end
84+
85+
def version
86+
1
87+
end
88+
end
89+
90+
module V2
91+
# @abstract
92+
# Base class for implementing a record processor.
93+
#
94+
# A `RecordProcessor` processes a shard in a stream. See {https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java the corresponding KCL interface}.
95+
# Its methods will be called as follows:
96+
#
97+
# 1. {#init_processor} will be called once
98+
# 2. {#process_records} will be called zero or more times
99+
# 3. {#lease_lost} will be called zero to one time
100+
# 4. {#shard_ended} will be called zero or more times
101+
# 5. {#shutdown_requested} will be called zero to one time
102+
class RecordProcessorBase
103+
# @abstract
104+
# Called once by a KCLProcess before any calls to process_records.
105+
#
106+
# @param initialize_input [InitializeInput] Initialize processor input
107+
# object
108+
def init_processor(initialize_input)
109+
fail NotImplementedError.new
110+
end
111+
112+
# @abstract
113+
# Called by a KCLProcess with a list of records to be processed and a
114+
# checkpointer which accepts sequence numbers from the records to
115+
# indicate where in the stream to checkpoint.
116+
#
117+
# @param record_processor_input [RecordProcessorInput] Process records
118+
# input object
119+
def process_records(process_records_input)
120+
fail NotImplementedError.new
121+
end
122+
123+
# @abstract
124+
# Called by a KCLProcess instance to indicate that this record processor
125+
# should shutdown. After this is called, there will be no more calls to
126+
# any other methods of this record processor.
127+
#
128+
# @param lease_lost_input [LeaseLostInput] Lease lost input object
129+
#
130+
# - Clients should not checkpoint because there is possibly another
131+
# record processor which has acquired the lease for this shard.
132+
def lease_lost(lease_lost_input)
133+
fail NotImplementedError.new
134+
end
135+
136+
# @abstract
137+
# Called by a KCLProcess instance to indicate that this record processor
138+
# should shutdown. After this is called, there will be no more calls to
139+
# any other methods of this record processor.
140+
#
141+
# @param shard_ended_input [ShardEndedInput] Shard ended input object
142+
#
143+
# - Clients need to checkpoint at this time.
144+
def shard_ended(shard_ended_input)
145+
fail NotImplementedError.new
146+
end
147+
148+
# @abstract
149+
# Called by a KCLProcess instance to indicate that this record processor
150+
# is requesting a shutdown. This method should be overriden if required.
151+
#
152+
# @param shutdown_requested_input [ShutdownRequestedInput]
153+
def shutdown_requested(shutdown_requested_input)
154+
end
155+
156+
def version
157+
2
158+
end
159+
end
160+
161+
class V2ToV1Adapter < Aws::KCLrb::V2::RecordProcessorBase
162+
def initialize(processor)
163+
@processor = processor
164+
end
165+
166+
def init_processor(initialize_input)
167+
@processor.init_processor(initialize_input.shard_id)
168+
end
169+
170+
def process_records(process_records_input)
171+
@processor.process_records(process_records_input.records,
172+
process_records_input.checkpointer)
173+
end
174+
175+
def lease_lost(lease_lost_input)
176+
@processor.shutdown(nil, 'ZOMBIE')
177+
end
178+
179+
def shard_ended(shard_ended_input)
180+
@processor.shutdown(shard_ended_input.checkpointer, 'TERMINATE')
181+
end
182+
183+
def shutdown_requested(shutdown_requested_input)
184+
@processor.shutdown_requested(shutdown_requested_input.checkpointer)
185+
end
186+
end
84187
end
85188
end
86189
end

0 commit comments

Comments
 (0)