IDEA本地搭建Pulsar调试环境
部署zookeeper
本地debug部署个单机版的就够了,参考此教程。注意在zoo.cfg
文件中配置下admin.serverPort
,默认是8080
端口,会和Pulsar Broker的默认HTTP端口冲突,这里改为8888
端口,完整配置如下:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
admin.serverPort=8888
启动broker
在conf/broker.conf
配置文件中,将metadataStoreUrl
改为上一步中启动的zk的地址localhost:2181
,如下:
# The metadata store URL
# Examples:
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
# * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not specified)
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)
metadataStoreUrl=zk:localhost:2181
创建Pulsar Cluster
找到PulsarAdminTool类,运行配置如下:
conf/client.conf --admin-url http://localhost:8080 clusters create cluster-1
创建tenant
找到PulsarAdminTool类,创建名为public的租户,运行配置如下:
conf/client.conf --admin-url http://localhost:8080 tenants create public
创建namespace
找到PulsarAdminTool类,在public租户下创建名为default的namespace,运行配置如下:
conf/client.conf --admin-url http://localhost:8080 namespaces create public/default
消费者消费消息
找到PulsarClientTool类,接收非持久化消息,运行配置如下:
conf/client.conf consume non-persistent://public/default/example-np-topic --subscription-name np-sub
生产者发送消息
找到PulsarClientTool类,发送一个非持久化消息,运行配置如下:
conf/client.conf produce non-persistent://public/default/example-np-topic --num-produce 1 --messages "This message will be stored only in memory"
此时会在consumer的控制台看到日志打印,说明成功收到了消息:
生产消费持久化消息
以上步骤介绍的是非持久化消息的生产和消费,持久化消息需要BookKeeper。不论是从源码编译,还是直接安装,BookKeeper目前均不支持Windows,具体见此issue,可以在WSL中编译。这里不涉及BookKeeper源码的debug,就直接按照官方教程安装。
下载bk压缩包
从https://bookkeeper.apache.org/releases下载压缩包,解压。
单机部署bk
在本地单机部署时,需要运行localbookie
命令,详见https://bookkeeper.apache.org/docs/getting-started/run-locally。这里会出现两个问题:
无法启动JVM,报错日志如下:
Unrecognized VM option 'PrintGCDateStamps' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
这个问题,在这里有提及。我们直接在启动脚本
bookkeeper
中加一下-XX:+IgnoreUnrecognizedVMOptions
即可:... elif [ ${COMMAND} == "localbookie" ]; then NUMBER=$1 shift exec ${JAVA} ${OPTS} ${JMX_ARGS} -XX:+IgnoreUnrecognizedVMOptions -Dzookeeper.4lw.commands.whitelist='*' org.apache.bookkeeper.util.LocalBookKeeper ${NUMBER} ${BOOKIE_CONF} $@ ...
信息此问题,我在bookkeeper官方仓库中提了个issue2181端口绑定失败,这是因为单机版本的bk会自己创建并启动一个zk server,与文章开头我们自己创建的zk冲突了,停掉我们自己创建的zk即可。
消费者消费持久化消息
conf/client.conf consume persistent://public/default/example-topic --subscription-name p-sub
生产者生产持久化消息
conf/client.conf produce persistent://public/default/example-topic --num-produce 1 --messages "This message will be stored in bk"
独立版的Pulsar
根据Pulsar官方文档的描述,独立版的Pulsar用来本地开发和测试:
独立版Pulsar的启动类是org.apache.pulsar.PulsarStandaloneStarter
,这里踩了个坑:由于项目放在C盘,启动时创建RocksDB相关的文件时,会报权限不足的错:
2022-06-03T10:39:29,275 - ERROR - [io-write-scheduler-OrderedScheduler-0-0:RocksdbKVStore@195] - Failed to restore checkpoint: Checkpoint{ID='3014ae6e-5297-45aa-b688-2450d309691f', createdAt: 0 null}
org.apache.bookkeeper.statelib.api.exceptions.StateStoreException: Failed to create dir 000000000000000000/000000000000000000/000000000000000000
at org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.CheckpointInfo$1.restore(CheckpointInfo.java:81) ~[statelib-4.15.0.jar:4.15.0]
at org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore.loadRocksdbFromCheckpointStore(RocksdbKVStore.java:184) ~[statelib-4.15.0.jar:4.15.0]
at org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore.init(RocksdbKVStore.java:286) ~[statelib-4.15.0.jar:4.15.0]
at org.apache.bookkeeper.statelib.impl.journal.AbstractStateStoreWithJournal.lambda$initializeLocalStore$5(AbstractStateStoreWithJournal.java:216) ~[statelib-4.15.0.jar:4.15.0]
at org.apache.bookkeeper.statelib.impl.journal.AbstractStateStoreWithJournal.lambda$executeIO$17(AbstractStateStoreWithJournal.java:526) ~[statelib-4.15.0.jar:4.15.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131) [guava-31.0.1-jre.jar:?]
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74) [guava-31.0.1-jre.jar:?]
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82) [guava-31.0.1-jre.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.76.Final.jar:4.1.76.Final]
at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.nio.file.FileSystemException: C:\Users\boatrain\research\pulsar\data\standalone\bookkeeper\ranges\data\ranges\000000000000000000\000000000000000000\000000000000000000\current: A required privilege is not held by the client
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) ~[?:?]
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) ~[?:?]
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) ~[?:?]
at sun.nio.fs.WindowsFileSystemProvider.createSymbolicLink(WindowsFileSystemProvider.java:598) ~[?:?]
at java.nio.file.Files.createSymbolicLink(Files.java:1069) ~[?:?]
at org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.CheckpointInfo.updateCurrent(CheckpointInfo.java:126) ~[statelib-4.15.0.jar:4.15.0]
at org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.CheckpointInfo$1.restore(CheckpointInfo.java:79) ~[statelib-4.15.0.jar:4.15.0]
... 15 more
解决方法就是以管理员启动IDEA,然后再启动org.apache.pulsar.PulsarStandaloneStarter
: