集群规划
搭建 Pulsar 集群至少需要 3 个组件:ZooKeeper 集群、BookKeeper 集群和 broker 集群(Broker 是 Pulsar 的自身实例)。这三个集群组件如下:
bigdata12 | bigdata13 | bigdata14 |
ZooKeeper | ZooKeeper | ZooKeeper |
BookKeeper | BookKeeper | BookKeeper |
Broker | Broker | Broker |
集群部署
安装JDK1.8(略)
安装ZooKeeper(略)
pulsar 环境准备
解压安装包
[bigdata@bigdata12 software]$ tar -zxvf apache-pulsar-2.6.4-bin.tar.gz -C /opt/module/
bigdata@bigdata12 software]$ cd ../module/
[bigdata@bigdata12 module]$ mv apache-pulsar-2.6.4 pulsar-2.6.4
启动zk集群
[bigdata@bigdata12 software]$ zkcluster.sh start
[bigdata@bigdata12 software]$ zkcluster.sh status
初始化元数据
在任意一个ZooKeeper节点初始化元数据。
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-zk \
--zookeeper bigdata12:2181 \
--configuration-store bigdata12:2181 \
--web-service-url http://bigdata12:8088 \
--web-service-url-tls https://bigdata12:8443 \
--broker-service-url pulsar://bigdata12:6650 \
--broker-service-url-tls pulsar+ssl://bigdata12:6651
如果初始化成功,会有下面的提示:
19:41:47.168 [main] INFO org.apache.pulsar.PulsarClusterMetadataSetup - Cluster metadata for 'pulsar-cluster-zk' setup correctly
查看元数据是否初始化成功
如果zk下面有这些目录就说明初始化成功
[zk: localhost:2181(CONNECTED) 0] ls /admin/clusters/pulsar-cluster-zk
[zk: localhost:2181(CONNECTED) 1] ls /namespace
部署BookKeeper
部署bookies
你能够通过配置文件 conf/bookkeeper.conf 去配置 BookKeeper bookies。 配置 bookies 最重要的一步,是要确保zkServers设置为 Pulsar 集群的的本地 Zookeeper 集群的连接信息。
修改配置文件 bookkeeper.conf
[bigdata@bigdata12 conf]$ vi bookkeeper.conf
# advertisedAddress 修改为服务器对应的ip,在另外两台服务器也做对应的修改
advertisedAddress=bigdata12
# 修改以下两个文件目录地址
journalDirectories=/opt/module/pulsar-2.6.4/tmp/journal
ledgerDirectories=/opt/module/pulsar-2.6.4/tmp/ledger
# 修改zk地址和端口信息
zkServers=bigdata12:2181,bigdata13:2181,bigdata14:2181
初始化bookies元数据
# 执行初始化元数据命令;若出现提示,输入 Y,继续(只需在一个bookie节点执行一次)
[bigdata@bigdata12 pulsar-2.6.4]$ bin/bookkeeper shell metaformat
部署Broker集群
修改配置文件 broker.conf
[bigdata@bigdata12 conf]$ vi broker.conf
# 修改集群名,和 ZooKeeper 里初始化元数据时指定的集群名(--cluster pulsar-cluster-zk)相同
clusterName=pulsar-cluster-zk
# 修改如下两个配置,指定的都是 ZooKeeper 集群地址和端口号
zookeeperServers=bigdata12:2181,bigdata13:2181,bigdata14:2181
configurationStoreServers=bigdata12:2181,bigdata13:2181,bigdata14:2181
# 修改如下参数为本服务器ip地址,另外两个 broker 节点配置文件也做对应修改
advertisedAddress=bigdata12
分发安装包
[bigdata@bigdata12 module]$ xsync pulsar-2.6.4
修改bookkeeper.conf和broker.conf
修改配置文件中的 advertisedAddress=bigdata13 和 advertisedAddress=bigdata14
注: advertisedAddress 不得重复
修改client.conf
修改配置文件中的client.conf,将localhost改成本机IP
[bigdata@bigdata12 pulsar-2.6.4]$ vi conf/client.conf
webServiceUrl=http://bigdata12:8088/
brokerServiceUrl=pulsar://bigdata12:6650/
启动集群
启动bookie集群
# 以后台进程启动bookie
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-daemon start bookie
# 验证是否启动成功
[bigdata@bigdata12 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity
[bigdata@bigdata13 pulsar-2.6.4]$ bin/pulsar-daemon start bookie
[bigdata@bigdata13 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity
[bigdata@bigdata14 pulsar-2.6.4]$ bin/pulsar-daemon start bookie
[bigdata@bigdata14 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity
启动broker集群
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-daemon start broker
[bigdata@bigdata13 pulsar-2.6.4]$ bin/pulsar-daemon start broker
[bigdata@bigdata14 pulsar-2.6.4]$ bin/pulsar-daemon start broker
# 查看集群 brokers 节点情况
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin brokers list pulsar-cluster
群起脚本
#!/bin/bash
if [ $# -lt 1 ]
then
echo "no args"
exit;
fi
case $1 in
"start")
echo "=================== 启动 bookie 集群 ==================="
for i in bigdata12 bigdata13 bigdata14
do
echo "--------------- 启动 $i ---------------"
ssh $i "/opt/module/pulsar-2.6.4/bin/pulsar-daemon start bookie"
done
echo "=================== 启动 broker 集群 ==================="
for i in bigdata12 bigdata13 bigdata14
do
echo "--------------- 启动 $i ---------------"
ssh $i "/opt/module/pulsar-2.6.4/bin/pulsar-daemon start broker"
done
;;
"stop")
echo "=================== 关闭 bookie 集群 ==================="
for i in bigdata12 bigdata13 bigdata14
do
echo "--------------- 关闭 $i ---------------"
ssh $i "/opt/module/pulsar-2.6.4/bin/pulsar-daemon stop bookie"
done
echo "=================== 关闭 broker 集群 ==================="
for i in bigdata12 bigdata13 bigdata14
do
echo "--------------- 关闭 $i ---------------"
ssh $i "/opt/module/pulsar-2.6.4/bin/pulsar-daemon stop broker"
done
;;
*)
echo "Input args error"
echo "Input start or stop"
;;
esac
验证集群
# 创建租户
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin tenants create first-tenant
# 创建命名空间(命名空间名:first-tenant/first-ns1,它指定了租户 first-tenant)
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin namespaces create first-tenant/first-ns1
# 创建持久性分区topic(topic全名:persistent://first-tenant/first-ns1/first-topic;分区数为 3)
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin topics create-partitioned-topic persistent://first-tenant/first-ns1/my-topic -p 3
# 订阅test主题(-n:消费的消息数量,-s:订阅的名字,-t:订阅的类型)
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-client consume persistent://first-tenant/first-ns1/first-topic -n 100 -s "consumer-first-topic" -t "Exclusive"
# 发消息到first-topic主题(-n:发送消息的次数,-m:消息内容)
[bigdata@bigdata13 pulsar-2.6.4]$ bin/pulsar-client produce persistent://first-tenant/first-ns1/first-topic -n 10 -m "Hello Pulsar"
Pulsar的API操作
在pom文件中添加如下内容
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.6.4</version>
</dependency>
</dependencies>
准备log4j.properties文件
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/pulsar.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n
生产者
package com.yunclass.pulsar;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
public class ProducerAnalysis {
private static final String brokerList = "pulsar://bigdata12:6650,bigdata13:6650,bigdata14:6650";
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder().serviceUrl(brokerList).build();
Producer<byte[]> producerAnalysis = client.newProducer().topic("persistent://scott/ns1/topic-partition").create();
String msg = "Hello Pulsar!";
long start = System.currentTimeMillis();
MessageId msgId = producerAnalysis.send(msg.getBytes());
System.out.println("spend = " + (System.currentTimeMillis() - start) + ";send a message msgId =" + msgId.toString());
producerAnalysis.close();
client.close();
}
}
消费者
package com.yunclass.pulsar;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
public class ConsumerAnalysis {
private static final String brokerList = "pulsar://bigdata12:6650,bigdata13:6650,bigdata14:6650";
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder().serviceUrl(brokerList).build();
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://scott/ns1/topic-partition")
.subscriptionName("ConsumerAnalysis")
.subscribe();
while (true) {
Message<byte[]> receive = consumer.receive();
System.out.println("consumer-Message recevied:" + new String(receive.getData()));
// 确认消息,以便broker删除消息
consumer.acknowledge(receive);
}
}
}
本文暂时没有评论,来添加一个吧(●'◡'●)