How to write a Kafka consumer client in Java to consume messages from multiple brokers?
I am looking for a Java client (Kafka consumer) to consume messages from multiple brokers. Please advise.

The code below publishes messages to multiple brokers using a simple partitioner. The topic was created with replication factor "2" and "3" partitions.
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        logger.info("Number of partitions " + numPartitions);
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = toPositive(nextValue) % availablePartitions.size();
                int selectedPartition = availablePartitions.get(part).partition();
                logger.info("Selected partition " + selectedPartition);
                return selectedPartition;
            } else {
                // No partitions available; fall back to a possibly unavailable partition
                return toPositive(nextValue) % numPartitions;
            }
        } else {
            // Hash the key bytes to choose a partition
            return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void publishMessage(String message, String topic) {
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(producerConfigs());
            logger.info("Topic to publish the message -- " + this.topic);
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>(this.topic, message));
                logger.info("Message published successfully");
            }
        } catch (Exception e) {
            logger.error("Exception occurred " + e.getMessage());
        } finally {
            producer.close();
        }
    }

    public Map<String, Object> producerConfigs() {
        loadPropertyFile();
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        propsMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        propsMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        propsMap.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SimplePartitioner.class);
        propsMap.put(ProducerConfig.ACKS_CONFIG, "1");
        return propsMap;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        System.out.println("properties.getBootstrap() " + properties.getBootstrap());
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrap());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getAutoCommit());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getTimeout());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutoOffset());
        return propsMap;
    }

    @KafkaListener(id = "id1", topics = "${config.topic}", group = "${config.groupid}")
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("Message consumed " + record);
        logger.info("Partition the record was received from " + record.partition());
        this.message = record.value().toString();
    }
bootstrap.servers = [localhost:9092, localhost:9093, localhost:9094]
If you use the regular Java consumer, it will automatically read from multiple brokers. There is no special code you need to write. Just subscribe to the topic(s) you want to consume, and the consumer will connect to the corresponding brokers automatically. You only provide a "single entry point" broker; the client figures out all the other brokers of the cluster automatically.
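As a minimal sketch of that idea: a plain `KafkaConsumer` with nothing but bootstrap servers, a group id, and deserializers will fetch from all three brokers, because it discovers the full cluster from whichever bootstrap broker answers first. The topic name `my-topic` and group id `test-group` below are assumptions for illustration; substitute your own.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Listing more than one broker only protects the *initial* connection;
        // after that the client knows the whole cluster and follows partition
        // leaders on any broker by itself.
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id", "test-group"); // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic")); // assumed topic name
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

Note there is no broker list per partition and no partition assignment in this code: the group coordinator assigns the three partitions across the consumers in the group, regardless of which brokers host them.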