Kafka Schema Registry 使用教程(JAVA)[转]

https://www.jianshu.com/p/d5ed58d7aa65

如果没有 Schema Registry服务,自定义的schema需要在数据生产端和数据消费端都保存,有了 Schema Registry服务,数据生成方设置好Schema,会自动注册到 registry服务中,这样数据消费方就不需要保存schema,直接消费就可以。

实例以kafka 0.10.2.0 版本举例,kafka和Schema Registry的部署步骤在此省略。

  1. 相关 jar 包在 Maven 中央仓库中下载不到,所以需要在 Maven 的 settings.xml 文件中添加如下信息:
<mirrors>
 <mirror>
         <id>confluent</id>
         <mirrorOf>confluent</mirrorOf>
         <name>Nexus public mirror</name>
         <url>http://packages.confluent.io/maven/</url>
 </mirror>
 ...
</mirrors>

<profiles>
 <profile>
     <id>confluent</id>
     <!-- repository 必须包裹在 repositories 标签内,否则 settings.xml 校验不通过 -->
     <repositories>
         <repository>
             <id>confluent</id>
             <url>http://packages.confluent.io/maven/</url>
             <releases>
                 <enabled>true</enabled>
             </releases>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>
         </repository>
         ...
     </repositories>
 </profile>
</profiles>

<!-- 激活上述 profile,否则该仓库不会生效 -->
<activeProfiles>
 <activeProfile>confluent</activeProfile>
</activeProfiles>

在maven中引入依赖

<dependency>
 <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
 <version>3.1.1</version>
</dependency>

生成数据

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaSchemaProducer {

    /**
     * Avro schema of the produced value. KafkaAvroSerializer auto-registers it
     * with the Schema Registry on first send, so consumers need no local copy.
     */
    public static final String SCHEMA = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\", \"type\": \"string\"}, {\"name\":\"age\", \"type\": \"int\"}, {\"name\":\"sex\",\"type\": \"string\"}, {\"name\":\"comment\",\"type\": \"string\"}]}";

    /**
     * Sends 50 Avro GenericRecords to topic "kyle-test-schema".
     * Requires a broker on localhost:9092 and a Schema Registry on localhost:8081.
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The Avro serializer embeds the registry schema id in every message.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        properties.put("schema.registry.url", "http://localhost:8081");

        Schema schema = new Schema.Parser().parse(SCHEMA);

        // try-with-resources: the original leaked the producer if send() threw
        // before reaching close(); Producer implements Closeable.
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 50; i++) {
                GenericRecord record = new Record(schema);
                record.put("id", i);
                record.put("name", "name" + i);
                record.put("age", i);
                record.put("sex", "male");
                record.put("comment", "comment" + i);
                // Typed ProducerRecord replaces the original raw type (no unchecked warning).
                producer.send(new ProducerRecord<>("kyle-test-schema", record));
            }
            producer.flush();
            System.out.println("complete");
        }
    }
}

消费数据

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaSchemaConsumer {

    /**
     * Consumes Avro GenericRecords from topic "kyle-test-schema" and prints each field.
     * KafkaAvroDeserializer fetches the writer schema from the registry by the id
     * embedded in every message, so no local schema file is needed.
     */
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("schema.registry.url", "http://localhost:8081");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "kyle-test");
        // Start from the earliest offset when this group has no committed position.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Arrays.asList("kyle-test-schema"));
        try {
            // Poll forever; terminate the demo externally (Ctrl-C).
            while (true) {
                ConsumerRecords<String, GenericRecord> batch = consumer.poll(1000);
                for (ConsumerRecord<String, GenericRecord> entry : batch) {
                    GenericRecord user = entry.value();
                    System.out.println("id: " + user.get("id") + ", name: " + user.get("name") + ", age: " + user.get("age") + ", comment: " + user.get("comment") + ", sex: " + user.get("sex"));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

消费结果如下

id: 6, name: name6, age: 6, comment: comment6, sex: male
id: 16, name: name16, age: 16, comment: comment16, sex: male
id: 26, name: name26, age: 26, comment: comment26, sex: male
id: 36, name: name36, age: 36, comment: comment36, sex: male
id: 46, name: name46, age: 46, comment: comment46, sex: male
id: 4, name: name4, age: 4, comment: comment4, sex: male
id: 14, name: name14, age: 14, comment: comment14, sex: male
...

关于xmsg

技术面前人人平等.同时技术也不分高低贵贱.正所谓学无大小,达者为尊.
此条目发表在未分类分类目录。将固定链接加入收藏夹。