https://www.jianshu.com/p/d5ed58d7aa65
如果没有 Schema Registry 服务,自定义的 schema 需要在数据生产端和数据消费端各保存一份;有了 Schema Registry 服务,数据生产方设置好 schema 后,schema 会自动注册到 registry 服务中,这样数据消费方就不需要保存 schema,直接消费即可。
实例以kafka 0.10.2.0 版本举例,kafka和Schema Registry的部署步骤在此省略。
- 相关 jar 包在 Maven 中央仓库中下载不到,所以需要在 Maven 的 settings.xml 文件中添加如下信息:
<!-- Mirror entry for settings.xml: requests addressed to the repository id "confluent"
     are served from the Confluent package server URL below. -->
<mirrors>
<mirror>
<id>confluent</id>
<mirrorOf>confluent</mirrorOf>
<name>Nexus public mirror</name>
<url>http://packages.confluent.io/maven/</url>
</mirror>
...
</mirrors>
<!-- Profile for settings.xml declaring the Confluent repository (id "confluent"),
     with both release and snapshot artifacts enabled.
     NOTE(review): a settings.xml profile also needs an <id> and must be activated
     (e.g. via <activeProfiles>) to take effect; presumably covered by the elided "..." part. -->
<profiles>
  <profile>
    <!-- <repository> entries must be nested inside <repositories>. -->
    <repositories>
      <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
        <releases>
          <enabled>true</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
    ...
  </profile>
</profiles>
在maven中引入依赖
<!-- Kafka client library: provides the org.apache.kafka.clients producer/consumer classes
     imported by the examples. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<!-- Confluent Avro serializer artifact: provides the io.confluent.kafka.serializers
     KafkaAvroSerializer / KafkaAvroDeserializer classes imported by the examples. -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.1</version>
</dependency>
生成数据
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Example producer that writes 50 Avro records to the {@code kyle-test-schema} topic.
 *
 * <p>Keys use {@link StringSerializer}; values use {@link KafkaAvroSerializer}, which is
 * configured with the Schema Registry URL {@code http://localhost:8081}. The broker is expected
 * at {@code localhost:9092}.
 */
public class KafkaSchemaProducer {

    /** Avro schema (as JSON) of the record that is sent: id, name, age, sex, comment. */
    public static final String SCHEMA = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\", \"type\": \"string\"}, {\"name\":\"age\", \"type\": \"int\"}, {\"name\":\"sex\",\"type\": \"string\"}, {\"name\":\"comment\",\"type\": \"string\"}]}";

    /**
     * Builds the producer configuration, sends the sample records, flushes and prints
     * {@code complete}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        properties.put("schema.registry.url", "http://localhost:8081");

        // Parse the schema before opening the producer so a malformed schema cannot leak a client.
        Schema schema = new Schema.Parser().parse(SCHEMA);

        // try-with-resources closes the producer even if building or sending a record throws.
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 50; i++) {
                GenericRecord record = new Record(schema);
                record.put("id", i);
                record.put("name", "name" + i);
                record.put("age", i);
                record.put("sex", "male");
                record.put("comment", "comment" + i);

                // No key is supplied; only topic and value are set on the record.
                ProducerRecord<String, GenericRecord> message =
                        new ProducerRecord<>("kyle-test-schema", record);
                producer.send(message);
            }
            producer.flush();
            System.out.println("complete");
        }
    }
}
消费数据
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Example consumer that reads Avro records from the {@code kyle-test-schema} topic and prints
 * the fields of each one to standard output.
 */
public class KafkaSchemaConsumer {

    /**
     * Subscribes to the topic and polls in an endless loop, printing every record received.
     * Any exception ends the loop, is printed, and the consumer is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(buildConfig());
        consumer.subscribe(Arrays.asList("kyle-test-schema"));
        try {
            for (;;) {
                ConsumerRecords<String, GenericRecord> batch = consumer.poll(1000);
                for (ConsumerRecord<String, GenericRecord> entry : batch) {
                    System.out.println(describe(entry.value()));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    /** Assembles the consumer settings: registry URL, broker, group id, offset reset, deserializers. */
    private static Properties buildConfig() {
        Properties config = new Properties();
        config.put("schema.registry.url", "http://localhost:8081");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "kyle-test");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        return config;
    }

    /** Renders one record as a single line listing id, name, age, comment and sex. */
    private static String describe(GenericRecord user) {
        StringBuilder line = new StringBuilder();
        line.append("id: ").append(user.get("id"));
        line.append(", name: ").append(user.get("name"));
        line.append(", age: ").append(user.get("age"));
        line.append(", comment: ").append(user.get("comment"));
        line.append(", sex: ").append(user.get("sex"));
        return line.toString();
    }
}
消费结果如下
id: 6, name: name6, age: 6, comment: comment6, sex: male
id: 16, name: name16, age: 16, comment: comment16, sex: male
id: 26, name: name26, age: 26, comment: comment26, sex: male
id: 36, name: name36, age: 36, comment: comment36, sex: male
id: 46, name: name46, age: 46, comment: comment46, sex: male
id: 4, name: name4, age: 4, comment: comment4, sex: male
id: 14, name: name14, age: 14, comment: comment14, sex: male
...