Skip to content

Commit 757a29e

Browse files
authored
Merge pull request #560 from awslabs/revert_CAA_commit
Reverting CAA related changes
2 parents 4355c41 + 1caa983 commit 757a29e

File tree

14 files changed

+114
-777
lines changed

14 files changed

+114
-777
lines changed

aws/kinesis/core/kinesis_producer.cc

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,11 @@ void KinesisProducer::create_sts_client(const std::string& ca_path) {
222222
cfg);
223223
}
224224

225-
Pipeline* KinesisProducer::create_pipeline(const std::string& stream, const boost::optional<std::string>& stream_arn) {
225+
Pipeline* KinesisProducer::create_pipeline(const std::string& stream) {
226226
LOG(info) << "Created pipeline for stream \"" << stream << "\"";
227227
return new Pipeline(
228228
region_,
229229
stream,
230-
stream_arn,
231230
config_,
232231
executor_,
233232
kinesis_client_,
@@ -292,11 +291,7 @@ void KinesisProducer::on_put_record(aws::kinesis::protobuf::Message& m) {
292291
std::chrono::milliseconds(config_->record_max_buffered_time()));
293292
ur->set_expiration_from_now(
294293
std::chrono::milliseconds(config_->record_ttl()));
295-
if (ur->stream_arn()) {
296-
pipelines_[ur->stream_arn().get()].put(ur);
297-
} else {
298-
pipelines_[ur->stream()].put(ur);
299-
}
294+
pipelines_[ur->stream()].put(ur);
300295
}
301296

302297
void KinesisProducer::on_flush(const aws::kinesis::protobuf::Flush& flush_msg) {

aws/kinesis/core/kinesis_producer.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,8 @@ class KinesisProducer : boost::noncopyable {
4646
cw_creds_provider_(std::move(cw_creds_provider)),
4747
executor_(std::move(executor)),
4848
ipc_manager_(std::move(ipc_manager)),
49-
pipelines_([this](auto& stream_or_arn) {
50-
std::regex kinesisStreamArnRegex("^arn:aws.*:kinesis:.*:\\d{12}:stream/\\S+$");
51-
std::smatch match;
52-
if (std::regex_search(stream_or_arn, match, kinesisStreamArnRegex)) {
53-
return this->create_pipeline(match[1].str(), stream_or_arn);
54-
} else {
55-
return this->create_pipeline(stream_or_arn, boost::none);
56-
}
49+
pipelines_([this](auto& stream) {
50+
return this->create_pipeline(stream);
5751
}),
5852
shutdown_(false) {
5953
create_kinesis_client(ca_path);
@@ -86,7 +80,7 @@ class KinesisProducer : boost::noncopyable {
8680

8781
void create_sts_client(const std::string& ca_path);
8882

89-
Pipeline* create_pipeline(const std::string& stream, const boost::optional<std::string>& stream_arn);
83+
Pipeline* create_pipeline(const std::string& stream);
9084

9185
void drain_messages();
9286

aws/kinesis/core/pipeline.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ class Pipeline : boost::noncopyable {
5050
Pipeline(
5151
std::string region,
5252
std::string stream,
53-
boost::optional<std::string> stream_arn,
5453
std::shared_ptr<Configuration> config,
5554
std::shared_ptr<aws::utils::Executor> executor,
5655
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client,
@@ -59,7 +58,7 @@ class Pipeline : boost::noncopyable {
5958
Retrier::UserRecordCallback finish_user_record_cb)
6059
: stream_(std::move(stream)),
6160
region_(std::move(region)),
62-
stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_, stream_arn_))),
61+
stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_))),
6362
config_(std::move(config)),
6463
stats_logger_(stream_, config_->record_max_buffered_time()),
6564
executor_(std::move(executor)),
@@ -206,11 +205,7 @@ class Pipeline : boost::noncopyable {
206205
// Retrieve the account ID and partition from the STS service.
207206
static std::string init_stream_arn(const std::shared_ptr<Aws::STS::STSClient>& sts_client,
208207
const std::string &region,
209-
const std::string &stream_name,
210-
const boost::optional<std::string> &stream_arn_) {
211-
if (!stream_arn_) {
212-
return stream_arn_.get();
213-
}
208+
const std::string &stream_name) {
214209
Aws::STS::Model::GetCallerIdentityRequest request;
215210
auto outcome = sts_client->GetCallerIdentity(request);
216211
if (outcome.IsSuccess()) {

aws/kinesis/core/user_record.cc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,6 @@ UserRecord::UserRecord(aws::kinesis::protobuf::Message& m)
3131
source_id_ = m.id();
3232
auto put_record = m.put_record();
3333
stream_ = std::move(put_record.stream_name());
34-
has_stream_arn_ = put_record.has_stream_arn();
35-
if (has_stream_arn_) {
36-
stream_arn_ = std::move(put_record.stream_arn());
37-
}
3834
partition_key_ = std::move(put_record.partition_key());
3935
data_ = std::move(put_record.data());
4036
has_explicit_hash_key_ = put_record.has_explicit_hash_key();

aws/kinesis/core/user_record.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,6 @@ class UserRecord : public aws::utils::TimeSensitive {
8484
return ss.str();
8585
}
8686

87-
boost::optional<std::string> stream_arn() const noexcept {
88-
if (has_stream_arn_) {
89-
return stream_arn_;
90-
} else {
91-
return boost::none;
92-
}
93-
}
94-
9587
boost::optional<std::string> explicit_hash_key() const noexcept {
9688
if (has_explicit_hash_key_) {
9789
return hash_key_decimal_str();
@@ -107,14 +99,12 @@ class UserRecord : public aws::utils::TimeSensitive {
10799
private:
108100
uint64_t source_id_;
109101
std::string stream_;
110-
std::string stream_arn_;
111102
std::string partition_key_;
112103
uint128_t hash_key_;
113104
std::string data_;
114105
std::vector<Attempt> attempts_;
115106
boost::optional<uint64_t> predicted_shard_;
116107
bool has_explicit_hash_key_;
117-
bool has_stream_arn_;
118108
bool finished_;
119109
};
120110

aws/kinesis/protobuf/messages.pb.cc

Lines changed: 37 additions & 85 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)