1
1
# Basic Producer Example
2
2
3
3
``` javascript
4
- const { Kafka } = require (' ../.. ' ).KafkaJS
4
+ const { Kafka } = require (' @confluentinc/kafka-javascript ' ).KafkaJS ;
5
5
6
6
async function producerStart () {
7
7
const producer = new Kafka ().producer ({
@@ -27,38 +27,40 @@ producerStart();
27
27
# Basic Consumer Example
28
28
29
29
``` javascript
30
- const { Kafka } = require (' ../.. ' ).KafkaJS
30
+ const { Kafka } = require (' @confluentinc/kafka-javascript ' ).KafkaJS ;
31
31
32
32
async function consumerStart () {
33
- const consumer = new Kafka ().consumer ({
33
+ let consumer;
34
+ let stopped = false ;
35
+
36
+ // Initialization
37
+ consumer = new Kafka ().consumer ({
34
38
' bootstrap.servers' : ' <fill>' ,
35
39
' group.id' : ' test' ,
36
40
' auto.offset.reset' : ' earliest' ,
37
41
});
38
42
39
43
await consumer .connect ();
40
-
41
- await consumer .subscribe ({ topics: [ " topic" ] });
42
-
43
- let stopped = false ;
44
- while (! stopped) {
45
- const message = await consumer .consume (1000 );
46
- if (! message) {
47
- continue ;
44
+ await consumer .subscribe ({ topics: [" topic" ] });
45
+
46
+ consumer .run ({
47
+ eachMessage: async ({ topic, partition, message }) => {
48
+ console .log ({
49
+ topic,
50
+ partition,
51
+ offset: message .offset ,
52
+ key: message .key ? .toString (),
53
+ value: message .value .toString (),
54
+ });
48
55
}
49
- console .log ({
50
- topic: message .topic ,
51
- partition: message .partition ,
52
- offset: message .offset ,
53
- key: message .key ? .toString (),
54
- value: message .value .toString (),
55
- });
56
+ });
56
57
57
- // Update stopped whenever we're done consuming.
58
- // stopped = true;
58
+ // Update stopped whenever we're done consuming.
59
+ // The update can be in another async function or scheduled with setTimeout etc.
60
+ while (! stopped) {
61
+ await new Promise (resolve => setTimeout (resolve, 1000 ));
59
62
}
60
63
61
- // Disconnect and clean up.
62
64
await consumer .disconnect ();
63
65
}
64
66
0 commit comments