Thursday, March 02, 2017

Apache Kafka Message Producer/Consumer

Message Producer for Kafka


package edu.sjsu.amigo.mp.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.Closeable;
import java.util.Properties;
import static edu.sjsu.amigo.mp.kafka.MessageQueueConstants.USER_MSG_TOPIC;
/**
* A kafka message producer.
*
* @author rwatsh on 2/26/17.
*/
public class MessageProducer implements AutoCloseable {

    /**
     * Serializer class name used for both keys and values. The producer is typed
     * {@code <String, String>}, so both serializers must accept {@code String};
     * configuring a byte-array key serializer would fail with a
     * {@code ClassCastException} as soon as a record carried a non-null key.
     */
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    /** Underlying Kafka client; created once in the constructor and released in {@link #close()}. */
    private final Producer<String, String> producer;

    /**
     * Creates a producer connected to the broker at {@code localhost:9092}.
     */
    public MessageProducer() {
        // Configure the Producer
        Properties configProperties = new Properties();
        // Assuming that localhost port 9092 is mapped to kafka container's port 9092
        // TODO externalize the port
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
        producer = new KafkaProducer<>(configProperties);
    }

    /**
     * Publishes a message to the user message topic ({@code USER_MSG_TOPIC}).
     *
     * @param message the message payload to publish
     */
    public void sendUserMessage(String message) {
        sendMessage(USER_MSG_TOPIC, message);
    }

    /**
     * Publishes a key-less record carrying {@code message} to the given topic.
     * The result handle returned by {@code send} is discarded, so this method
     * does not report delivery failures to the caller.
     *
     * @param topicName the topic to publish to
     * @param message   the record value
     */
    private void sendMessage(String topicName, String message) {
        ProducerRecord<String, String> rec = new ProducerRecord<>(topicName, message);
        producer.send(rec);
    }

    /**
     * Closes the underlying Kafka producer. Intended for use with the
     * {@code try}-with-resources statement.
     *
     * @throws Exception if the producer cannot be closed
     */
    @Override
    public void close() throws Exception {
        // The field is final and always assigned by the constructor, so no null check is needed.
        producer.close();
    }
}


Message Consumer for Kafka
/*
* Copyright (c) 2017 San Jose State University.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*/
package edu.sjsu.amigo.cp.kafka;
import edu.sjsu.amigo.cp.jobs.JobManager;
import edu.sjsu.amigo.cp.jobs.MessageProcessorJob;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.quartz.JobDataMap;
import org.quartz.SchedulerException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static edu.sjsu.amigo.mp.kafka.MessageQueueConstants.AMIGO_CHATBOT_GROUP;
import static edu.sjsu.amigo.mp.kafka.MessageQueueConstants.USER_MSG_TOPIC;
/**
* A Kafka message consumer. As soon as it receives a message it will spawn a job to process it.
*
* Based on https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
*
* @author rwatsh on 2/25/17.
*/
public class ConsumerLoop implements Runnable {

    /** Kafka client used by {@link #run()}; closed when the poll loop exits. */
    private final KafkaConsumer<String, String> consumer;

    /** Topics this loop subscribes to. */
    private final List<String> topics;

    /** Identifier used to tag this loop's console output. */
    private final int id;

    /**
     * Creates a consumer connected to the broker at {@code localhost:9092}.
     *
     * @param id      identifier printed with every record this loop receives
     * @param groupId Kafka consumer group id ({@code group.id})
     * @param topics  topics to subscribe to when the loop starts
     */
    public ConsumerLoop(int id,
                        String groupId,
                        List<String> topics) {
        this.id = id;
        // Copy so that later changes to the caller's list cannot alter the subscription.
        this.topics = new ArrayList<>(topics);
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
    }

    /**
     * Subscribes to the configured topics and polls indefinitely. Each record's value is
     * handed to {@link #processMessageAsync(String)} and its partition, offset and value
     * are printed to standard output. The loop ends when {@link #shutdown()} causes a
     * {@link WakeupException}; the consumer is closed on every exit path.
     */
    @Override
    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    String value = record.value();
                    data.put("value", value);
                    processMessageAsync(value);
                    System.out.println(this.id + ": " + data);
                }
            }
        } catch (WakeupException ignored) {
            // Expected: shutdown() calls consumer.wakeup() to break out of poll().
        } finally {
            consumer.close();
        }
    }

    /**
     * Schedules a {@code MessageProcessorJob} for the given message. Null or blank
     * messages are skipped. Scheduling failures are printed and otherwise ignored so
     * that one bad message does not terminate the consumer loop.
     *
     * @param value the record value to process
     */
    private void processMessageAsync(String value) {
        if (value == null || value.trim().isEmpty()) {
            return;
        }
        try {
            // Using Quartz job scheduler to process the message asynchronously
            // Job code is not shown in this gist.
            // Some unique job name
            String jobName = "MESG-JOB-" + UUID.randomUUID().toString();
            String groupName = "CHATBOT-GRP";
            JobDataMap params = new JobDataMap();
            params.put("message", value);
            JobManager.getInstance().scheduleJob(MessageProcessorJob.class, jobName, groupName, params);
        } catch (Exception e) {
            // Deliberately best-effort: report and keep consuming.
            e.printStackTrace();
        }
    }

    /**
     * Requests termination of the poll loop by waking up the consumer.
     */
    public void shutdown() {
        consumer.wakeup();
    }

    /**
     * Run the client program: starts the job scheduler, launches three consumer loops
     * on a fixed thread pool, and registers a JVM shutdown hook that stops them.
     *
     * @param args unused
     * @throws SchedulerException declared for the scheduler start-up call
     */
    public static void main(String[] args) throws SchedulerException {
        // Start the job scheduler
        JobManager.getInstance().startScheduler();
        int numConsumers = 3;
        String groupId = AMIGO_CHATBOT_GROUP;
        List<String> topics = Arrays.asList(USER_MSG_TOPIC);
        // Both locals are captured by the anonymous shutdown hook below, hence final.
        final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        final List<ConsumerLoop> consumers = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
            consumers.add(consumer);
            executor.submit(consumer);
        }
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (ConsumerLoop consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt status instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}

No comments:

Popular microservices patterns

Here are some popular microservice design patterns that a programmer should know: the Service Registry pattern provides a central location fo...