object MyProducer extends App { //配置 val properties = new Properties() properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,;master:9092,slave1:9092,slave2:9092;) properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName) properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName) //创建生产者 val kafkaProducer = new KafkaProducer[String,String](properties) for (i <- 0 to 100) { kafkaProducer.send(new ProducerRecord(;reback;, 3,;;, ;; ; i),new Callback { //将值存入指定的分区里面 topic = ;reback;,partition = 3,key = ;;,value = ;; ; i override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = { if (e == null) println(;主题:; ; recordMetadata.topic() ; ;分区:; ; recordMetadata.partition() ; ;值长度:; ; recordMetadata.serializedValueSize())//serializedValueSize;返回序列化后值得长度 else e.printStackTrace() } }) } kafkaProducer.close(); }
建议收藏:「保姆级」系统安装教程,WinXP~Win11 都适用