KafkaJava应用主要由Java应用程序与Apache Kafka进行交互来实现。
一、KafkaJava生产者示例
以下实例展示如何创建一个简单的Kafka生产者,向Kafka主题发送消息。
importorg.apache.kafka.clients.producer.*; importjava.util.Properties; publicclassSimpleProducer{ publicstaticvoidmain(String[]args){ StringtopicName="MyTopic"; Propertiesprops=newProperties(); props.put("bootstrap.servers","localhost:9092"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String>producer=newKafkaProducer<>(props); for(inti=0;i<10;i++){ producer.send(newProducerRecord<String,String>(topicName,Integer.toString(i),Integer.toString(i))); } System.out.println("Messagesentsuccessfully"); producer.close(); } }
二、KafkaJava消费者示例
以下实例展示如何创建一个简单的Kafka消费者。
importorg.apache.kafka.clients.consumer.*; importjava.util.Arrays; importjava.util.Properties; publicclassSimpleConsumer{ publicstaticvoidmain(String[]args){ StringtopicName="MyTopic"; Propertiesprops=newProperties(); props.put("bootstrap.servers","localhost:9092"); props.put("group.id","test"); props.put("enable.auto.commit","true"); props.put("auto.commit.interval.ms","1000"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName)); while(true){ ConsumerRecords<String,String>records=consumer.poll(100); for(ConsumerRecord<String,String>record:records){ System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value()); } } } }
三、KafkaJava处理逻辑
在使用Kafka时,消息的处理往往需要自定义逻辑。一般我们会在接收消息和发送消息时进行添加处理逻辑。
同样处理逻辑也可以写在发送或接收消息的时候,例如:
... Converterconverter=newConverterObject();//假设你有一个消息转换类 StringprocessedValue=converter.process(record.value());//这就是你自定义处理逻辑的结果 ...
原创文章,作者:小蓝,如若转载,请注明出处:https://www.beidandianzhu.com/g/1211.html