上一篇:dubbo 3.0服务调用者调用第三方接口源码主流程分析
zookeeper配置中心
zookeeper配置中心构建路径:
1、ZookeeperDynamicConfiguration构建
2、AbstractZookeeperTransporter的connect方法
3、Curator5ZookeeperTransporter的createZookeeperClient方法
4、Curator5ZookeeperClient构建:在构建时会基于Curator创建底层的CuratorFramework客户端并与zk建立连接,后续对zk元数据的操作都基于这个client。
/**
 * Dynamic configuration backed by ZooKeeper. Reads, writes and listener registrations are
 * delegated to the {@link ZookeeperClient} obtained from the supplied transporter.
 */
public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration {

    private Executor executor;
    private ZookeeperClient zkClient;
    private CacheListener cacheListener;

    private static final int DEFAULT_ZK_EXECUTOR_THREADS_NUM = 1;
    private static final int DEFAULT_QUEUE = 10000;
    private static final Long THREAD_KEEP_ALIVE_TIME = 0L;

    /**
     * Creates the listener executor and connects to ZooKeeper.
     *
     * @param url                  config-center URL, passed to the transporter to obtain the client
     * @param zookeeperTransporter transporter used to obtain the {@link ZookeeperClient}
     * @throws IllegalStateException if the returned client reports that it is not connected
     */
    ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);

        this.cacheListener = new CacheListener(rootPath);

        final String threadName = this.getClass().getSimpleName();
        // Thread pool handed to the client for data-listener callbacks:
        // core = max = 1 thread, keep-alive 0 ms, bounded queue of 10000 tasks.
        this.executor = new ThreadPoolExecutor(DEFAULT_ZK_EXECUTOR_THREADS_NUM, DEFAULT_ZK_EXECUTOR_THREADS_NUM,
            THREAD_KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(DEFAULT_QUEUE),
            new NamedThreadFactory(threadName, true),
            new AbortPolicyWithReport(threadName, url));

        // Ask the transporter for a ZookeeperClient bound to this URL.
        zkClient = zookeeperTransporter.connect(url);
        boolean isConnected = zkClient.isConnected();
        if (!isConnected) {
            throw new IllegalStateException("Failed to connect with zookeeper, pls check if url " + url + " is correct.");
        }
    }

    /**
     * Reads a property using an empty group.
     *
     * @param key e.g., {service}.configurators, {service}.tagrouters, {group}.dubbo.properties
     * @return the content returned by the client for the path built from an empty group and {@code key}
     */
    @Override
    public String getInternalProperty(String key) {
        return zkClient.getContent(buildPathKey("", key));
    }

    /**
     * Drops the reference to the client without closing it.
     */
    @Override
    protected void doClose() throws Exception {
        // zkClient is shared in framework, should not close it here
        // zkClient.close();
        // See: org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
        // All zk clients is created and destroyed in ZookeeperTransporter.
        zkClient = null;
    }

    /**
     * Publishes a configuration item.
     *
     * @param pathKey full node path
     * @param content content to store
     * @return always {@code true} when the client call returns normally
     * @throws Exception declared by the overridden method
     */
    @Override
    protected boolean doPublishConfig(String pathKey, String content) throws Exception {
        // Third argument is the "ephemeral" flag: false.
        zkClient.create(pathKey, content, false);
        return true;
    }

    /**
     * Publishes a configuration item with a compare-and-set style version check.
     *
     * @param key     configuration key
     * @param group   configuration group
     * @param content content to store
     * @param ticket  {@code null}, or the {@link Stat} whose version is passed to the client
     * @return {@code true} on success; {@code false} if any exception was raised (it is logged),
     *         including the {@link IllegalArgumentException} for a non-{@code Stat} ticket
     */
    @Override
    public boolean publishConfigCas(String key, String group, String content, Object ticket) {
        try {
            if (ticket != null && !(ticket instanceof Stat)) {
                throw new IllegalArgumentException("zookeeper publishConfigCas requires stat type ticket");
            }
            String pathKey = buildPathKey(group, key);
            // Ephemeral flag is false; version 0 is used when no ticket is supplied.
            zkClient.createOrUpdate(pathKey, content, false, ticket == null ? 0 : ((Stat) ticket).getVersion());
            return true;
        } catch (Exception e) {
            logger.warn("zookeeper publishConfigCas failed.", e);
            return false;
        }
    }

    /** Returns the content the client reports for {@code pathKey}. */
    @Override
    protected String doGetConfig(String pathKey) throws Exception {
        return zkClient.getContent(pathKey);
    }

    /** Returns the {@link ConfigItem} the client reports for the path built from {@code group} and {@code key}. */
    @Override
    public ConfigItem getConfigItem(String key, String group) {
        String pathKey = buildPathKey(group, key);
        return zkClient.getConfigItem(pathKey);
    }

    /** Deletes {@code pathKey} through the client; returns {@code true} when the call returns normally. */
    @Override
    protected boolean doRemoveConfig(String pathKey) throws Exception {
        zkClient.delete(pathKey);
        return true;
    }

    /** Returns the children the client reports for {@code groupPath}. */
    @Override
    protected Collection<String> doGetConfigKeys(String groupPath) {
        return zkClient.getChildren(groupPath);
    }

    /**
     * Registers {@code listener} with the shared cache listener and subscribes the cache listener
     * to data changes on {@code pathKey}, using this instance's executor.
     */
    @Override
    protected void doAddListener(String pathKey, ConfigurationListener listener) {
        cacheListener.addListener(pathKey, listener);
        zkClient.addDataListener(pathKey, cacheListener, executor);
    }

    /**
     * Unregisters {@code listener}; the data subscription on {@code pathKey} is dropped only once
     * no configuration listener remains for that path.
     */
    @Override
    protected void doRemoveListener(String pathKey, ConfigurationListener listener) {
        cacheListener.removeListener(pathKey, listener);
        Set<ConfigurationListener> configurationListeners = cacheListener.getConfigurationListeners(pathKey);
        // Fix: the condition used to be isNotEmpty, which tore down the shared subscription while
        // other listeners still depended on it and kept it alive after the last one was removed.
        if (CollectionUtils.isEmpty(configurationListeners)) {
            zkClient.removeDataListener(pathKey, cacheListener);
        }
    }
}
AbstractZookeeperTransporter的connect方法
/**
 * Returns a connected client for the addresses in {@code url}, creating one if necessary.
 * The cache is consulted once without locking and once more while holding the monitor of
 * {@code zookeeperClientMap}; a new client is created and cached only if both lookups fail
 * to yield a connected client.
 *
 * @param url the ZooKeeper URL
 * @return a cached connected client, or a newly created one
 */
