I Apache Kafka skriver Java-applikationer som kallas producenter strukturerade meddelanden till ett Kafka-kluster (som består av mäklare). På liknande sätt läser Java-applikationer som kallas konsumenter dessa meddelanden från samma kluster. I vissa organisationer finns det olika grupper som ansvarar för att skriva och hantera producenter och konsumenter. I sådana fall kan en stor smärtpunkt vara samordningen av det överenskomna meddelandeformatet mellan producenter och konsumenter.
Det här exemplet visar hur man använder Apache Avro för att serialisera poster som produceras till Apache Kafka samtidigt som det tillåter utveckling av scheman och osynkron uppdatering av producent- och konsumentapplikationer.
Serialisering och deserialisering
En Kafka-post (tidigare kallad meddelande) består av en nyckel, ett värde och rubriker. Kafka är inte medveten om strukturen av data i posters nyckel och värde. Den hanterar dem som byte-arrayer. Men system som läser poster från Kafka bryr sig om data i dessa poster. Så du behöver producera data i ett läsbart format. Dataformatet du använder bör
- Var kompakt
- Var snabb med att koda och avkoda
- Tillåt utveckling
- Tillåt uppströmssystem (de som skriver till ett Kafka-kluster) och nedströmssystem (de som läser från samma Kafka-kluster) att uppgradera till nyare scheman vid olika tidpunkter
JSON, till exempel, är självförklarande men är inte ett kompakt dataformat och är långsam att tolka. Avro är en snabb serialiseringsram som skapar relativt kompakt utdata. Men för att läsa Avro-poster behöver du schemat som data serialiserades med.
Ett alternativ är att lagra och överföra schemat med själva posten. Detta är bra i en fil där du lagrar schemat en gång och använder det för ett stort antal poster. Att lagra schemat i varje Kafka-post lägger dock till betydande omkostnader när det gäller lagringsutrymme och nätverksanvändning. Ett annat alternativ är att ha en överenskommen uppsättning identifierare-schema-mappningar och hänvisa till scheman genom deras identifierare i posten.
Från objekt till Kafka Record and Back
Producentapplikationer behöver inte konvertera data direkt till byte-arrayer. KafkaProducer är en generisk klass som kräver att användaren specificerar nyckel- och värdetyper. Sedan accepterar producenter instanser av ProducerRecord
som har samma typparametrar. Konvertering från objektet till byte-array görs av en Serializer. Kafka tillhandahåller några primitiva serialiserare:till exempel IntegerSerializer
, ByteArraySerializer
, StringSerializer
. På konsumentsidan konverterar liknande Deserializers byte-arrayer till ett objekt som programmet kan hantera.
Så det är vettigt att koppla in på Serializer- och Deserializer-nivå och låta utvecklare av producent- och konsumentapplikationer använda det bekväma gränssnittet från Kafka. Även om de senaste versionerna av Kafka tillåter ExtendedSerializers
och ExtendedDeserializers
för att komma åt rubriker bestämde vi oss för att inkludera schemaidentifieraren i Kafka-posternas nyckel och värde istället för att lägga till postrubriker.
Avro Essentials
Avro är ett ramverk för dataserialisering (och fjärranrop). Den använder ett JSON-dokument som kallas schema för att beskriva datastrukturer. Den mesta användningen av Avro sker genom antingen GenericRecord eller underklasser av SpecificRecord. Java-klasser som genereras från Avro-scheman är underklasser till de senare, medan de förra kan användas utan förkunskaper om den datastruktur som man arbetar med.
När två scheman uppfyller en uppsättning kompatibilitetsregler kan data som skrivits med ett schema (kallat writer-schema) läsas som om det skrevs med det andra (kallat läsarschema). Scheman har en kanonisk form som har alla detaljer som är irrelevanta för serialiseringen, såsom kommentarer, avskalade för att underlätta likvärdighetskontrollen.
VersionedSchema och SchemaProvider
Som nämnts tidigare behöver vi en en-till-en-mappning mellan scheman och deras identifierare. Ibland är det lättare att referera till scheman med namn. När ett kompatibelt schema skapas kan det betraktas som en nästa version av schemat. Således kan vi referera till scheman med ett namn, versionspar. Låt oss kalla schemat, dess identifierare, namn och version tillsammans ett VersionedSchema
. Det här objektet kan innehålla ytterligare metadata som programmet kräver.
public class VersionedSchema { private final int id; private final String name; private final int version; private final Schema schema; public VersionedSchema(int id, String name, int version, Schema schema) { this.id = id; this.name = name; this.version = version; this.schema = schema; } public String getName() { return name; } public int getVersion() { return version; } public Schema getSchema() { return schema; } public int getId() { return id; } }
SchemaProvider
objekt kan slå upp instanserna av VersionedSchema
.
public interface SchemaProvider extends AutoCloseable { public VersionedSchema get(int id); public VersionedSchema get(String schemaName, int schemaVersion); public VersionedSchema getMetadata(Schema schema); }
Hur detta gränssnitt implementeras beskrivs i "Implementera en Schema Store" i ett framtida blogginlägg.
Serialisera generiska data
När vi serialiserar en post måste vi först ta reda på vilket schema vi ska använda. Varje post har ett getSchema
metod. Men att ta reda på identifieraren från schemat kan vara tidskrävande. Det är generellt sett mer effektivt att ställa in schemat vid initialiseringstidpunkten. Detta kan göras direkt med hjälp av identifierare eller med namn och version. Vidare, när vi producerar för flera ämnen, kanske vi vill ställa in olika scheman för olika ämnen och ta reda på schemat från ämnesnamnet som tillhandahålls som parameter till metoden serialize(T, String)
. Denna logik utelämnas i våra exempel för korthetens och enkelhetens skull.
private VersionedSchema getSchema(T data, String topic) { return schemaProvider.getMetadata( data.getSchema()); }
Med schemat i handen måste vi lagra det i vårt meddelande. Att serialisera ID:t som en del av meddelandet ger oss en kompakt lösning, eftersom all magi sker i Serializer/Deserializer. Det möjliggör också mycket enkel integration med andra ramverk och bibliotek som redan stöder Kafka och låter användaren använda sin egen serializer (som Spark).
Med detta tillvägagångssätt skriver vi först schemaidentifieraren på de första fyra byten.
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException { try (DataOutputStream os = new DataOutputStream(stream)) { os.writeInt(id); } }
Sedan kan vi skapa en DatumWriter
och serialisera objektet.
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException { BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null); DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema); datumWriter.write(data, encoder); encoder.flush(); }
För att sammanställa allt detta har vi implementerat en generisk dataserializer.
public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> { private SchemaProvider schemaProvider; @Override public void configure(Map<String, ?> configs, boolean isKey) { schemaProvider = SchemaUtils.getSchemaProvider(configs); } @Override public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { VersionedSchema schema = getSchema(data, topic); writeSchemaId(stream, schema.getId()); writeSerializedAvro(stream, data, schema.getSchema()); return stream.toByteArray(); } catch (IOException e) { throw new RuntimeException("Could not serialize data", e); } } private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...} private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...} private VersionedSchema getSchema(T data, String topic) {...} @Override public void close() { try { schemaProvider.close(); } catch (Exception e) { throw new RuntimeException(e); } } }
Deserialisering av generisk data
Deserialisering kan fungera med ett enda schema (schemadata skrevs med) men du kan ange ett annat läsarschema. Läsarschemat måste vara kompatibelt med schemat som data serialiserades med, men behöver inte vara likvärdigt. Av denna anledning introducerade vi schemanamn. Vi kan nu specificera att vi vill läsa data med specifik version av ett schema. Vid initialisering läser vi önskade schemaversioner per schemanamn och lagrar metadata i readerSchemasByName
för snabb åtkomst. Nu kan vi läsa varje post som skrivits med en kompatibel version av schemat som om den var skriven med den angivna versionen.
@Override public void configure(Map<String, ?> configs, boolean isKey) { this.schemaProvider = SchemaUtils.getSchemaProvider(configs); this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider); }
När en post behöver deserialiseras läser vi först identifieraren för writer-schemat. Detta gör det möjligt att slå upp läsarschemat efter namn. Med båda scheman tillgängliga kan vi skapa en GeneralDatumReader
och läs posten.
@Override public GenericData.Record deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); VersionedSchema readerSchema = readerSchemasByName.get(writerSchema.getName()); GenericData.Record avroRecord = readAvroRecord(stream, writerSchema.getSchema(), readerSchema.getSchema()); return avroRecord; } catch (IOException e) { throw new RuntimeException(e); } } private int readSchemaId(InputStream stream ) throws IOException { try(DataInputStream is = new DataInputStream(stream)) { return is.readInt(); } } private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); GenericData.Record record = new GenericData.Record(readerSchema); datumReader.read(record, decoder); return record; }
Hantera SpecificRecords
Oftast finns det en klass vi vill använda för våra rekord. Denna klass genereras sedan vanligtvis från ett Avro-schema. Apache Avro tillhandahåller verktyg för att generera Java-kod från scheman. Ett sådant verktyg är plugin-programmet Avro Maven. Genererade klasser har schemat de genererades från tillgängligt vid körning. Detta gör serialisering och deserialisering enklare och mer effektiv. För serialisering kan vi använda klassen för att ta reda på vilken schemaidentifierare som ska användas.
@Override public void configure(Map<String, ?> configs, boolean isKey) { String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString(); try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) { Class<?> recordClass = Class.forName(className); Schema writerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass); this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId(); } catch (Exception e) { throw new RuntimeException(e); } }
Därför behöver vi inte logiken för att bestämma schema från ämne och data. Vi använder schemat som finns i postklassen för att skriva poster.
På samma sätt, för deserialisering, kan läsarschemat hittas från själva klassen. Deserialiseringslogiken blir enklare eftersom läsarschemat är fixat vid konfigurationstillfället och behöver inte slås upp med schemanamn.
@Override public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); return readAvroRecord(stream, writerSchema.getSchema(), readerSchema); } catch (IOException e) { throw new RuntimeException(e); } } private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); return datumReader.read(null, decoder); }
Ytterligare läsning
För mer information om schemakompatibilitet, se Avro-specifikationen för Schema Resolution.
För mer information om kanoniska former, se Avro-specifikationen för att analysera kanoniska formulär för scheman.
Nästa gång...
Del 2 kommer att visa en implementering av ett system för att lagra Avro-schemadefinitionerna.