I'm testing consuming a Kafka topic with Spark Streaming. When I start the job, it displays the following message:
18/06/06 20:39:25 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:198)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
If I open a terminal and listen with nc -lp 9999, the job runs. But what I need is to consume the Kafka topic, not a socket. Can anyone help me?
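For what it's worth, the stack trace points at org.apache.spark.streaming.dstream.SocketReceiver, which as far as I understand is only started by a socket stream like the sketch below. This is not in my job; I include it only to illustrate the kind of code that would try to connect to localhost:9999:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical socket-stream job (not my code): this kind of receiver
// connects to localhost:9999 and needs something like `nc` listening there.
object SocketExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketExample")
    val ssc = new StreamingContext(conf, Seconds(20))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}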
I'm using Spark 2.3.0 and the spark-streaming-kafka-0-10 connector.
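For completeness, my build looks roughly like this (a sketch; the Scala 2.11 version is my assumption, adjust it to your build):

// build.sbt (sketch; assuming Scala 2.11 for Spark 2.3.0)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.0",
  "org.apache.spark" %% "spark-streaming" % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)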
Here is my code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig
import scala.collection.mutable

object WorldCount_Kafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WorldCount-kafka")

    // Read messages in batches of 20 seconds
    val ssc = new StreamingContext(conf, Seconds(20))
    val sc = ssc.sparkContext
    sc.setLogLevel("ERROR")

    val kafkaParam = new mutable.HashMap[String, String]()
    kafkaParam.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
    kafkaParam.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParam.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParam.put(ConsumerConfig.GROUP_ID_CONFIG, "test")
    kafkaParam.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    kafkaParam.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

    // Configure Spark to listen for messages on topic first_topic
    val topicList = List("first_topic")

    // Read the value of each message from Kafka
    val messageStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicList, kafkaParam)
    )
    // value() is already a String thanks to the StringDeserializer, so no cast is needed
    val lines = messageStream.map(consumerRecord => consumerRecord.value())

    // Break every message into words
    val words = lines.flatMap(_.split(" "))
    // Map every word to a tuple (word, 1)
    val wordMap = words.map(word => (word, 1))
    // Count the occurrences of each word
    val wordCount = wordMap.reduceByKey((first, second) => first + second)
    // Print the word counts
    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
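To feed the topic while testing, I send a few lines with a plain Kafka producer (a minimal sketch; the TestProducer name and the sample lines are mine, and it assumes the broker at 127.0.0.1:9092 and the topic first_topic from the code above):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send a few space-separated lines so the word count has input
    Seq("hello world", "hello kafka").foreach { line =>
      producer.send(new ProducerRecord[String, String]("first_topic", line))
    }
    producer.close()
  }
}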