public ZookeeperClient connect(URL url) {
    // address format: {[username:password@]address}
    final List<String> addresses = getURLBackupAddress(url);

    // The field define the zookeeper server , including protocol, host, port, username, password
    ZookeeperClient cached = fetchAndUpdateZookeeperClientCache(addresses);
    if (cached != null && cached.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return cached;
    }

    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        cached = fetchAndUpdateZookeeperClientCache(addresses);
        if (cached != null && cached.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return cached;
        }

        // Nothing usable in the cache: build a fresh client and remember it for every address.
        final ZookeeperClient created = createZookeeperClient(url);
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        writeToClientMap(addresses, created);
        return created;
    }
}
Curator5ZookeeperTransporter的createZookeeperClient方法
/**
 * Builds a new {@link Curator5ZookeeperClient} for {@code url}.
 *
 * @param url the ZooKeeper URL handed to the client constructor
 * @return the newly constructed client
 */
public ZookeeperClient createZookeeperClient(URL url) {
    final ZookeeperClient curatorClient = new Curator5ZookeeperClient(url);
    return curatorClient;
}
Curator5ZookeeperClient的构建
/**
 * ZooKeeper client built on the Curator framework. A {@link CuratorFramework} instance is
 * created and started in the constructor and used for every node operation in this class.
 */
public class Curator5ZookeeperClient extends AbstractZookeeperClient<Curator5ZookeeperClient.NodeCacheListenerImpl, Curator5ZookeeperClient.CuratorWatcherImpl> {

    protected static final Logger logger = LoggerFactory.getLogger(Curator5ZookeeperClient.class);
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private final CuratorFramework client;
    /** Node caches keyed by path. The map is static, so it is shared by all instances of this class. */
    private static Map<String, NodeCache> nodeCacheMap = new ConcurrentHashMap<>();

