首页下载资源大数据kafka(2).zip

ZIPkafka(2).zip

weixin_4217115931.08KB需要积分:1

资源文件列表:

kafka(2).zip 大约有50个文件
  1. kafka/
  2. kafka/.idea/
  3. kafka/.idea/compiler.xml 637B
  4. kafka/.idea/libraries/
  5. kafka/.idea/libraries/Maven__com_github_luben_zstd_jni_1_5_0_2.xml 541B
  6. kafka/.idea/libraries/Maven__org_apache_kafka_kafka_clients_3_0_0.xml 562B
  7. kafka/.idea/libraries/Maven__org_lz4_lz4_java_1_7_1.xml 491B
  8. kafka/.idea/libraries/Maven__org_slf4j_slf4j_api_1_7_30.xml 513B
  9. kafka/.idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_8_1.xml 566B
  10. kafka/.idea/misc.xml 366B
  11. kafka/.idea/modules.xml 257B
  12. kafka/.idea/workspace.xml 24.53KB
  13. kafka/kafka.iml 1.36KB
  14. kafka/pom.xml 665B
  15. kafka/src/
  16. kafka/src/main/
  17. kafka/src/main/java/
  18. kafka/src/main/java/com/
  19. kafka/src/main/java/com/gansu/
  20. kafka/src/main/java/com/gansu/kafka/
  21. kafka/src/main/java/com/gansu/kafka/producter/
  22. kafka/src/main/java/com/gansu/kafka/producter/CustomProducerParameters.java 1.85KB
  23. kafka/src/main/java/com/gansu/kafka/producter/CustomProducerTransactions.java 1.89KB
  24. kafka/src/main/java/com/gansu/kafka/producter/CustomProducter.java 1.19KB
  25. kafka/src/main/java/com/gansu/kafka/producter/CustomProducterAcks.java 1.67KB
  26. kafka/src/main/java/com/gansu/kafka/producter/CustomProducterCallbacck.java 2.04KB
  27. kafka/src/main/java/com/gansu/kafka/producter/CustomProducterCallbackPartitions.java 2.3KB
  28. kafka/src/main/java/com/gansu/kafka/producter/CustomProducterSycn.java 1.5KB
  29. kafka/src/main/java/com/gansu/kafka/producter/MyPartitioner.java 1.4KB
  30. kafka/src/main/resources/
  31. kafka/src/test/
  32. kafka/src/test/java/
  33. kafka/target/
  34. kafka/target/classes/
  35. kafka/target/classes/com/
  36. kafka/target/classes/com/gansu/
  37. kafka/target/classes/com/gansu/kafka/
  38. kafka/target/classes/com/gansu/kafka/producter/
  39. kafka/target/classes/com/gansu/kafka/producter/CustomProducerParameters.class 1.8KB
  40. kafka/target/classes/com/gansu/kafka/producter/CustomProducerTransactions.class 2.22KB
  41. kafka/target/classes/com/gansu/kafka/producter/CustomProducter.class 1.78KB
  42. kafka/target/classes/com/gansu/kafka/producter/CustomProducterAcks.class 1.92KB
  43. kafka/target/classes/com/gansu/kafka/producter/CustomProducterCallbacck$1.class 1.38KB
  44. kafka/target/classes/com/gansu/kafka/producter/CustomProducterCallbacck.class 2.02KB
  45. kafka/target/classes/com/gansu/kafka/producter/CustomProducterCallbackPartitions$1.class 1.42KB
  46. kafka/target/classes/com/gansu/kafka/producter/CustomProducterCallbackPartitions.class 2.07KB
  47. kafka/target/classes/com/gansu/kafka/producter/CustomProducterSycn.class 1.92KB
  48. kafka/target/classes/com/gansu/kafka/producter/MyPartitioner.class 1.29KB
  49. kafka/target/generated-sources/
  50. kafka/target/generated-sources/annotations/

资源介绍:

kafka(2).zip
package com.gansu.kafka.producter; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class CustomProducterCallbackPartitions { public static void main(String[] args) throws InterruptedException { // 1. 创建kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.102:9092"); //关联添加自定义分区器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.gansu.kafka.producter.MyPartitioner"); // key,value序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 3. 创建kafka生产者对象 KafkaProducer kafkaProducer = new KafkaProducer<>(properties); // 4. 调用send方法,发送消息 for (int i = 0; i < 3; i++) { // 指定数据发送到1号分区,key为空(IDEA中ctrl + p查看参数) // 依次指定key值为a,b,f ,数据key的hash值与3个分区求余,分别发往1、2、0 kafkaProducer.send(new ProducerRecord("first","root001"+i) // 添加回调 , new Callback(){ // 该方法在Producer收到ack时调用,为异步调用 @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null){ // 没有异常,输出信息到控制台 System.out.println("主题是"+metadata.topic()+",分区是"+metadata.partition()); }else{ // 出现异常打印 exception.printStackTrace(); } } }); // 延迟一会会看到数据发往不同分区 // Thread.sleep(10); } // 5. 关闭资源 kafkaProducer.close(); } }
100+评论
captcha