Add the following library dependency to your build.sbt:
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.0"
The following Scala code consumes (reads) messages from a Kafka topic:
import java.time.Duration
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConverters._

/** Minimal Kafka consumer example that prints the value of every record
  * received on a topic to standard output.
  *
  * Connection settings are hard-coded in [[getMessagesFromKafka]]: broker
  * `localhost:9094`, consumer group `consumer-group`, string key/value
  * deserializers, and `auto.offset.reset = latest`.
  */
class GetMessagesFromKafka {

  /** Consumes from the hard-coded topic `Topic1`.
    *
    * This is an instance method, so the JVM cannot use it as a program entry
    * point; run [[GetMessagesFromKafkaApp]] to start the consumer.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    getMessagesFromKafka("Topic1")
  }

  /** Subscribes to `topic` and prints each record's value, polling forever.
    *
    * The method only returns by throwing (for example when the broker
    * connection fails or the thread is interrupted); the consumer is closed
    * in that case so it does not leak.
    *
    * @param topic name of the Kafka topic to subscribe to
    */
  def getMessagesFromKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")

    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(util.Arrays.asList(topic))
      while (true) {
        // poll(Duration) replaces the deprecated poll(long) overload.
        val records = consumer.poll(Duration.ofMillis(1000)).asScala
        for (record <- records)
          println(record.value())
      }
    } finally {
      // Release the consumer's resources if the loop is left by an exception.
      consumer.close()
    }
  }
}

/** Runnable launcher for [[GetMessagesFromKafka]].
  *
  * Deliberately not named as the companion of the class: a companion `main`
  * would clash with the class's own instance `main`, and no static entry
  * point would be generated.
  */
object GetMessagesFromKafkaApp {

  /** Program entry point; delegates to [[GetMessagesFromKafka.main]].
    *
    * @param args command-line arguments, passed through unchanged
    */
  def main(args: Array[String]): Unit =
    new GetMessagesFromKafka().main(args)
}
That's it!
Cheers!
No comments:
Post a Comment