/**
 * Synchronizes {@code topicsByName} with the supplied set of topic names.
 *
 * <p>Names present in {@code topicNames} but not yet tracked are added, except for
 * {@code __consumer_offsets}, which is always skipped. For each added topic the
 * partition count is taken from the number of children under
 * {@code /brokers/topics/<topic>/partitions}, after waiting (5, {@link TimeUnit#SECONDS})
 * for that path to exist. Tracked names that are absent from {@code topicNames} are
 * removed.
 *
 * <p>A failure while adding one topic is logged and does not stop the remaining
 * topics from being processed; such a topic stays untracked.
 *
 * @param topicNames the topic names that should be tracked after this call
 */
private void updateTopics(Set<String> topicNames) {
    Set<String> currentTopics = topicsByName.keySet();

    // Sets.difference returns live views over its arguments, and currentTopics is itself a
    // live view of topicsByName. Snapshot both differences up front so the put/remove calls
    // below never mutate a collection that is being iterated (which would raise
    // ConcurrentModificationException on a fail-fast map).
    Set<String> topicsToAdd = Sets.difference(topicNames, currentTopics).immutableCopy();
    Set<String> topicsToRemove = Sets.difference(currentTopics, topicNames).immutableCopy();

    topicsToAdd.stream()
            .filter(topicName -> !topicName.equals("__consumer_offsets"))
            .forEach(topicName -> {
                try {
                    String partitionsPath = "/brokers/topics/" + topicName + "/partitions";
                    // The partitions node may not exist yet for a freshly created topic.
                    zkClient.waitUntilExists(partitionsPath, TimeUnit.SECONDS, 5);
                    int partitionCount = zkClient.getChildren(partitionsPath).size();
                    topicsByName.put(topicName, new KafkaTopic(topicName, partitionCount));
                    log.info("Topic added: [{}]", topicName);
                } catch (Exception e) {
                    // Best effort: one bad topic must not prevent the others from being added.
                    log.error("Exception fetching info for topic [{}]: {}", topicName, e.getMessage(), e);
                }
            });

    topicsToRemove.forEach(topicName -> {
        topicsByName.remove(topicName);
        log.info("Topic deleted: [{}]", topicName);
    });
}
main