This article discusses using Protobufs as the message format on Kafka topics and streams.

Protocol Buffers

Protobufs have many advantages that are easily measurable; data size and serialization performance are the most commonly cited among them. However, I think the most important ones are the quality attributes that a data representation format backed by an IDL specification guarantees: data quality in the data plane improves automatically, there are fewer errors due to bad data, and consumers always know how to parse the data.

All our microservices talk Protobuf over gRPC. In fact, all the Protobuf message types and gRPC service definitions live in a single repository that is a dependency of every service. So it was a natural progression to extend the same binary protocol to the asynchronous messages over Kafka.
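
For illustration, a definition in that repository for the UserRegistered message used later in this article might look like the following sketch. The package, options, and fields are assumptions, not the actual schema:

// user_events.proto (hypothetical)
syntax = "proto3";

package app.types.account;

// Generate app.types.account.UserRegistered as a standalone Java class.
option java_package = "app.types.account";
option java_multiple_files = true;

message UserRegistered {
  string user_id = 1;
  string email = 2;
  int64 registered_at_epoch_millis = 3;
}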

Kafka Message Headers

The producer side of things is easy: all messages on Kafka are binary, which is a direct match for Protobuf. The challenge is on the consumer side. Unlike JSON, which is a self-describing format, a Protobuf message cannot be deserialized without prior knowledge of its message type. So it is imperative that the producer include additional metadata describing the message.

Similar to HTTP headers, Kafka supports message headers that can carry arbitrary metadata. The snippets below outline a strategy for serializing and deserializing Protobuf messages.

The key abstraction is a KafkaEventPublisher that takes a domain event object, which converts itself to a proto message via its toProto method. The Protobuf Java type name of that message is included in the proto.eventType header.

@Data
@SuperBuilder
public abstract class DomainEvent<T extends DomainEvent<T>> {
    final Id<T> eventId = Id.of();
    final Instant when = Instant.now();

    public abstract com.google.protobuf.Message toProto();
}


// Annotate all DomainEvent implementations with this annotation.
// @EventMessageType("app.types.account.UserRegistered")
// public class UserRegistered {
// }
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EventMessageType {
    String value();
}
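
Putting these together, a concrete event might look like the sketch below. UserRegisteredEvent and its fields are illustrative, and app.types.account.UserRegistered is assumed to be the Protobuf class generated from the shared repository:

@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@EventMessageType("app.types.account.UserRegistered")
public class UserRegisteredEvent extends DomainEvent<UserRegisteredEvent> {

    private final String userId;
    private final String email;

    @Override
    public com.google.protobuf.Message toProto() {
        // Map the domain event onto the generated Protobuf message type.
        return app.types.account.UserRegistered.newBuilder()
                .setUserId(userId)
                .setEmail(email)
                .setRegisteredAtEpochMillis(getWhen().toEpochMilli())
                .build();
    }
}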


public class KafkaEventSerializer implements Serializer<DomainEvent<?>> {

    public static final String EVENT_SERIALIZER_SELECTOR_KEY
            = "proto.eventSerializer";
    public static final String EVENT_TYPE_HEADER_KEY 
            = "proto.eventType";

    @Override
    public byte[] serialize(String topic, DomainEvent<?> data) {
        Message message = data.toProto();
        return message.toByteArray();
    }
}

@RequiredArgsConstructor
@Slf4j
public class KafkaEventPublisher implements DomainEventPublisher {

    private final String topic;
    private final KafkaTemplate<String, DomainEvent<?>> kafkaTemplate;
    private final EventCounter eventCounter;

    @Override
    public void publish(@NonNull DomainEvent<?> domainEvent) {
        Header serializerSelectorHdr = new RecordHeader(
            DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR,
            KafkaEventSerializer.EVENT_SERIALIZER_SELECTOR_KEY.getBytes());

        EventMessageType annotation = AnnotationUtils.findAnnotation(
            domainEvent.getClass(), EventMessageType.class);
        if (null == annotation) {
            throw new AppException("Event lacks EventMessageType annotation. " +
                    "Deserialization will fail during Consumption. Event=" + 
                    domainEvent.getClass().getName());
        }
        String eventType = annotation.value();

        Header eventTypeHdr = new RecordHeader(
            KafkaEventSerializer.EVENT_TYPE_HEADER_KEY,
            eventType.getBytes());

        ProducerRecord<String, DomainEvent<?>> record = new ProducerRecord<>(
                topic, null, null, domainEvent,
                Arrays.asList(serializerSelectorHdr, eventTypeHdr));

        kafkaTemplate.send(record);

        eventCounter.increment("produced_" + eventType);
    }
}
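
With that in place, publishing from the domain layer is a one-liner. A short usage sketch, using the illustrative UserRegisteredEvent from above:

// In the registration flow; eventPublisher is the injected
// DomainEventPublisher bean, and user is hypothetical here.
eventPublisher.publish(UserRegisteredEvent.builder()
        .userId(user.getId())
        .email(user.getEmail())
        .build());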

On the producer side, create the KafkaTemplate using the above serializer.


@Configuration
@Slf4j
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Value("${spring.application.name}")
    private String applicationName;
    
    @Value("${app.topic.main.parition-count}")
    private int partitionCount;

    @Value("${app.topic.main.replica-count}")
    private int replicaCount;

    @Bean
    public DomainEventPublisher eventPublisher(MeterRegistry meterRegistry) {
        return new KafkaEventPublisher(topicName(), 
        	kafkaTemplate(), new EventCounter(meterRegistry));
    }

    @Bean
    @Qualifier("app.topic.main")
    public String topicName() {
        return applicationName;
    }

    @Bean
    public NewTopic applicationTopic() {
        return TopicBuilder
                .name(topicName())
                .partitions(partitionCount)
                .replicas(replicaCount)
                .build();
    }

    @Bean
    public ProducerFactory<String, DomainEvent<?>> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", kafkaBootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, applicationName);

        DelegatingSerializer delegatingSerializer = new DelegatingSerializer();
        // The delegate's selector key must match the header value the
        // publisher sends (proto.eventSerializer).
        delegatingSerializer.addDelegate(
                KafkaEventSerializer.EVENT_SERIALIZER_SELECTOR_KEY,
                new KafkaEventSerializer());

        return new DefaultKafkaProducerFactory(
        	props, new StringSerializer(), delegatingSerializer);
    }

    @Bean
    public KafkaTemplate<String, DomainEvent<?>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

On the receiver side, the same header (proto.eventType) is read, and the class name it carries is used to parse the Protobuf object via reflection.


public class KafkaEventDeSerializer implements Deserializer<Message> {

    @Override
    public Message deserialize(String topic, byte[] data) {
        throw new IllegalStateException("Headers not available");
    }

    @Override
    public Message deserialize(String topic, Headers headers, byte[] data) {
        Header eventTypeHeader = headers.lastHeader(
            KafkaEventSerializer.EVENT_TYPE_HEADER_KEY);
        String eventType = new String(eventTypeHeader.value());
        try {
            Class<?> clazz = ClassUtils.forName(eventType,
            		getClass().getClassLoader());
            if (!Message.class.isAssignableFrom(clazz)) {
                throw new AppException(String.format("The eventTypeClass [%s] is not a subtype of protobuf.Message.", clazz.getName()));
            }
            Method m = ReflectionUtils.findMethod(clazz, "parseFrom", byte[].class);
            if (null == m) {
                throw new AppException(String.format("The message class [%s] must have a parseFrom(byte[] bytes) method", clazz.getName()));
            }
            return (Message) m.invoke(null, (Object) data);
        } catch (Exception e) {
            throw new AppException("Error deserializing. EventType=" + eventType, e);
        }
    }
}
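
Note that this performs a reflective class and method lookup for every record. If that overhead matters on a hot consumer path, one option is to cache the resolved parseFrom method per event type. A minimal sketch, reusing the same helpers and assumptions as above:

public class CachingKafkaEventDeSerializer implements Deserializer<Message> {

    // eventType -> parseFrom(byte[]) method, resolved at most once per type.
    private final ConcurrentMap<String, Method> parseMethods = new ConcurrentHashMap<>();

    @Override
    public Message deserialize(String topic, byte[] data) {
        throw new IllegalStateException("Headers not available");
    }

    @Override
    public Message deserialize(String topic, Headers headers, byte[] data) {
        String eventType = new String(headers.lastHeader(
                KafkaEventSerializer.EVENT_TYPE_HEADER_KEY).value());
        Method parseFrom = parseMethods.computeIfAbsent(eventType, this::resolveParseMethod);
        try {
            return (Message) parseFrom.invoke(null, (Object) data);
        } catch (Exception e) {
            throw new AppException("Error deserializing. EventType=" + eventType, e);
        }
    }

    private Method resolveParseMethod(String eventType) {
        try {
            Class<?> clazz = ClassUtils.forName(eventType, getClass().getClassLoader());
            if (!Message.class.isAssignableFrom(clazz)) {
                throw new AppException("Not a protobuf Message type: " + eventType);
            }
            Method m = ReflectionUtils.findMethod(clazz, "parseFrom", byte[].class);
            if (null == m) {
                throw new AppException("No parseFrom(byte[]) method on " + eventType);
            }
            return m;
        } catch (ClassNotFoundException e) {
            throw new AppException("Unknown event type: " + eventType, e);
        }
    }
}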

On the consumer side, create the listener container factory, wiring in the KafkaEventDeSerializer via the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG property.


@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Value("${spring.application.name}")
    private String appName;

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, appName);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEventDeSerializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
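
One caveat with a throwing deserializer: a record carrying a missing or unknown proto.eventType header fails inside the poll loop, before any listener runs, and can stall the partition as a poison pill. In recent Spring Kafka versions, ErrorHandlingDeserializer can wrap the custom deserializer so that such failures are handed to the container's error handler instead. A sketch of the changed consumer properties:

// In consumerConfigs(): wrap the value deserializer so deserialization
// failures are reported to the error handler rather than retried forever.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
        KafkaEventDeSerializer.class);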

Now receiving a typed message is as simple as:

@Service
@Slf4j
public class Receiver {

    @KafkaListener(topics = {"registration"})
    public void receive(UserRegistered message) {
        log.info("Received UserRegistered message");
    }
}