Skip to content

Commit 6acb348

Browse files
committed
create readme and simple basic run through
1 parent 09dce18 commit 6acb348

File tree

6 files changed

+130
-26
lines changed

6 files changed

+130
-26
lines changed

README.md

+84-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,85 @@
11
# redisSMSLuaA-A
2-
redis SMS with LUA active/active
2+
Provides a quick-start example of using Redis Streams for parsing incoming messages in an active/active redis enterprise.
3+
## Overview
4+
A java producer will produce messages on a redis stream across two redis clusters in an active/active database.
5+
A java consumer will consume from the stream and create redis structures from the streams messages.
6+
First, on simple consumer will be used but a lua consumer will be developed next to eliminate round trips to the client java application
7+
8+
## Redis Advantages for message partition streaming
9+
* Redis easily handles high write transaction volume
10+
* Redis enterprise scales vertically (large nodes) and horizontally (many nodes)
11+
* Redis enterprise active/active allows adding to a stream from either "region"
12+
* Redis allows putting TTL on the hash
13+
14+
## Requirements
15+
* Docker installed on your local system, see [Docker Installation Instructions](https://docs.docker.com/engine/installation/).
16+
17+
* When using Docker for Mac or Docker for Windows, the default resources allocated to the linux VM running docker are 2GB RAM and 2 CPU's. Make sure to adjust these resources to meet the resource requirements for the containers you will be running. More information can be found here on adjusting the resources allocated to docker.
18+
19+
[Docker for mac](https://docs.docker.com/docker-for-mac/#advanced)
20+
21+
[Docker for windows](https://docs.docker.com/docker-for-windows/#advanced)
22+
23+
## Links that help!
24+
* [Active/Active docker under crdt-application](https://github.com/RedisLabs/redis-for-dummies)
25+
* [Getting Started with Redis Streams and java github](https://github.com/tgrall/redis-streams-101-java)
26+
* [Redis Active/Active CLI reference](https://docs.redis.com/latest/rs/references/crdb-cli-reference/)
27+
## Create Redis Enterprise Active/Active database
28+
* Prepare Docker environment-see the Prerequisites section above...
29+
* Clone the github
30+
```bash
31+
git clone https://github.com/jphaugla/redisSMSLuaA-A
32+
```
33+
* Refer to the notes for redis Docker images used but don't get too bogged down as docker compose handles everything except for a few admin steps on tomcat.
34+
* [https://hub.docker.com/r/redislabs/redis/](https://hub.docker.com/r/redislabs/redis/)
35+
* Open terminal and change to the github home to create the clusters and the clustered database
36+
```bash
37+
./create_redis_enterprise_clusters.sh
38+
./setup_redis_enterprise_clusters.sh
39+
./create_crdb.sh
40+
```
41+
To access the databases using redis-cli, leverage the two different port numbers
42+
```bash
43+
# this is first redis cluster
44+
redis-cli -p 12000
45+
# this is second redis cluster
46+
redis-cli -p 12002
47+
```
48+
## The java code
49+
## To execute the java code
50+
(Alternatively, this can be run through intellij)
51+
52+
* Compile the code
53+
```bash
54+
mvn clean verify
55+
```
56+
* run the consumer
57+
```bash
58+
./runconsumer.sh
59+
```
60+
* run the producer
61+
```bash
62+
./runproducer.sh
63+
```
64+
## Verify the results
65+
```bash
66+
redis-cli -p 12000
67+
> keys *
68+
```
69+
### Should see something similar to this![redis keys](images/keys.png)
70+
* There is a Redis set for each message containing the hash key for each message part belonging to this message
71+
```bash
72+
type weather_sensor:wind:message:MSG0
73+
```
74+
returns "set"
75+
```bash
76+
smembers weather_sensor:wind:message:MSG0
77+
```
78+
returns all the message parts for this message
79+
```bash
80+
hgetall weather_sensor:wind:hash:1640798739595-2
81+
```
82+
return all the hash key value pairs from the message part
83+
## Additional tooling
84+
scripts are provided to support typical [crdb-cli](https://docs.redis.com/latest/rs/references/crdb-cli-reference/) commands
85+
The crdpurge.sh is very handy to efficiently do a "purgeall" of the database contents

crdpurge.sh

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
guid=$1
2+
docker exec -it rp1 bash -c "/opt/redislabs/bin/crdb-cli crdb flush --crdb-guid $guid"

images/keys.png

125 KB
Loading

pom.xml

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<dependency>
1313
<groupId>io.lettuce</groupId>
1414
<artifactId>lettuce-core</artifactId>
15-
<version>5.1.8.RELEASE</version>
15+
<version>6.1.5.RELEASE</version>
1616
</dependency>
1717
<dependency>
1818
<groupId>org.junit.jupiter</groupId>
@@ -26,10 +26,10 @@
2626
<plugins>
2727
<plugin>
2828
<artifactId>maven-compiler-plugin</artifactId>
29-
<version>3.2</version>
29+
<version>3.8.1</version>
3030
<configuration>
31-
<source>1.8</source>
32-
<target>1.8</target>
31+
<source>8</source>
32+
<target>8</target>
3333
</configuration>
3434
</plugin>
3535
</plugins>

src/main/java/com/jphaugla/redis/streams/RedisStreams101Consumer.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
import io.lettuce.core.api.StatefulRedisConnection;
55
import io.lettuce.core.api.sync.RedisCommands;
66

7-
8-
import java.util.Arrays;
97
import java.util.List;
8+
import java.util.Map;
109

1110
public class RedisStreams101Consumer {
1211

1312
public final static String STREAMS_KEY = "weather_sensor:wind";
13+
public final static String HASH_KEY = "weather_sensor:wind:hash:";
14+
public final static String MESSAGE_KEY = "weather_sensor:wind:message:";
1415

1516

1617
public static void main(String[] args) {
@@ -23,7 +24,7 @@ public static void main(String[] args) {
2324
syncCommands.xgroupCreate( XReadArgs.StreamOffset.from(STREAMS_KEY, "0-0"), "application_1" );
2425
}
2526
catch (RedisBusyException redisBusyException) {
26-
System.out.println( String.format("\t Group '%s' already exists", "application_1"));
27+
System.out.printf("\t Group '%s' already exists%n", "application_1");
2728
}
2829

2930

@@ -41,7 +42,19 @@ public static void main(String[] args) {
4142
System.out.println(message);
4243
// Confirm that the message has been processed using XACK
4344
syncCommands.xack(STREAMS_KEY, "application_1", message.getId());
44-
// syncCommands.hset(message.getId(), message.getBody());
45+
Map<String, String> body = message.getBody();
46+
String hashKey = HASH_KEY + message.getId();
47+
String messageId = body.get("message_id");
48+
String messageKey = MESSAGE_KEY + messageId;
49+
String numberParts = body.get("total_parts");
50+
String thisPart = body.get("this_part");
51+
// write a hash for each message body
52+
syncCommands.hmset(hashKey, body);
53+
// keep track of all the hash keys for this message body
54+
syncCommands.sadd(messageKey, hashKey);
55+
if (numberParts == thisPart) {
56+
System.out.println("All Message parts received for " + messageKey);
57+
}
4558
}
4659
}
4760

src/main/java/com/jphaugla/redis/streams/RedisStreams101Producer.java

+23-17
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,24 @@
44
import io.lettuce.core.api.StatefulRedisConnection;
55
import io.lettuce.core.api.sync.RedisCommands;
66

7-
import java.util.Collections;
87
import java.util.HashMap;
98
import java.util.Map;
109
import java.util.Random;
1110

1211
public class RedisStreams101Producer {
1312

1413
public final static String STREAMS_KEY = "weather_sensor:wind";
14+
public final static String MESSAGE_KEY = "MSG";
1515

1616
public static void main(String[] args) {
1717

1818
int nbOfMessageToSend = 1;
1919

2020
if (args != null && args.length != 0 ) {
21-
nbOfMessageToSend = Integer.valueOf(args[0]);
21+
nbOfMessageToSend = Integer.parseInt(args[0]);
2222
}
2323

24-
System.out.println( String.format("\n Sending %s message(s)", nbOfMessageToSend));
24+
System.out.printf("\n Sending %s message(s)%n", nbOfMessageToSend);
2525

2626

2727
RedisClient redisClient = RedisClient.create("redis://localhost:12000"); // change to reflect your environment
@@ -34,25 +34,31 @@ public static void main(String[] args) {
3434
for (int i = 0 ; i < nbOfMessageToSend ; i++) {
3535

3636
Map<String, String> messageBody = new HashMap<>();
37+
messageBody.put("message_id", MESSAGE_KEY + i);
38+
int numberParts = new Random().nextInt(5) + 2;
39+
3740
messageBody.put("speed", "15");
3841
messageBody.put("direction", "270");
3942
messageBody.put("sensor_ts", String.valueOf(System.currentTimeMillis()));
40-
messageBody.put("loop_info", String.valueOf( i ));
41-
Boolean whichone = new Random().nextBoolean();
42-
String messageId = "";
43-
if (whichone) {
44-
messageBody.put("source_port", "12000");
45-
messageId = syncCommands.xadd(
46-
STREAMS_KEY,
47-
messageBody);
48-
} else {
49-
messageBody.put("source_port", "12002");
50-
messageId = syncCommands2.xadd(
51-
STREAMS_KEY,
52-
messageBody);
43+
for (int j = 1 ; j <= numberParts ; j++) {
44+
messageBody.put("total_parts", String.valueOf(numberParts));
45+
messageBody.put("this_part", String.valueOf(j));
46+
boolean whichRedis = new Random().nextBoolean();
47+
String messageId;
48+
if (whichRedis) {
49+
messageBody.put("source_port", "12000");
50+
messageId = syncCommands.xadd(
51+
STREAMS_KEY,
52+
messageBody);
53+
} else {
54+
messageBody.put("source_port", "12002");
55+
messageId = syncCommands2.xadd(
56+
STREAMS_KEY,
57+
messageBody);
58+
}
59+
System.out.printf("\tMessage %s : %s posted%n", messageId, messageBody);
5360
}
5461

55-
System.out.println(String.format("\tMessage %s : %s posted", messageId, messageBody));
5662
}
5763

5864
System.out.println("\n");

0 commit comments

Comments
 (0)