官网下载apache kafka 0.10.20版本,本例子基于jdk1.8环境,mac os el captain.
第一步:下载0.10.2.0压缩包,解压缩
官网http://kafka.apache.org/下载
> tar -xzf kafka_2.11-0.10.2.0.tgz
> cd kafka_2.11-0.10.2.0
第二步 启动服务器
启动zookeeper服务器
> bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka服务器
> bin/kafka-server-start.sh config/server.properties
第三步 创建主题
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
第四步 给主题发信息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
第五步 创建用户(consumer)接收信息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
使用java创建demo主要针对创建主题,发信息,这主要靠kafka生产者(Producer).
//连接配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//生产者接口及相关实现类
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
//发送消息
/**
* ProducerRecord(String topic, K key, V value) Create a record to
* be sent to Kafka
*/
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
从服务器主动拉取信息需要Consumer.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
record.topic(),record.partition(),record.offset(), record.key(), record.value());
}
}
使用main方法启动consumerTester类再启动producerTester,在console就可以看到接收到的信息了。
--- exec-maven-plugin:1.2.1:exec (default-cli) @ KafkaDemo ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details
topic = my-topic, partition = 0, offset = 300, key = 0, value = 0
topic = my-topic, partition = 0, offset = 301, key = 1, value = 1
topic = my-topic, partition = 0, offset = 302, key = 2, value = 2
topic = my-topic, partition = 0, offset = 303, key = 3, value = 3
topic = my-topic, partition = 0, offset = 304, key = 4, value = 4
topic = my-topic, partition = 0, offset = 305, key = 5, value = 5
topic = my-topic, partition = 0, offset = 306, key = 6, value = 6
topic = my-topic, partition = 0, offset = 307, key = 7, value = 7
topic = my-topic, partition = 0, offset = 308, key = 8, value = 8
topic = my-topic, partition = 0, offset = 309, key = 9, value = 9
具体相关的配置可以看附件maven项目。
分享到:
相关推荐
apache-kafka-1.0.0 java Demo(附jar),只是简单的实现,没有使用连接池。
Apache Kafka实战.pdf..
Apache Kafka Apache Kafka Apache Kafka Apache Kafka
kafka-java-demo 基于java的kafka生产消费者示例。 mvn
Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。
Spring for Apache Kafka API。 Spring for Apache Kafka 开发文档。
This book is here to help you get familiar with Apache Kafka and to solve your challenges related to the consumption of millions of messages in publisher-subscriber architectures. It is aimed at ...
Apache Kafka is a popular distributed streaming platform that acts as a messaging queue or an enterprise messaging system. It lets you publish and subscribe to a stream of records and process them in ...
Building Data Streaming Applications with Apache Kafka 英文azw3 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除
Streaming Architecture New Designs Using Apache Kafka and MapR Streams
Apache Kafka 官方文档中文版,Apache Kafka 官方文档中文版
apache kafka技术内幕 和 apacke kafka源码分析2本PDF 电子书 网盘下载
Kafka是一个对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。 1.Kafka集群包含一个或多个服务器,这种服务器被称为broker 2.Partition是物理上的概念,每个Topic...
。Apache Kafka Cookbook(PACKT,2015)
Learning Apache Kafka Second Edition provides you with step-by-step, practical examples that help you take advantage of the real power of Kafka and handle hundreds of megabytes of messages per second ...
kafka-java-demo 基于java的kafka生产消费者的例子。。。
with no previous knowledge of Apache Kafka. As the book progresses, the difficulty level increases. The second half of this cookbook is about configuration; this is advanced material for those who ...
Apache Kafka(带书签)