编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

Pulsar集群搭建(快速入门篇)(pulsar架构)

wxchong 2024-09-02 03:41:18 开源技术 16 ℃ 0 评论

集群规划

搭建 Pulsar 集群至少需要 3 个组件:ZooKeeper 集群、BookKeeper 集群和 broker 集群(Broker 是 Pulsar 的自身实例)。这三个集群组件如下:

bigdata12

bigdata13

bigdata14

ZooKeeper

ZooKeeper

ZooKeeper

BookKeeper

BookKeeper

BookKeeper

Broker

Broker

Broker

集群部署

安装JDK1.8(略)

安装ZooKeeper(略)

pulsar 环境准备

解压安装包

[bigdata@bigdata12 software]$ tar -zxvf apache-pulsar-2.6.4-bin.tar.gz -C /opt/module/

bigdata@bigdata12 software]$ cd ../module/
[bigdata@bigdata12 module]$ mv apache-pulsar-2.6.4 pulsar-2.6.4

启动zk集群

[bigdata@bigdata12 software]$ zkcluster.sh start
[bigdata@bigdata12 software]$ zkcluster.sh status

初始化元数据

在任意一个ZooKeeper节点初始化元数据。

[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster-zk \
  --zookeeper bigdata12:2181 \
  --configuration-store bigdata12:2181 \
  --web-service-url http://bigdata12:8088 \
  --web-service-url-tls https://bigdata12:8443 \
  --broker-service-url pulsar://bigdata12:6650 \
  --broker-service-url-tls pulsar+ssl://bigdata12:6651

如果初始化成功,会有下面的提示:

19:41:47.168 [main] INFO org.apache.pulsar.PulsarClusterMetadataSetup - Cluster metadata for 'pulsar-cluster-zk' setup correctly

查看元数据是否初始化成功

如果zk下面有这些目录就说明初始化成功

[zk: localhost:2181(CONNECTED) 0] ls /admin/clusters/pulsar-cluster-zk 
[zk: localhost:2181(CONNECTED) 1] ls /namespace 

部署BookKeeper

部署bookies

你能够通过配置文件 conf/bookkeeper.conf 去配置 BookKeeper bookies。 配置 bookies 最重要的一步,是要确保zkServers设置为 Pulsar 集群的的本地 Zookeeper 集群的连接信息。

修改配置文件 bookkeeper.conf

[bigdata@bigdata12 conf]$ vi bookkeeper.conf
# advertisedAddress 修改为服务器对应的ip,在另外两台服务器也做对应的修改
advertisedAddress=bigdata12

# 修改以下两个文件目录地址
journalDirectories=/opt/module/pulsar-2.6.4/tmp/journal
ledgerDirectories=/opt/module/pulsar-2.6.4/tmp/ledger

# 修改zk地址和端口信息
zkServers=bigdata12:2181,bigdata13:2181,bigdata14:2181

初始化bookies元数据

# 执行初始化元数据命令;若出现提示,输入 Y,继续(只需在一个bookie节点执行一次)
[bigdata@bigdata12 pulsar-2.6.4]$ bin/bookkeeper shell metaformat

部署Broker集群

修改配置文件 broker.conf

[bigdata@bigdata12 conf]$ vi broker.conf
# 修改集群名,和 ZooKeeper 里初始化元数据时指定的集群名(--cluster pulsar-cluster-zk)相同
clusterName=pulsar-cluster-zk

# 修改如下两个配置,指定的都是 ZooKeeper 集群地址和端口号
zookeeperServers=bigdata12:2181,bigdata13:2181,bigdata14:2181
configurationStoreServers=bigdata12:2181,bigdata13:2181,bigdata14:2181

# 修改如下参数为本服务器ip地址,另外两个 broker 节点配置文件也做对应修改
advertisedAddress=bigdata12

分发安装包

[bigdata@bigdata12 module]$ xsync pulsar-2.6.4

修改bookkeeper.conf和broker.conf

修改配置文件中的 advertisedAddress=bigdata13 和 advertisedAddress=bigdata14

注: advertisedAddress 不得重复

修改client.conf

修改配置文件中的client.conf,将localhost改成本机IP

[bigdata@bigdata12 pulsar-2.6.4]$ vi conf/client.conf 
webServiceUrl=http://bigdata12:8088/
brokerServiceUrl=pulsar://bigdata12:6650/

启动集群

启动bookie集群

# 以后台进程启动bookie
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-daemon start bookie
# 验证是否启动成功
[bigdata@bigdata12 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity

[bigdata@bigdata13 pulsar-2.6.4]$ bin/pulsar-daemon start bookie
[bigdata@bigdata13 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity
[bigdata@bigdata14 pulsar-2.6.4]$ bin/pulsar-daemon start bookie
[bigdata@bigdata14 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity

启动broker集群

[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-daemon start broker
[bigdata@bigdata13 pulsar-2.6.4]$ bin/pulsar-daemon start broker
[bigdata@bigdata14 pulsar-2.6.4]$ bin/pulsar-daemon start broker

# 查看集群 brokers 节点情况
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin brokers list pulsar-cluster

群起脚本

#!/bin/bash
if [ $# -lt 1 ]
then
  echo "no args"
  exit;
fi

case $1 in
"start")
  echo "=================== 启动 bookie 集群 ==================="
  for i in bigdata12 bigdata13 bigdata14
  do
	echo "--------------- 启动 $i ---------------"
	ssh $i "/opt/module/pulsar-2.6.4/bin/pulsar-daemon start bookie"
  done
  
  echo "=================== 启动 broker 集群 ==================="
  for i in bigdata12 bigdata13 bigdata14
  do
	echo "--------------- 启动 $i ---------------"
	ssh $i "/opt/module/pulsar-2.6.4/bin/pulsar-daemon start broker"
  done
;;


"stop")
  echo "=================== 关闭 bookie 集群 ==================="
  for i in bigdata12 bigdata13 bigdata14
  do
	echo "--------------- 关闭 $i ---------------"
	ssh $i "/opt/module/pulsar-2.6.4/bin/pulsar-daemon stop bookie"
  done
  
  echo "=================== 关闭 broker 集群 ==================="
  for i in bigdata12 bigdata13 bigdata14
  do
	echo "--------------- 关闭 $i ---------------"
	ssh $i "/opt/module/pulsar-2.6.4/bin/pulsar-daemon stop broker"
  done
;;

*)
  echo "Input args error"
  echo "Input start or stop"
;;

esac

验证集群

# 创建租户
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin tenants create first-tenant

# 创建命名空间(命名空间名:first-tenant/first-ns1,它指定了租户 first-tenant)
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin namespaces create first-tenant/first-ns1

# 创建持久性分区topic(topic全名:persistent://first-tenant/first-ns1/first-topic;分区数为 3)
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin topics create-partitioned-topic persistent://first-tenant/first-ns1/my-topic -p 3

# 订阅test主题(-n:消费的消息数量,-s:订阅的名字,-t:订阅的类型)
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-client consume persistent://first-tenant/first-ns1/first-topic -n 100 -s "consumer-first-topic" -t "Exclusive"

# 发消息到first-topic主题(-n:发送消息的次数,-m:消息内容)
[bigdata@bigdata13 pulsar-2.6.4]$ bin/pulsar-client produce persistent://first-tenant/first-ns1/first-topic -n 10 -m "Hello Pulsar"

Pulsar的API操作

在pom文件中添加如下内容

    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.6.4</version>
        </dependency>
    </dependencies>

准备log4j.properties文件

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/pulsar.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n

生产者

package com.yunclass.pulsar;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class ProducerAnalysis {
    private static final String brokerList = "pulsar://bigdata12:6650,bigdata13:6650,bigdata14:6650";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder().serviceUrl(brokerList).build();

        Producer<byte[]> producerAnalysis = client.newProducer().topic("persistent://scott/ns1/topic-partition").create();

        String msg = "Hello Pulsar!";
        long start = System.currentTimeMillis();

        MessageId msgId = producerAnalysis.send(msg.getBytes());
        System.out.println("spend = " + (System.currentTimeMillis() - start) + ";send a message msgId =" + msgId.toString());

        producerAnalysis.close();
        client.close();
    }
}

消费者

package com.yunclass.pulsar;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class ConsumerAnalysis {

    private static final String brokerList = "pulsar://bigdata12:6650,bigdata13:6650,bigdata14:6650";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder().serviceUrl(brokerList).build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://scott/ns1/topic-partition")
                .subscriptionName("ConsumerAnalysis")
                .subscribe();

        while (true) {
            Message<byte[]> receive = consumer.receive();
            System.out.println("consumer-Message recevied:" + new String(receive.getData()));

            // 确认消息,以便broker删除消息
            consumer.acknowledge(receive);
        }

    }
}

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表