Skip to content
This repository was archived by the owner on Jul 8, 2024. It is now read-only.

Commit 4eee019

Browse files
author
karthitect
committed
Added datagen
1 parent 096bdd8 commit 4eee019

File tree

26 files changed

+982
-481
lines changed

26 files changed

+982
-481
lines changed

apps/java-datastream/kds-to-s3-datastream-java/cdk-infra/lib/cdk-infra-kda-kds-to-s3-stack.ts

+19
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ export class CdkInfraKdaKdsToS3Stack extends cdk.Stack {
5858
],
5959
});
6060

61+
// our KDA app needs to be able to write metrics
62+
const accessCWMetricsPolicy = new iam.PolicyDocument({
63+
statements: [
64+
new iam.PolicyStatement({
65+
resources: ['*'],
66+
actions: ['cloudwatch:PutMetricData'],
67+
}),
68+
],
69+
});
70+
6171
// our KDA app needs access to read application jar from S3
6272
// as well as to write to S3 (from FileSink)
6373
const accessS3Policy = new iam.PolicyDocument({
@@ -129,6 +139,7 @@ export class CdkInfraKdaKdsToS3Stack extends cdk.Stack {
129139
inlinePolicies: {
130140
AccessKDSPolicy: accessKdsPolicy,
131141
AccessCWLogsPolicy: accessCWLogsPolicy,
142+
AccessCWMetricsPolicy: accessCWMetricsPolicy,
132143
AccessS3Policy: accessS3Policy,
133144
KDAAccessPolicy: kdaAccessPolicy,
134145
GlueAccessPolicy: glueAccessPolicy,
@@ -198,4 +209,12 @@ export class CdkInfraKdaKdsToS3Stack extends cdk.Stack {
198209
}
199210

200211
} // constructor
212+
213+
getParams(): void {
214+
const appTemplateBucket = new cdk.CfnParameter(this, "appTemplateBucket", {
215+
type: "String",
216+
description: "The (pre-existing) bucket that will hold the CFN template script and assets."
217+
});
218+
219+
}
201220
} // class

apps/java-datastream/msk-serverless-to-msk-serverless-datastream-java/cdk-infra/lib/cdk-infra-kda-kafka-to-kafka.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ export class CdkInfraKdaKafkaToKafkaStack extends cdk.Stack {
7474

7575
// This is the code for the lambda function that auto-creates the source topic
7676
// We need to pass in the path from the calling location
77-
const lambdaAssetLocation = '../../../../cdk-infra/shared/lambda/kafka-topic-gen-lambda-1.0.jar';
77+
const lambdaAssetLocation = '../../../../cdk-infra/shared/lambda/aws-lambda-helpers-1.0.jar';
7878

7979
const topicCreationLambda = new TopicCreationLambdaConstruct(this, 'TopicCreationLambda', {
8080
account: this.account,
@@ -164,6 +164,16 @@ export class CdkInfraKdaKafkaToKafkaStack extends cdk.Stack {
164164
],
165165
});
166166

167+
// our KDA app needs to be able to write metrics
168+
const accessCWMetricsPolicy = new iam.PolicyDocument({
169+
statements: [
170+
new iam.PolicyStatement({
171+
resources: ['*'],
172+
actions: ['cloudwatch:PutMetricData'],
173+
}),
174+
],
175+
});
176+
167177
// our KDA app needs access to read application jar from S3
168178
// as well as to write to S3 (from FileSink)
169179
const accessS3Policy = new iam.PolicyDocument({
@@ -237,6 +247,7 @@ export class CdkInfraKdaKafkaToKafkaStack extends cdk.Stack {
237247
AccessMSKPolicy: accessMSKPolicy,
238248
AccessMSKTopicsPolicy: accessMSKTopicsPolicy,
239249
AccessCWLogsPolicy: accessCWLogsPolicy,
250+
AccessCWMetricsPolicy: accessCWMetricsPolicy,
240251
AccessS3Policy: accessS3Policy,
241252
AccessVPCPolicy: accessVPCPolicy,
242253
KDAAccessPolicy: kdaAccessPolicy,

apps/java-datastream/msk-serverless-to-s3-datastream-java/cdk-infra/bin/main.d.ts

-2
This file was deleted.

apps/java-datastream/msk-serverless-to-s3-datastream-java/cdk-infra/bin/main.js

-21
This file was deleted.

apps/java-datastream/msk-serverless-to-s3-datastream-java/cdk-infra/cfn.yaml

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1-
import * as cdk from 'aws-cdk-lib';
2-
import { StackProps } from 'aws-cdk-lib';
3-
import { Construct } from 'constructs';
4-
export interface GlobalProps extends StackProps {
5-
kdaAppName: string;
6-
appBucket: string;
7-
appFileKeyOnS3: string;
8-
runtimeEnvironment: string;
9-
appSinkBucket: string;
10-
deployDataGen: boolean;
11-
}
12-
export declare class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
13-
constructor(scope: Construct, id: string, props?: GlobalProps);
14-
}
1+
import * as cdk from 'aws-cdk-lib';
2+
import { StackProps } from 'aws-cdk-lib';
3+
import { Construct } from 'constructs';
4+
export interface GlobalProps extends StackProps {
5+
kdaAppName: string;
6+
appBucket: string;
7+
appFileKeyOnS3: string;
8+
runtimeEnvironment: string;
9+
appSinkBucket: string;
10+
deployDataGen: boolean;
11+
glueDatabaseName: string;
12+
flinkVersion: string;
13+
zepFlinkVersion: string;
14+
kdaLogGroup: string;
15+
kdaLogStream: string;
16+
mskClusterName: string;
17+
sourceTopicName: string;
18+
}
19+
export declare class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
20+
constructor(scope: Construct, id: string, props?: GlobalProps);
21+
}

apps/java-datastream/msk-serverless-to-s3-datastream-java/cdk-infra/lib/cdk-infra-kda-kafka-to-s3-stack.js

+265-242
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/java-datastream/msk-serverless-to-s3-datastream-java/cdk-infra/lib/cdk-infra-kda-kafka-to-s3-stack.ts

+78-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ import * as cdk from 'aws-cdk-lib';
22
import { StackProps } from 'aws-cdk-lib';
33
import { Construct } from 'constructs';
44
import * as ec2 from 'aws-cdk-lib/aws-ec2';
5+
import { aws_s3 as s3 } from 'aws-cdk-lib';
56
import * as iam from 'aws-cdk-lib/aws-iam';
7+
import * as glue from 'aws-cdk-lib/aws-glue';
68
import { aws_logs as logs } from 'aws-cdk-lib';
79
import { KDAConstruct } from '../../../../../cdk-infra/shared/lib/kda-construct';
810
import { KDAZepConstruct } from '../../../../../cdk-infra/shared/lib/kda-zep-construct';
@@ -29,6 +31,29 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
2931
constructor(scope: Construct, id: string, props?: GlobalProps) {
3032
super(scope, id, props);
3133

34+
// we'll be generating a CFN script so we need CFN params
35+
let cfnParams = this.getParams(props);
36+
37+
// app package s3 bucket
38+
const s3_bucket = new s3.Bucket(this, 'AppPackageS3Bucket', {
39+
bucketName: cfnParams.get("appBucket")!.valueAsString,
40+
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
41+
encryption: s3.BucketEncryption.S3_MANAGED,
42+
enforceSSL: true,
43+
versioned: false,
44+
removalPolicy: cdk.RemovalPolicy.DESTROY,
45+
});
46+
47+
// sink s3 bucket
48+
const sink_s3_bucket = new s3.Bucket(this, 'SinkS3Bucket', {
49+
bucketName: cfnParams.get("appSinkBucket")!.valueAsString,
50+
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
51+
encryption: s3.BucketEncryption.S3_MANAGED,
52+
enforceSSL: true,
53+
versioned: false,
54+
removalPolicy: cdk.RemovalPolicy.DESTROY,
55+
});
56+
3257
// VPC
3358
const vpc = new ec2.Vpc(this, 'VPC', {
3459
enableDnsHostnames: true,
@@ -73,7 +98,7 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
7398

7499
// This is the code for the lambda function that auto-creates the source topic
75100
// We need to pass in the path from the calling location
76-
const lambdaAssetLocation = '../../../../cdk-infra/shared/lambda/kafka-topic-gen-lambda-1.0.jar';
101+
const lambdaAssetLocation = '../../../../cdk-infra/shared/lambda/aws-lambda-helpers-1.0.jar';
77102

78103
const topicCreationLambda = new TopicCreationLambdaConstruct(this, 'TopicCreationLambda', {
79104
account: this.account,
@@ -147,13 +172,23 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
147172
],
148173
});
149174

175+
// our KDA app needs to be able to write metrics
176+
const accessCWMetricsPolicy = new iam.PolicyDocument({
177+
statements: [
178+
new iam.PolicyStatement({
179+
resources: ['*'],
180+
actions: ['cloudwatch:PutMetricData'],
181+
}),
182+
],
183+
});
184+
150185
// our KDA app needs access to read application jar from S3
151186
// as well as to write to S3 (from FileSink)
152187
const accessS3Policy = new iam.PolicyDocument({
153188
statements: [
154189
new iam.PolicyStatement({
155-
resources: [`arn:aws:s3:::${props!.appBucket}/*`,
156-
`arn:aws:s3:::${props!.appSinkBucket}/*`],
190+
resources: [`arn:aws:s3:::${s3_bucket.bucketName}/*`,
191+
`arn:aws:s3:::${sink_s3_bucket.bucketName}/*`],
157192
actions: ['s3:ListBucket',
158193
's3:PutObject',
159194
's3:GetObject',
@@ -221,6 +256,7 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
221256
AccessMSKPolicy: accessMSKPolicy,
222257
AccessMSKTopicsPolicy: accessMSKTopicsPolicy,
223258
AccessCWLogsPolicy: accessCWLogsPolicy,
259+
AccessCWMetricsPolicy: accessCWMetricsPolicy,
224260
AccessS3Policy: accessS3Policy,
225261
AccessVPCPolicy: accessVPCPolicy,
226262
KDAAccessPolicy: kdaAccessPolicy,
@@ -230,14 +266,14 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
230266
});
231267

232268
const flinkApplicationProps = {
233-
"S3DestinationBucket": `s3://${props!.appSinkBucket}/`,
269+
"S3DestinationBucket": `s3://${sink_s3_bucket.bucketName}/`,
234270
"ServerlessMSKBootstrapServers": sourceServerlessMskCluster.bootstrapServersOutput.value,
235271
"KafkaSourceTopic": props!.sourceTopicName,
236272
"KafkaConsumerGroupId": "KDAFlinkConsumerGroup",
237273
"PartitionFormat": "yyyy-MM-dd-HH",
238274
};
239275

240-
// instantiate kda construct
276+
// // instantiate kda construct
241277
const kdaConstruct = new KDAConstruct(this, 'KDAConstruct', {
242278
account: this.account,
243279
region: this.region,
@@ -246,7 +282,7 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
246282
logGroup: logGroup,
247283
logStream: logStream,
248284
kdaAppName: props!.kdaAppName,
249-
appBucket: props!.appBucket,
285+
appBucket: s3_bucket.bucketName,
250286
appFileKeyOnS3: props!.appFileKeyOnS3,
251287
runtimeEnvironment: props!.runtimeEnvironment,
252288
serviceExecutionRole: kdaAppRole.roleArn,
@@ -260,6 +296,14 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
260296
kdaConstruct.node.addDependency(logGroup);
261297
kdaConstruct.node.addDependency(logStream);
262298

299+
// instantiate glue db
300+
const glueDB = new glue.CfnDatabase(this, 'GlueDB', {
301+
catalogId: this.account,
302+
databaseInput: {
303+
name: cfnParams.get("glueDatabaseName")!.valueAsString
304+
}
305+
});
306+
263307
// instantiate zep kda construct
264308
if (props?.deployDataGen) {
265309
const zepDataGenAppName = props!.kdaAppName + "-zep";
@@ -293,4 +337,32 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
293337
}
294338

295339
} // constructor
340+
341+
getParams(props?: GlobalProps): Map<string, cdk.CfnParameter> {
342+
let params = new Map<string, cdk.CfnParameter>();
343+
344+
const appBucket = new cdk.CfnParameter(this, "appBucket", {
345+
type: "String",
346+
default: props!.appBucket,
347+
description: "The S3 bucket for storing app assets"
348+
});
349+
params.set("appBucket", appBucket);
350+
351+
const appSinkBucket = new cdk.CfnParameter(this, "appSinkBucket", {
352+
type: "String",
353+
default: props!.appSinkBucket,
354+
description: "The S3 bucket to be used as the sink"
355+
});
356+
params.set("appSinkBucket", appSinkBucket);
357+
358+
const glueDatabaseName = new cdk.CfnParameter(this, "glueDatabaseName", {
359+
type: "String",
360+
default: props!.glueDatabaseName,
361+
description: "The Glue catalog that will be used w/ Kinesis Data Analytics Studio"
362+
});
363+
params.set("glueDatabaseName", glueDatabaseName);
364+
365+
return params;
366+
367+
}
296368
} // class

apps/java-datastream/msk-serverless-to-s3-datastream-java/cdk-infra/test/cdk-infra-kda-kafka-to-s3.test.js

+16-16
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/python-table-api/msk-serverless-to-s3-tableapi-python/cdk-infra/lib/cdk-infra-kda-kafka-to-s3-stack.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
7575

7676
// This is the code for the lambda function that auto-creates the source topic
7777
// We need to pass in the path from the calling location
78-
const lambdaAssetLocation = '../../../../cdk-infra/shared/lambda/kafka-topic-gen-lambda-1.0.jar';
78+
const lambdaAssetLocation = '../../../../cdk-infra/shared/lambda/aws-lambda-helpers-1.0.jar';
7979

8080
const topicCreationLambda = new TopicCreationLambdaConstruct(this, 'TopicCreationLambda', {
8181
account: this.account,
@@ -149,6 +149,16 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
149149
],
150150
});
151151

152+
// our KDA app needs to be able to write metrics
153+
const accessCWMetricsPolicy = new iam.PolicyDocument({
154+
statements: [
155+
new iam.PolicyStatement({
156+
resources: ['*'],
157+
actions: ['cloudwatch:PutMetricData'],
158+
}),
159+
],
160+
});
161+
152162
// our KDA app needs access to read application jar from S3
153163
// as well as to write to S3 (from FileSink)
154164
const accessS3Policy = new iam.PolicyDocument({
@@ -223,6 +233,7 @@ export class CdkInfraKdaKafkaToS3Stack extends cdk.Stack {
223233
AccessMSKPolicy: accessMSKPolicy,
224234
AccessMSKTopicsPolicy: accessMSKTopicsPolicy,
225235
AccessCWLogsPolicy: accessCWLogsPolicy,
236+
AccessCWMetricsPolicy: accessCWMetricsPolicy,
226237
AccessS3Policy: accessS3Policy,
227238
AccessVPCPolicy: accessVPCPolicy,
228239
KDAAccessPolicy: kdaAccessPolicy,

cdk-infra/shared/lambda/kafka-topic-gen-lambda/README.md renamed to cdk-infra/shared/lambda/aws-lambda-helpers/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ This project contains Java code for creating topics against MSK, and is meant to
55
## Building this project
66

77
- Run `mvn clean package shade:shade`
8-
- Copy the uber jar named `kafka-topic-gen-lambda-1.0.jar` from the `target` folder to the level above this project
8+
- Copy the uber jar named `aws-lambda-helpers-1.0.jar` from the `target` folder to the level above this project

cdk-infra/shared/lambda/kafka-topic-gen-lambda/pom.xml renamed to cdk-infra/shared/lambda/aws-lambda-helpers/pom.xml

+7-4
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@
55
<modelVersion>4.0.0</modelVersion>
66

77
<groupId>com.amazonaws</groupId>
8-
<artifactId>kafka-topic-gen-lambda</artifactId>
8+
<artifactId>aws-lambda-helpers</artifactId>
99
<version>1.0</version>
1010

11-
<name>kafka-topic-gen-lambda</name>
12-
<!-- FIXME change it to the project's website -->
13-
<url>http://www.example.com</url>
11+
<name>aws-lambda-helpers</name>
1412

1513
<properties>
1614
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -32,6 +30,11 @@
3230
<artifactId>kafka-clients</artifactId>
3331
<version>${kafka.clients.version}</version>
3432
</dependency>
33+
<dependency>
34+
<groupId>software.amazon.lambda</groupId>
35+
<artifactId>powertools-cloudformation</artifactId>
36+
<version>1.15.0</version>
37+
</dependency>
3538
<!-- IAM -->
3639
<dependency>
3740
<groupId>software.amazon.msk</groupId>

0 commit comments

Comments
 (0)