1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class KafkaProducerExample {
/**
 * Builds the Kafka producer used by this example.
 *
 * <p>The producer is configured with the bootstrap servers taken from
 * {@code BOOTSTRAP_SERVERS}, the client id {@code "KafkaExampleProducer"},
 * {@code LongSerializer} for record keys and {@code StringSerializer} for record values.
 *
 * @return a new {@code KafkaProducer} with {@code Long} keys and {@code String} values
 */
private static Producer<Long, String> createProducer() {
    // The serializer settings are given as fully-qualified class names.
    final String keySerializerName = LongSerializer.class.getName();
    final String valueSerializerName = StringSerializer.class.getName();

    final Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerName);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerName);

    final Producer<Long, String> producer = new KafkaProducer<>(config);
    return producer;
}
/**
 * Sends {@code sendMessageCount} records to {@code TOPIC}, one at a time, and prints the
 * metadata returned for each record.
 *
 * <p>Keys are consecutive {@code long} values starting at the wall-clock time (in
 * milliseconds) captured right after the producer is created; each value is
 * {@code "Hello Mom "} followed by the key. Every send is awaited via {@code get()} before
 * the next record is produced, so the call is fully synchronous.
 *
 * @param sendMessageCount number of records to send; nothing is sent when it is zero or
 *     negative
 * @throws Exception propagated from creating the producer, sending a record, or waiting
 *     for a send to complete
 */
static void runProducer(final int sendMessageCount) throws Exception {
    // try-with-resources guarantees close() runs on every exit path. The previous
    // "flush(); close();" sequence in a finally block skipped close() whenever flush()
    // threw, leaking the producer, and could mask the exception thrown by the loop.
    // No explicit flush() is needed: each send is already awaited below, and
    // KafkaProducer.close() waits for outstanding requests before releasing resources.
    try (Producer<Long, String> producer = createProducer()) {
        // Captured after the producer exists so that elapsed times exclude client start-up.
        final long time = System.currentTimeMillis();

        for (long index = time; index < time + sendMessageCount; index++) {
            final ProducerRecord<Long, String> record =
                    new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);

            // Block until the send has completed so the metadata can be reported.
            final RecordMetadata metadata = producer.send(record).get();

            final long elapsedTime = System.currentTimeMillis() - time;
            System.out.printf("sent record(key=%s value=%s) "
                            + "meta(partition=%d, offset=%d) time=%d\n",
                    record.key(), record.value(),
                    metadata.partition(), metadata.offset(), elapsedTime);
        }
    }
}
|