Skip to content

Commit 4a91a6e

Browse files
committed
add LUA consumer option
1 parent 87c16fe commit 4a91a6e

File tree

3 files changed

+105
-0
lines changed

3 files changed

+105
-0
lines changed

pom.xml

+11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@
2020
<version>RELEASE</version>
2121
<scope>test</scope>
2222
</dependency>
23+
<dependency>
24+
<groupId>com.fasterxml.jackson.core</groupId>
25+
<artifactId>jackson-core</artifactId>
26+
<version>2.13.1</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>com.fasterxml.jackson.core</groupId>
30+
<artifactId>jackson-databind</artifactId>
31+
<version>2.13.1</version>
32+
<scope>compile</scope>
33+
</dependency>
2334
</dependencies>
2435

2536
<build>

src/main/java/com/jphaugla/redis/streams/RedisStreams101Consumer.java

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public static void main(String[] args) {
1818
if (args != null && args.length != 0 ) {
1919
portNumber = args[0];
2020
}
21+
System.out.println("port number is " + portNumber);
2122
RedisClient redisClient = RedisClient.create("redis://localhost:" + portNumber); // change to reflect your environment
2223
StatefulRedisConnection<String, String> connection = redisClient.connect();
2324
RedisCommands<String, String> syncCommands = connection.sync();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package com.jphaugla.redis.streams;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import io.lettuce.core.*;
6+
import io.lettuce.core.api.StatefulRedisConnection;
7+
import io.lettuce.core.api.sync.RedisCommands;
8+
9+
import java.io.BufferedReader;
10+
import java.io.InputStream;
11+
import java.io.InputStreamReader;
12+
import java.util.Arrays;
13+
import java.util.Collections;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.stream.Collectors;
17+
18+
public class RedisStreams101ConsumerLUA {
19+
20+
public final static String STREAMS_KEY = "weather_sensor:wind";
21+
public final static String HASH_KEY = "weather_sensor:wind:hash:";
22+
public final static String MESSAGE_KEY = "weather_sensor:wind:message:";
23+
24+
public static void main(String[] args) throws JsonProcessingException {
25+
String portNumber = "12000";
26+
if (args != null && args.length != 0 ) {
27+
portNumber = args[0];
28+
}
29+
System.out.println("port number is " + portNumber);
30+
RedisClient redisClient = RedisClient.create("redis://localhost:" + portNumber); // change to reflect your environment
31+
StatefulRedisConnection<String, String> connection = redisClient.connect();
32+
RedisCommands<String, String> syncCommands = connection.sync();
33+
InputStream luaInputStream =
34+
RedisStreams101ConsumerLUA.class
35+
.getClassLoader()
36+
.getResourceAsStream("hmset.lua");
37+
String luaScript =
38+
new BufferedReader(new InputStreamReader(luaInputStream))
39+
.lines()
40+
.collect(Collectors.joining("\n"));
41+
42+
String luaSHA = syncCommands.scriptLoad(luaScript);
43+
44+
try {
45+
syncCommands.xgroupCreate( XReadArgs.StreamOffset.from(STREAMS_KEY, "0-0"), "application_1", XGroupCreateArgs.Builder.mkstream() );
46+
}
47+
catch (RedisBusyException redisBusyException) {
48+
System.out.printf("\t Group '%s' already exists%n", "application_1");
49+
}
50+
51+
System.out.println("Waiting for new messages");
52+
53+
while(true) {
54+
55+
List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
56+
Consumer.from("application_1", "consumer_1"),
57+
XReadArgs.StreamOffset.lastConsumed(STREAMS_KEY)
58+
);
59+
60+
if (!messages.isEmpty()) {
61+
for (StreamMessage<String, String> message : messages) {
62+
System.out.println(message);
63+
// Confirm that the message has been processed using XACK
64+
syncCommands.xack(STREAMS_KEY, "application_1", message.getId());
65+
Map<String, String> body = message.getBody();
66+
String hashKey = HASH_KEY + message.getId();
67+
String messageId = body.get("message_id");
68+
String messageKey = MESSAGE_KEY + messageId;
69+
String numberParts = body.get("total_parts");
70+
String thisPart = body.get("this_part");
71+
// List<String> KEYS = Collections.singletonList(hashKey);
72+
// List<Map<String, String>> ARGS = Collections.singletonList(body);
73+
// syncCommands.evalsha(luaSHA, ScriptOutputType.STATUS, KEYS, body.toString());
74+
String json = new ObjectMapper().writeValueAsString(body);
75+
System.out.println(body.toString());
76+
syncCommands.evalsha(luaSHA, ScriptOutputType.STATUS, Arrays.asList(hashKey).toArray(new String[0]), json);
77+
// write a hash for each message body
78+
// syncCommands.hmset(hashKey, body);
79+
// keep track of all the hash keys for this message body
80+
// syncCommands.sadd(messageKey, hashKey);
81+
if (Integer.parseInt(numberParts) == Integer.parseInt(thisPart)) {
82+
System.out.println("All Message parts received for " + messageKey);
83+
}
84+
}
85+
}
86+
87+
88+
}
89+
90+
}
91+
92+
93+
}

0 commit comments

Comments
 (0)