How to write a Kafka consumer client in Java to consume messages from multiple brokers?


I am looking for a Java client (Kafka consumer) to consume messages from multiple brokers. Please advise.

Below is the code I have written to publish messages to multiple brokers using a simple partitioner.

The topic is created with replication factor "2" and "3" partitions.

// SimplePartitioner: chooses the partition for each record
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    logger.info("number of partitions " + numPartitions);
    if (keyBytes == null) {
        // no key: round-robin over the partitions that are currently available
        int nextValue = counter.getAndIncrement();
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = toPositive(nextValue) % availablePartitions.size();
            int selectedPartition = availablePartitions.get(part).partition();
            logger.info("selected partition " + selectedPartition);
            return selectedPartition;
        } else {
            // no partitions are available, give a non-available partition
            return toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

// Producer: publishes the message ten times
public void publishMessage(String message, String topic) {
    Producer<String, String> producer = null;
    try {
        producer = new KafkaProducer<>(producerConfigs());
        logger.info("topic to publish the message --" + this.topic);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(this.topic, message));
            logger.info("message published successfully");
        }
    } catch (Exception e) {
        logger.error("exception occurred " + e.getMessage());
    } finally {
        // the constructor may have thrown, so guard against null
        if (producer != null) {
            producer.close();
        }
    }
}

public Map<String, Object> producerConfigs() {
    loadPropertyFile();
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    propsMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    propsMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    propsMap.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SimplePartitioner.class);
    propsMap.put(ProducerConfig.ACKS_CONFIG, "1");
    return propsMap;
}

// Consumer side (Spring Kafka)
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    System.out.println("properties.getBootstrap()" + properties.getBootstrap());
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrap());
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getAutoCommit());
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getTimeout());
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutoOffset());
    return propsMap;
}

@KafkaListener(id = "id1", topics = "${config.topic}", group = "${config.groupid}")
public void listen(ConsumerRecord<?, ?> record) {
    logger.info("message consumed " + record);
    logger.info("partition the record was received from " + record.partition());
    this.message = record.value().toString();
}
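The partition choice made by the partitioner above can be tried out without a running broker. The following is only a sketch under stated assumptions: the class `PartitionChoiceSketch` and its method names are hypothetical, partitions are represented by plain integer ids instead of `PartitionInfo` objects, and the key hash is passed in as an `int` rather than computed with `Utils.murmur2(keyBytes)`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, broker-free sketch of the partition choice shown in the question. */
final class PartitionChoiceSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Clears the sign bit so the result is never negative. */
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    /**
     * Keyless records: round-robin over the ids of the available partitions,
     * falling back to all partitions when none is available.
     */
    int chooseForNullKey(List<Integer> availablePartitionIds, int numPartitions) {
        int nextValue = counter.getAndIncrement();
        if (!availablePartitionIds.isEmpty()) {
            int index = toPositive(nextValue) % availablePartitionIds.size();
            return availablePartitionIds.get(index);
        }
        return toPositive(nextValue) % numPartitions;
    }

    /** Keyed records: the same hash always maps to the same partition. */
    static int chooseForKeyHash(int keyHash, int numPartitions) {
        return toPositive(keyHash) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch sketch = new PartitionChoiceSketch();
        // Assumption: all 3 partitions of the topic are currently available.
        List<Integer> available = Arrays.asList(0, 1, 2);
        for (int i = 0; i < 6; i++) {
            System.out.println("record " + i + " -> partition "
                    + sketch.chooseForNullKey(available, 3));
        }
    }
}
```

Because the producer in the question sends records without a key, every record takes the keyless branch, so the ten messages should be spread across the three partitions in turn.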

bootstrap.servers = [localhost:9092, localhost:9093, localhost:9094]

If you use a regular Java consumer, it will automatically read from multiple brokers. There is no special code you need to write. Just subscribe to the topic(s) you want to consume, and the consumer will connect to the corresponding brokers automatically. You only provide a "single entry point" broker; the client figures out all the other brokers of the cluster automatically.
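As a rough illustration of the "single entry point" idea, the consumer configuration could be built as below. This is a minimal sketch using only `java.util.Properties`: the class name `ConsumerConfigSketch`, the group id `demo-group`, and the `earliest` offset-reset choice are assumptions for illustration, while the property keys are the standard Kafka consumer configuration names.

```java
import java.util.Properties;

/** Hypothetical sketch: builds the properties a plain Java consumer would be created with. */
final class ConsumerConfigSketch {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Entry point(s) into the cluster; the client discovers the remaining brokers itself.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: start from the oldest record when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // One reachable broker is enough; listing several only helps if one is down at startup.
        Properties props = consumerProperties(
                "localhost:9092,localhost:9093,localhost:9094", "demo-group");
        props.list(System.out);
        // With kafka-clients on the classpath, the next steps would be roughly:
        //   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //   consumer.subscribe(Collections.singletonList(topic));
        //   ...then call consumer.poll(...) in a loop and process the returned records.
    }
}
```

Nothing in this configuration names a partition or a specific broker per partition; the consumer learns which broker leads each partition from the cluster metadata after it connects.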

