Monday, August 3, 2020

Scala Kafka Consumer

In this post we will learn how to get messages from Kafka using Scala.

Add the following library dependency.

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.0"

Following is the scala code to get (consume) messages from Kafka.

import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties
import scala.collection.JavaConverters._
class GetMessagesFromKafka {
  def main(args: Array[String]): Unit = {
    getMessagesFromKafka("Topic1")
  }
  def getMessagesFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }
}

That's it!
Cheers!

No comments:

Post a Comment