    /**
     * Builds and starts the Curator client, then waits for the connection to be established.
     *
     * @param url ZooKeeper URL; supplies the address list, optional user information and the
     *            {@code TIMEOUT_KEY} / {@code SESSION_KEY} parameters
     * @throws IllegalStateException if the connection is not established within the timeout, or
     *                               if any other exception is raised while building or starting
     */
    public Curator5ZookeeperClient(URL url) {
        super(url);
        try {
            // Connection timeout; falls back to DEFAULT_CONNECTION_TIMEOUT_MS.
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
            // Session expiry; falls back to DEFAULT_SESSION_TIMEOUT_MS.
            int sessionExpireMs = url.getParameter(SESSION_KEY, DEFAULT_SESSION_TIMEOUT_MS);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                // ZooKeeper address list
                .connectString(url.getBackupAddress())
                // retry once, sleeping 1000 ms before the retry
                .retryPolicy(new RetryNTimes(1, 1000))
                .connectionTimeoutMs(timeout)
                .sessionTimeoutMs(sessionExpireMs);
            String userInformation = url.getUserInformation();
            // Digest authorization, only when the URL carries user information.
            if (userInformation != null && userInformation.length() > 0) {
                builder = builder.authorization("digest", userInformation.getBytes());
            }
            client = builder.build();
            // Register the connection-state listener before starting so no transition is missed.
            client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
            client.start();
            // Block until connected, for at most `timeout` milliseconds.
            boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
            if (!connected) {
                throw new IllegalStateException("zookeeper not connected");
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Creates a node at {@code path} with Curator's default create mode; an existing node is only logged. */
    @Override
    public void createPersistent(String path) {
        try {
            client.create().forPath(path);
        } catch (NodeExistsException e) {
            logger.warn("ZNode " + path + " already exists.", e);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Creates an ephemeral node at {@code path}; if it already exists it is deleted and created again. */
    @Override
    public void createEphemeral(String path) {
        try {
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
        } catch (NodeExistsException e) {
            logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                "we can just try to delete and create again.", e);
            deletePath(path);
            createEphemeral(path);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates a node at {@code path} holding {@code data} (UTF-8) with Curator's default create
     * mode; if the node already exists its data is overwritten instead.
     */
    @Override
    protected void createPersistent(String path, String data) {
        byte[] dataBytes = data.getBytes(CHARSET);
        try {
            client.create().forPath(path, dataBytes);
        } catch (NodeExistsException e) {
            try {
                client.setData().forPath(path, dataBytes);
            } catch (Exception e1) {
                // Fix: report the message of the exception that is actually attached as the cause.
                throw new IllegalStateException(e1.getMessage(), e1);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Creates an ephemeral node holding {@code data} (UTF-8); an existing node is deleted and recreated. */
    @Override
    protected void createEphemeral(String path, String data) {
        byte[] dataBytes = data.getBytes(CHARSET);
        try {
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes);
        } catch (NodeExistsException e) {
            logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                "we can just try to delete and create again.", e);
            deletePath(path);
            createEphemeral(path, data);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Overwrites the data at {@code path}, passing {@code version} to Curator's {@code withVersion}. */
    @Override
    protected void update(String path, String data, int version) {
        byte[] dataBytes = data.getBytes(CHARSET);
        try {
            client.setData().withVersion(version).forPath(path, dataBytes);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Updates the node if {@link #checkExists(String)} reports it, otherwise creates it via {@code createPersistent}. */
    @Override
    protected void createOrUpdatePersistent(String path, String data, int version) {
        try {
            if (checkExists(path)) {
                update(path, data, version);
            } else {
                createPersistent(path, data);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Updates the node if {@link #checkExists(String)} reports it, otherwise creates it as ephemeral. */
    @Override
    protected void createOrUpdateEphemeral(String path, String data, int version) {
        try {
            if (checkExists(path)) {
                update(path, data, version);
            } else {
                createEphemeral(path, data);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Deletes {@code path} together with its children; a missing node is silently ignored. */
    @Override
    protected void deletePath(String path) {
        try {
            client.delete().deletingChildrenIfNeeded().forPath(path);
        } catch (NoNodeException ignored) {
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Returns the child names of {@code path}, or {@code null} when the node does not exist. */
    @Override
    public List<String> getChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Returns {@code true} only if Curator reports the node; any exception is swallowed and yields {@code false}. */
    @Override
    public boolean checkExists(String path) {
        try {
            if (client.checkExists().forPath(path) != null) {
                return true;
            }
        } catch (Exception ignored) {
        }
        return false;
    }

    /** Returns the connection status reported by the underlying Curator ZooKeeper client. */
    @Override
    public boolean isConnected() {
        return client.getZookeeperClient().isConnected();
    }

    /** Returns the node data decoded as UTF-8, or {@code null} for a missing node or empty data. */
    @Override
    public String doGetContent(String path) {
        try {
            byte[] dataBytes = client.getData().forPath(path);
            return (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET);
        } catch (NoNodeException e) {
            // ignore NoNode Exception.
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        return null;
    }

    /**
     * Returns the node content together with its {@link Stat}; a missing node yields an empty
     * {@link ConfigItem}.
     */
    @Override
    public ConfigItem doGetConfigItem(String path) {
        String content;
        Stat stat;
        try {
            stat = new Stat();
            byte[] dataBytes = client.getData().storingStatIn(stat).forPath(path);
            content = (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET);
        } catch (NoNodeException e) {
            return new ConfigItem();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        return new ConfigItem(content, stat);
    }

    /** Runs the superclass close logic, then closes the Curator client. */
    @Override
    public void doClose() {
        super.doClose();
        client.close();
    }

    @Override
    public CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) {
        return new CuratorWatcherImpl(client, listener, path);
    }

    /** Reads the children of {@code path} and installs {@code listener} as watcher; {@code null} if the node is missing. */
    @Override
    public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
        try {
            return client.getChildren().usingWatcher(listener).forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    @Override
    protected NodeCacheListenerImpl createTargetDataListener(String path, DataListener listener) {
        return new NodeCacheListenerImpl(client, listener, path);
    }

    @Override
    protected void addTargetDataListener(String path, NodeCacheListenerImpl nodeCacheListener) {
        this.addTargetDataListener(path, nodeCacheListener, null);
    }

    /**
     * Starts a {@link NodeCache} for {@code path} and attaches {@code nodeCacheListener} to it.
     * If a cache is already registered for the path, this call returns without attaching anything.
     *
     * @param executor executor for listener callbacks; {@code null} uses Curator's default
     */
    @Override
    protected void addTargetDataListener(String path, NodeCacheListenerImpl nodeCacheListener, Executor executor) {
        try {
            NodeCache nodeCache = new NodeCache(client, path);
            if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) {
                return;
            }
            if (executor == null) {
                nodeCache.getListenable().addListener(nodeCacheListener);
            } else {
                nodeCache.getListenable().addListener(nodeCacheListener, executor);
            }

            nodeCache.start();
        } catch (Exception e) {
            throw new IllegalStateException("Add nodeCache listener for path:" + path, e);
        }
    }

    /** Detaches the listener from the path's cache (if any) and clears its delegate. */
    @Override
    protected void removeTargetDataListener(String path, NodeCacheListenerImpl nodeCacheListener) {
        NodeCache nodeCache = nodeCacheMap.get(path);
        if (nodeCache != null) {
            nodeCache.getListenable().removeListener(nodeCacheListener);
        }
        nodeCacheListener.dataListener = null;
    }

    @Override
    public void removeTargetChildListener(String path, CuratorWatcherImpl listener) {
        listener.unwatch();
    }

    /** Adapts Curator node-cache notifications to a {@link DataListener}. */
    static class NodeCacheListenerImpl implements NodeCacheListener {

        private CuratorFramework client;

        private volatile DataListener dataListener;

        private String path;

        protected NodeCacheListenerImpl() {
        }

        public NodeCacheListenerImpl(CuratorFramework client, DataListener dataListener, String path) {
            this.client = client;
            this.dataListener = dataListener;
            this.path = path;
        }

        /**
         * Translates the cache's current state into an event: no data means deleted, stat
         * version 0 means created, anything else means data changed.
         */
        @Override
        public void nodeChanged() throws Exception {
            // Fix: read the volatile delegate once. removeTargetDataListener() sets it to null,
            // and a notification that is already in flight used to fail with an NPE below.
            DataListener listener = dataListener;
            if (listener == null) {
                return;
            }
            ChildData childData = nodeCacheMap.get(path).getCurrentData();
            String content = null;
            EventType eventType;
            if (childData == null) {
                eventType = EventType.NodeDeleted;
            } else if (childData.getStat().getVersion() == 0) {
                content = new String(childData.getData(), CHARSET);
                eventType = EventType.NodeCreated;
            } else {
                content = new String(childData.getData(), CHARSET);
                eventType = EventType.NodeDataChanged;
            }
            listener.dataChanged(path, content, eventType);
        }
    }

    /** Adapts Curator watcher events to a {@link ChildListener} and re-installs itself on every event. */
    static class CuratorWatcherImpl implements CuratorWatcher {

        private CuratorFramework client;
        private volatile ChildListener childListener;
        private String path;

        public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
            this.client = client;
            this.childListener = listener;
            this.path = path;
        }

        protected CuratorWatcherImpl() {
        }

        /** Clears the delegate so later events are dropped and the watcher is not re-installed. */
        public void unwatch() {
            this.childListener = null;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            // if client connect or disconnect to server, zookeeper will queue
            // watched event(Watcher.Event.EventType.None, .., path = null).
            if (event.getType() == Watcher.Event.EventType.None) {
                return;
            }

            // Fix: read the volatile delegate once so a concurrent unwatch() between the null
            // check and the call cannot cause an NPE.
            ChildListener listener = childListener;
            if (listener != null) {
                listener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
            }
        }
    }

    /** Maps Curator connection states to the {@code StateListener} codes of the enclosing client. */
    private class CuratorConnectionStateListener implements ConnectionStateListener {
        private final long UNKNOWN_SESSION_ID = -1L;

        private long lastSessionId;
        private int timeout;
        private int sessionExpireMs;

        public CuratorConnectionStateListener(URL url) {
            this.timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
            this.sessionExpireMs = url.getParameter(SESSION_KEY, DEFAULT_SESSION_TIMEOUT_MS);
        }

        /**
         * Logs the transition and forwards it to the enclosing client. On RECONNECTED the current
         * session id is compared with the last recorded one to tell a recovered session from a
         * newly created one.
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState state) {
            long sessionId = UNKNOWN_SESSION_ID;
            try {
                sessionId = client.getZookeeperClient().getZooKeeper().getSessionId();
            } catch (Exception e) {
                logger.warn("Curator client state changed, but failed to get the related zk session instance.");
            }

            if (state == ConnectionState.LOST) {
                logger.warn("Curator zookeeper session " + Long.toHexString(lastSessionId) + " expired.");
                Curator5ZookeeperClient.this.stateChanged(StateListener.SESSION_LOST);
            } else if (state == ConnectionState.SUSPENDED) {
                logger.warn("Curator zookeeper connection of session " + Long.toHexString(sessionId) + " timed out. " +
                    "connection timeout value is " + timeout + ", session expire timeout value is " + sessionExpireMs);
                Curator5ZookeeperClient.this.stateChanged(StateListener.SUSPENDED);
            } else if (state == ConnectionState.CONNECTED) {
                lastSessionId = sessionId;
                logger.info("Curator zookeeper client instance initiated successfully, session id is " + Long.toHexString(sessionId));
                Curator5ZookeeperClient.this.stateChanged(StateListener.CONNECTED);
            } else if (state == ConnectionState.RECONNECTED) {
                if (lastSessionId == sessionId && sessionId != UNKNOWN_SESSION_ID) {
                    logger.warn("Curator zookeeper connection recovered from connection lose, " +
                        "reuse the old session " + Long.toHexString(sessionId));
                    Curator5ZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                } else {
                    logger.warn("New session created after old session lost, " +
                        "old session " + Long.toHexString(lastSessionId) + ", new session " + Long.toHexString(sessionId));
                    lastSessionId = sessionId;
                    Curator5ZookeeperClient.this.stateChanged(StateListener.NEW_SESSION_CREATED);
                }
            }
        }

    }

    /**
     * just for unit test
     *
     * @return the underlying Curator client
     */
    CuratorFramework getClient() {
        return client;
    }
}
Apollo配置中心
/**
 * Dynamic configuration backed by Apollo. Connection settings from the URL are published as
 * system properties (only where not already set) and the namespace's {@link Config} and
 * {@link ConfigFile} are obtained through Apollo's {@code ConfigService}.
 */
public class ApolloDynamicConfiguration implements DynamicConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(ApolloDynamicConfiguration.class);
    private static final String APOLLO_ENV_KEY = "env";
    private static final String APOLLO_ADDR_KEY = "apollo.meta";
    private static final String APOLLO_CLUSTER_KEY = "apollo.cluster";
    private static final String APOLLO_PROTOCOL_PREFIX = "http://";
    private static final String APOLLO_APPLICATION_KEY = "application";
    private static final String APOLLO_APPID_KEY = "app.id";

    private final URL url;
    private final Config dubboConfig;
    private final ConfigFile dubboConfigFile;
    /** Apollo-side listeners keyed by {@code group + key}. */
    private final ConcurrentMap<String, ApolloListener> listeners = new ConcurrentHashMap<>();

    /**
     * @param url config-center URL
     * @throws IllegalStateException if the loaded config's source type is not REMOTE and the
     *                               {@code CHECK_KEY} parameter is true (its default here)
     */
    ApolloDynamicConfiguration(URL url) {
        this.url = url;
        // Instead of using Dubbo's configuration, I would suggest use the original configuration method Apollo provides.
        String configEnv = url.getParameter(APOLLO_ENV_KEY);
        String configAddr = getAddressWithProtocolPrefix(url);
        String configCluster = url.getParameter(CLUSTER_KEY);
        String configAppId = url.getParameter(APOLLO_APPID_KEY);
        // Each system property is set only when it is currently empty, so explicit JVM settings win.
        if (StringUtils.isEmpty(System.getProperty(APOLLO_ENV_KEY)) && configEnv != null) {
            System.setProperty(APOLLO_ENV_KEY, configEnv);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_ADDR_KEY)) && !ANYHOST_VALUE.equals(url.getHost())) {
            System.setProperty(APOLLO_ADDR_KEY, configAddr);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_CLUSTER_KEY)) && configCluster != null) {
            System.setProperty(APOLLO_CLUSTER_KEY, configCluster);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_APPID_KEY)) && configAppId != null) {
            System.setProperty(APOLLO_APPID_KEY, configAppId);
        }

        String namespace = url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP);
        String apolloNamespace = StringUtils.isEmpty(namespace) ? url.getGroup(DEFAULT_GROUP) : namespace;
        dubboConfig = ConfigService.getConfig(apolloNamespace);
        dubboConfigFile = ConfigService.getConfigFile(apolloNamespace, ConfigFileFormat.Properties);

        // Decide to fail or to continue when failed to connect to remote server.
        boolean check = url.getParameter(CHECK_KEY, true);
        if (dubboConfig.getSourceType() != ConfigSourceType.REMOTE) {
            if (check) {
                throw new IllegalStateException("Failed to connect to config center, the config center is Apollo, " +
                    "the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv));
            } else {
                logger.warn("Failed to connect to config center, the config center is Apollo, " +
                    "the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv) +
                    ", will use the local cache value instead before eventually the connection is established.");
            }
        }
    }

    /** Clears the local listener registry. */
    @Override
    public void close() {
        try {
            listeners.clear();
        } catch (UnsupportedOperationException e) {
            logger.warn("Failed to close connect from config center, the config center is Apollo");
        }
    }

    /**
     * Returns the URL's backup address with {@code http://} prepended to every comma-separated
     * entry that does not already start with it; an empty address is returned unchanged.
     */
    private String getAddressWithProtocolPrefix(URL url) {
        String address = url.getBackupAddress();
        if (StringUtils.isNotEmpty(address)) {
            address = Arrays.stream(COMMA_SPLIT_PATTERN.split(address))
                .map(addr -> {
                    if (addr.startsWith(APOLLO_PROTOCOL_PREFIX)) {
                        return addr;
                    }
                    return APOLLO_PROTOCOL_PREFIX + addr;
                })
                .collect(Collectors.joining(","));
        }
        return address;
    }

    /**
     * Since all governance rules will lay under dubbo group, this method now always uses the default dubboConfig and
     * ignores the group parameter.
     */
    @Override
    public void addListener(String key, String group, ConfigurationListener listener) {
        ApolloListener apolloListener = listeners.computeIfAbsent(group + key, k -> createTargetListener(key, group));
        apolloListener.addListener(listener);
        dubboConfig.addChangeListener(apolloListener, Collections.singleton(key));
    }

    /**
     * Removes {@code listener}; the Apollo-side listener is unregistered from the default config
     * once it has no configuration listeners left.
     */
    @Override
    public void removeListener(String key, String group, ConfigurationListener listener) {
        ApolloListener apolloListener = listeners.get(group + key);
        if (apolloListener != null) {
            apolloListener.removeListener(listener);
            if (!apolloListener.hasInternalListener()) {
                dubboConfig.removeChangeListener(apolloListener);
            }
        }
    }

    /**
     * Reads {@code key}: from the application config when {@code group} equals the URL's
     * application, from the namespace named {@code group} for any other non-empty group, and from
     * the default config when {@code group} is empty. {@code timeout} is not used.
     */
    @Override
    public String getConfig(String key, String group, long timeout) throws IllegalStateException {
        if (StringUtils.isNotEmpty(group)) {
            if (group.equals(url.getApplication())) {
                return ConfigService.getAppConfig().getProperty(key, null);
            } else {
                return ConfigService.getConfig(group).getProperty(key, null);
            }
        }
        return dubboConfig.getProperty(key, null);
    }

    /**
     * Recommend specify namespace and group when using Apollo.
     * <p>
     * &lt;dubbo:config-center namespace="governance" group="dubbo" /&gt;, 'namespace=governance' is for governance
     * rules while 'group=dubbo' is for properties files.
     *
     * @param key     default value is 'dubbo.properties', currently useless for Apollo.
     * @param group   namespace to read; empty selects the default config file
     * @param timeout not used by this implementation
     * @return the content of the selected properties config file
     * @throws IllegalStateException if Apollo returns no config file for {@code group}
     */
    @Override
    public String getProperties(String key, String group, long timeout) throws IllegalStateException {
        if (StringUtils.isEmpty(group)) {
            return dubboConfigFile.getContent();
        }
        if (group.equals(url.getApplication())) {
            return ConfigService.getConfigFile(APOLLO_APPLICATION_KEY, ConfigFileFormat.Properties).getContent();
        }

        ConfigFile configFile = ConfigService.getConfigFile(group, ConfigFileFormat.Properties);
        if (configFile == null) {
            throw new IllegalStateException("There is no namespace named " + group + " in Apollo.");
        }
        return configFile.getContent();
    }

    /**
     * This method will be used by Configuration to get valid value at runtime.
     * The group is expected to be 'app level', which can be fetched from the 'config.appnamespace' in url if necessary.
     * But I think Apollo's inheritance feature of namespace can solve the problem .
     */
    @Override
    public String getInternalProperty(String key) {
        return dubboConfig.getProperty(key, null);
    }

    /**
     * Ignores both parameters and returns a fresh listener.
     *
     * @param key   property key the native listener will listen on
     * @param group to distinguish different set of properties
     * @return a new {@link ApolloListener}
     */
    private ApolloListener createTargetListener(String key, String group) {
        return new ApolloListener();
    }

    /** Fans Apollo change events out to the registered {@link ConfigurationListener}s. */
    public class ApolloListener implements ConfigChangeListener {

        private Set<ConfigurationListener> listeners = new CopyOnWriteArraySet<>();

        ApolloListener() {
        }

        /**
         * Converts every changed key into a {@link ConfigChangedEvent} and dispatches it to all
         * registered listeners. A key whose new value is the empty string is logged and skipped.
         */
        @Override
        public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) {
            for (String key : changeEvent.changedKeys()) {
                ConfigChange change = changeEvent.getChange(key);
                if ("".equals(change.getNewValue())) {
                    logger.warn("an empty rule is received for " + key + ", the current working rule is " +
                        change.getOldValue() + ", the empty rule will not take effect.");
                    // Fix: skip only this key. This used to be `return`, which silently dropped
                    // the notifications for every remaining changed key in the same event.
                    continue;
                }

                ConfigChangedEvent event = new ConfigChangedEvent(key, change.getNamespace(), change.getNewValue(), getChangeType(change));
                listeners.forEach(listener -> listener.process(event));
            }
        }

        /** Maps Apollo's DELETED to DELETED; every other change type is reported as MODIFIED. */
        private ConfigChangeType getChangeType(ConfigChange change) {
            if (change.getChangeType() == PropertyChangeType.DELETED) {
                return ConfigChangeType.DELETED;
            }
            return ConfigChangeType.MODIFIED;
        }

        void addListener(ConfigurationListener configurationListener) {
            this.listeners.add(configurationListener);
        }

        void removeListener(ConfigurationListener configurationListener) {
            this.listeners.remove(configurationListener);
        }

        /** Returns {@code true} while at least one configuration listener is registered. */
        boolean hasInternalListener() {
            return listeners != null && listeners.size() > 0;
        }
    }
}
Nacos配置中心
/**
 * Dynamic configuration backed by Nacos. Configuration access goes through a
 * {@link NacosConfigServiceWrapper}; listing keys uses the {@link HttpAgent} extracted
 * reflectively from the underlying config service.
 */
public class NacosDynamicConfiguration implements DynamicConfiguration {

    private static final String GET_CONFIG_KEYS_PATH = "/v1/cs/configs";

    private final Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * the default timeout in millis to get config from nacos
     */
    private static final long DEFAULT_TIMEOUT = 5000L;

    private Properties nacosProperties;

    /**
     * The nacos configService
     */
    private final NacosConfigServiceWrapper configService;

    private HttpAgent httpAgent;

    /**
     * The map store the key to {@link NacosConfigListener} mapping
     */
    private final Map<String, NacosConfigListener> watchListenerMap;

    private MD5Utils md5Utils = new MD5Utils();

    NacosDynamicConfiguration(URL url) {
        this.nacosProperties = buildNacosProperties(url);
        this.configService = buildConfigService(url);
        this.httpAgent = getHttpAgent(configService.getConfigService());
        watchListenerMap = new ConcurrentHashMap<>();
    }

    /**
     * Creates the Nacos config service from {@code nacosProperties} and wraps it.
     *
     * @throws IllegalStateException if Nacos fails to create the service
     */
    private NacosConfigServiceWrapper buildConfigService(URL url) {
        ConfigService configService = null;
        try {
            configService = NacosFactory.createConfigService(nacosProperties);
        } catch (NacosException e) {
            if (logger.isErrorEnabled()) {
                logger.error(e.getErrMsg(), e);
            }
            throw new IllegalStateException(e);
        }
        return new NacosConfigServiceWrapper(configService);
    }

    /**
     * Reads the declared field named {@code agent} from the config service via reflection.
     *
     * @throws IllegalStateException if the field cannot be read or cast
     */
    private HttpAgent getHttpAgent(ConfigService configService) {
        HttpAgent agent = null;
        try {
            Field field = configService.getClass().getDeclaredField("agent");
            field.setAccessible(true);
            agent = (HttpAgent) field.get(configService);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return agent;
    }

    private Properties buildNacosProperties(URL url) {
        Properties properties = new Properties();
        setServerAddr(url, properties);
        setProperties(url, properties);
        return properties;
    }

    /** Stores {@code host:port}, followed by the optional backup parameter, under {@code SERVER_ADDR}. */
    private void setServerAddr(URL url, Properties properties) {
        StringBuilder serverAddrBuilder =
            new StringBuilder(url.getHost()) // Host
                .append(':')
                .append(url.getPort()); // Port

        // Append backup parameter as other servers
        String backup = url.getParameter(BACKUP_KEY);
        if (backup != null) {
            serverAddrBuilder.append(',').append(backup);
        }
        String serverAddr = serverAddrBuilder.toString();
        properties.put(SERVER_ADDR, serverAddr);
    }

    private static void setProperties(URL url, Properties properties) {
        // Get the parameters from constants
        Map<String, String> parameters = url.getParameters(of(PropertyKeyConst.class));
        // Put all parameters
        properties.putAll(parameters);
    }

    /** Copies the URL parameter into {@code properties} when it is non-empty. */
    private static void putPropertyIfAbsent(URL url, Properties properties, String propertyName) {
        String propertyValue = url.getParameter(propertyName);
        if (StringUtils.isNotEmpty(propertyValue)) {
            properties.setProperty(propertyName, propertyValue);
        }
    }

    /** Copies the URL parameter into {@code properties}, or {@code defaultValue} when it is empty. */
    private static void putPropertyIfAbsent(URL url, Properties properties, String propertyName, String defaultValue) {
        String propertyValue = url.getParameter(propertyName);
        if (StringUtils.isNotEmpty(propertyValue)) {
            properties.setProperty(propertyName, propertyValue);
        } else {
            properties.setProperty(propertyName, defaultValue);
        }
    }

    /**
     * Creates a listener and fills its context with {@code key} and {@code group}.
     *
     * @param key   property key the native listener will listen on
     * @param group to distinguish different set of properties
     * @return the new listener
     */
    private NacosConfigListener createTargetListener(String key, String group) {
        NacosConfigListener configListener = new NacosConfigListener();
        configListener.fillContext(key, group);
        return configListener;
    }

    @Override
    public void close() throws Exception {
        configService.shutdown();
    }

    /**
     * Registers {@code listener} on the shared Nacos listener for {@code key}/{@code group} and
     * subscribes that shared listener with Nacos. A {@link NacosException} is logged, not rethrown.
     */
    @Override
    public void addListener(String key, String group, ConfigurationListener listener) {
        String listenerKey = buildListenerKey(key, group);
        NacosConfigListener nacosConfigListener =
            watchListenerMap.computeIfAbsent(listenerKey, k -> createTargetListener(key, group));
        nacosConfigListener.addListener(listener);
        try {
            configService.addListener(key, group, nacosConfigListener);
        } catch (NacosException e) {
            // Fix: log the exception itself, not just its message, so the stack trace is kept.
            logger.error(e.getMessage(), e);
        }
    }

    /** Removes {@code listener} from the shared Nacos listener for {@code key}/{@code group}, if one exists. */
    @Override
    public void removeListener(String key, String group, ConfigurationListener listener) {
        String listenerKey = buildListenerKey(key, group);
        NacosConfigListener eventListener = watchListenerMap.get(listenerKey);
        if (eventListener != null) {
            eventListener.removeListener(listener);
        }
    }

    /**
     * Reads a configuration value. A negative {@code timeout} is replaced by
     * {@link #getDefaultTimeout()} and an empty {@code group} by {@code DEFAULT_GROUP}.
     *
     * @return the value, or {@code null} if Nacos raised an exception (which is logged)
     */
    @Override
    public String getConfig(String key, String group, long timeout) throws IllegalStateException {
        try {
            long nacosTimeout = timeout < 0 ? getDefaultTimeout() : timeout;
            if (StringUtils.isEmpty(group)) {
                group = DEFAULT_GROUP;
            }
            return configService.getConfig(key, group, nacosTimeout);
        } catch (NacosException e) {
            // Fix: log the exception itself, not just its message, so the stack trace is kept.
            logger.error(e.getMessage(), e);
        }
        return null;
    }

    /** Returns the content plus its MD5 as ticket; the ticket is an empty string for empty content. */
    @Override
    public ConfigItem getConfigItem(String key, String group) {
        String content = getConfig(key, group);
        String casMd5 = "";
        if (StringUtils.isNotEmpty(content)) {
            casMd5 = md5Utils.getMd5(content);
        }
        return new ConfigItem(content, casMd5);
    }

    /** Reads {@code key} from {@code DEFAULT_GROUP} with the default timeout; {@code null} on a Nacos error. */
    @Override
    public Object getInternalProperty(String key) {
        try {
            return configService.getConfig(key, DEFAULT_GROUP, getDefaultTimeout());
        } catch (NacosException e) {
            // Fix: log the exception itself, not just its message, so the stack trace is kept.
            logger.error(e.getMessage(), e);
        }
        return null;
    }

    /** Publishes the content; returns {@code false} if Nacos reports failure or raises an exception. */
    @Override
    public boolean publishConfig(String key, String group, String content) {
        boolean published = false;
        try {
            published = configService.publishConfig(key, group, content);
        } catch (NacosException e) {
            logger.error(e.getErrMsg(), e);
        }
        return published;
    }

    /**
     * Publishes the content with a compare-and-set ticket.
     *
     * @param ticket must be a {@link String}; it is passed to Nacos as the CAS argument
     * @return the Nacos result, or {@code false} if a {@link NacosException} was raised
     * @throws IllegalArgumentException if {@code ticket} is not a {@code String} (including {@code null})
     */
    @Override
    public boolean publishConfigCas(String key, String group, String content, Object ticket) {
        try {
            if (!(ticket instanceof String)) {
                throw new IllegalArgumentException("nacos publishConfigCas requires string type ticket");
            }
            return configService.publishConfigCas(key, group, content, (String) ticket);
        } catch (NacosException e) {
            logger.warn("nacos publishConfigCas failed.", e);
            return false;
        }
    }

    @Override
    public long getDefaultTimeout() {
        return DEFAULT_TIMEOUT;
    }

    /**
     * TODO Nacos does not support atomic update of the value mapped to a key.
     * <p>
     * Lists the {@code dataId}s of a group through an HTTP GET on {@code /v1/cs/configs}. Any
     * exception is logged and the keys collected so far are returned.
     *
     * @param group the specified group
     * @return the sorted keys; empty on failure
     */
    @Override
    public SortedSet<String> getConfigKeys(String group) {
        // TODO use Nacos Client API to replace HTTP Open API
        SortedSet<String> keys = new TreeSet<>();
        try {

            Map<String, String> paramsValues = new HashMap<>();
            paramsValues.put("search", "accurate");
            paramsValues.put("dataId", "");
            paramsValues.put("group", group.replace(SLASH_CHAR, HYPHEN_CHAR));
            paramsValues.put("pageNo", "1");
            paramsValues.put("pageSize", String.valueOf(Integer.MAX_VALUE));

            String encoding = getProperty(ENCODE, "UTF-8");

            HttpRestResult<String> result = httpAgent.httpGet(GET_CONFIG_KEYS_PATH, emptyMap(), paramsValues, encoding, 5 * 1000);
            Stream<String> keysStream = toKeysStream(result.getData());
            if (keysStream != null) {
                keysStream.forEach(keys::add);
            }
        } catch (Exception e) {
            if (logger.isErrorEnabled()) {
                logger.error(e.getMessage(), e);
            }
        }
        return keys;
    }

    /** Removes the configuration; returns {@code false} if Nacos reports failure or raises an exception. */
    @Override
    public boolean removeConfig(String key, String group) {
        boolean removed = false;
        try {
            removed = configService.removeConfig(key, group);
        } catch (NacosException e) {
            if (logger.isErrorEnabled()) {
                logger.error(e.getMessage(), e);
            }
        }
        return removed;
    }

    /**
     * Extracts the {@code dataId} of every entry in the response's {@code pageItems} array.
     *
     * @return the ids, or {@code null} when the content does not parse to an object or has no {@code pageItems}
     */
    private Stream<String> toKeysStream(String content) {
        JSONObject jsonObject = JSON.parseObject(content);
        if (jsonObject == null) {
            return null;
        }
        JSONArray pageItems = jsonObject.getJSONArray("pageItems");
        if (pageItems == null) {
            return null;
        }
        return pageItems.stream()
            .map(object -> (JSONObject) object)
            .map(json -> json.getString("dataId"));
    }

    private String getProperty(String name, String defaultValue) {
        return nacosProperties.getProperty(name, defaultValue);
    }

    /** Shared Nacos listener that fans received configuration out to {@link ConfigurationListener}s. */
    public class NacosConfigListener extends AbstractSharedListener {

        private Set<ConfigurationListener> listeners = new CopyOnWriteArraySet<>();
        /**
         * cache data to store old value
         */
        private Map<String, String> cacheData = new ConcurrentHashMap<>();

        @Override
        public Executor getExecutor() {
            return null;
        }

        /**
         * Builds a change event from the received content and the previously cached value,
         * updates the cache, then notifies every registered listener.
         *
         * @param dataId     data ID
         * @param group      group
         * @param configInfo content
         */
        @Override
        public void innerReceive(String dataId, String group, String configInfo) {
            String oldValue = cacheData.get(dataId);
            ConfigChangedEvent event = new ConfigChangedEvent(dataId, group, configInfo, getChangeType(configInfo, oldValue));
            // A null value cannot be stored in a ConcurrentHashMap, so null content removes the entry.
            if (configInfo == null) {
                cacheData.remove(dataId);
            } else {
                cacheData.put(dataId, configInfo);
            }
            listeners.forEach(listener -> listener.process(event));
        }

        void addListener(ConfigurationListener configurationListener) {
            this.listeners.add(configurationListener);
        }

        void removeListener(ConfigurationListener configurationListener) {
            this.listeners.remove(configurationListener);
        }

        /** Blank new content is DELETED; otherwise a blank old value is ADDED; anything else is MODIFIED. */
        private ConfigChangeType getChangeType(String configInfo, String oldValue) {
            if (StringUtils.isBlank(configInfo)) {
                return ConfigChangeType.DELETED;
            }
            if (StringUtils.isBlank(oldValue)) {
                return ConfigChangeType.ADDED;
            }
            return ConfigChangeType.MODIFIED;
        }
    }

    /** Builds the {@code watchListenerMap} key as {@code key + HYPHEN_CHAR + group}. */
    protected String buildListenerKey(String key, String group) {
        return key + HYPHEN_CHAR + group;
    }
}
本文暂时没有评论,来添加一个吧(●'◡'●)