# kafka-examples **Repository Path**: mill1983/kafka-examples ## Basic Information - **Project Name**: kafka-examples - **Description**: No description available - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-01-17 - **Last Updated**: 2020-12-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # kafka-examples ### 关于配置 [配置文件](./src/main/resources/kafka_config.properties) ##### 重要的配置项 ``` bootstrap.servers=lang7.liuxiaoyi.com.cn:9094 ``` ### 关于网络配置 > 集群内部寻找主机会使用内部的主机名来寻址,所以需要在hosts文件中配置好对应主机的地址 ```shell # vim /etc/hosts 117.50.42.138 master 117.50.42.138 slave1 117.50.42.138 slave2 ``` ### 关于发送内容 > 发送的内容压缩成行,使用分隔符进行分割,这样做的好处是最小的传输量,没有冗余,但是需要注意一下两点: - 必须严格按照顺序来组织字段 - 分隔符的选定:使用ascII中27,26两个隐藏字符拼合组成分隔符,基本可以避免文本内容被误认为分隔符的情况发生 ### 关于key > kafka只能保证同一分区内数据有序,而我们需要的信息要求同一用户的数据有序。因此,相同的客户必须分配到相同的分区。kafka会按照key的hash值来分配分区,因此,key需要和用户标示进行捆绑。 ### 最终发送代码 ``` @Test public void testSendContent() throws Exception { // 比如要发送这么一个数组 String[] fields = { "260", "5", "68Thread[pool-2-thread-1_5_main]", "261", "261", "True", "5", "7", "1578992268865", "0000", "1578992268865", "T03", "0" }; // 创建分隔符 char[] dlimit = { 27, 26 }; String sDlimit = String.valueOf(dlimit); // 创建一个kafka的生产者 Properties properties = new Properties(); properties.load(getClass().getClassLoader().getResourceAsStream("kafka_config.properties")); KafkaProducer kafkaProducer = new KafkaProducer<>(properties); // 创建消息对象 String topic = "first"; // 目标的主题 String key = "68Thread[pool-2-thread-1_5_main]"; // key的作用是分区,使用客户id,表示相同的客户在同一个分区 String value = Arrays.stream(fields).reduce((a, b) -> a + sDlimit + b).get(); ProducerRecord producerRecord = new ProducerRecord(topic, key, value); // 发送消息 kafkaProducer.send(producerRecord); // 一定要记得关闭流 kafkaProducer.close(); } ```