Preface
This article takes a look at Debezium's ChangeEventQueue.
ChangeEventQueueMetrics
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java
public interface ChangeEventQueueMetrics {

    int totalCapacity();

    int remainingCapacity();
}
- The ChangeEventQueueMetrics interface defines two methods: totalCapacity and remainingCapacity.
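To illustrate what these two numbers are good for, here is a minimal sketch that derives a queue utilization ratio from them. The names QueueMetricsSketch and QueueUtilization are hypothetical stand-ins introduced for this example; they are not part of Debezium.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical stand-in that mirrors the two methods of ChangeEventQueueMetrics.
interface QueueMetricsSketch {
    int totalCapacity();

    int remainingCapacity();
}

class QueueUtilization {

    // Fraction of the queue that is currently occupied, between 0.0 and 1.0.
    static double utilization(QueueMetricsSketch metrics) {
        int total = metrics.totalCapacity();
        if (total <= 0) {
            return 0.0;
        }
        return (total - metrics.remainingCapacity()) / (double) total;
    }

    // Adapts a bounded queue to the metrics interface (assumption: capacity is the queue's bound).
    static QueueMetricsSketch of(BlockingQueue<?> queue, int capacity) {
        return new QueueMetricsSketch() {
            @Override
            public int totalCapacity() {
                return capacity;
            }

            @Override
            public int remainingCapacity() {
                return queue.remainingCapacity();
            }
        };
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(4);
        queue.add("event-1");
        System.out.println(utilization(of(queue, 4)));
    }
}
```

A value close to 1.0 would suggest that the consumer is not keeping up with the producer.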
ChangeEventQueue
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java
public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class);

    private final Duration pollInterval;
    private final int maxBatchSize;
    private final int maxQueueSize;
    private final BlockingQueue<T> queue;
    private final Metronome metronome;
    private final Supplier<PreviousContext> loggingContextSupplier;

    private volatile RuntimeException producerException;

    private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
        this.pollInterval = pollInterval;
        this.maxBatchSize = maxBatchSize;
        this.maxQueueSize = maxQueueSize;
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
        this.loggingContextSupplier = loggingContextSupplier;
    }

    public static class Builder<T> {

        private Duration pollInterval;
        private int maxQueueSize;
        private int maxBatchSize;
        private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;

        public Builder<T> pollInterval(Duration pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        public Builder<T> maxQueueSize(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
            return this;
        }

        public Builder<T> maxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
            this.loggingContextSupplier = loggingContextSupplier;
            return this;
        }

        public ChangeEventQueue<T> build() {
            return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier);
        }
    }

    /**
     * Enqueues a record so that it can be obtained via {@link #poll()}. This method
     * will block if the queue is full.
     *
     * @param record
     *            the record to be enqueued
     * @throws InterruptedException
     *             if this thread has been interrupted
     */
    public void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }

        // The calling thread has been interrupted, let's abort
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        // this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue
        queue.put(record);
    }

    /**
     * Returns the next batch of elements from this queue. May be empty in case no
     * elements have arrived in the maximum waiting time.
     *
     * @throws InterruptedException
     *             if this thread has been interrupted while waiting for more
     *             elements to arrive
     */
    public List<T> poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            List<T> records = new ArrayList<>();
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
                throwProducerExceptionIfPresent();

                LOGGER.debug("no records available yet, sleeping a bit...");
                // no records yet, so wait a bit
                metronome.pause();
                LOGGER.debug("checking for more records...");
            }
            return records;
        }
        finally {
            previousContext.restore();
        }
    }

    public void producerException(final RuntimeException producerException) {
        this.producerException = producerException;
    }

    private void throwProducerExceptionIfPresent() {
        if (producerException != null) {
            throw producerException;
        }
    }

    @Override
    public int totalCapacity() {
        return maxQueueSize;
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
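- ChangeEventQueue implements the ChangeEventQueueMetrics interface. Its constructor creates the BlockingQueue (a LinkedBlockingDeque bounded by maxQueueSize) and the Metronome, and stores the loggingContextSupplier. The enqueue method ignores null records, throws an InterruptedException if the calling thread has already been interrupted, and otherwise calls queue.put(record), which blocks while the queue is full. The poll method first obtains previousContext via loggingContextSupplier.get() and creates a timeout. It then loops on queue.drainTo(records, maxBatchSize): as long as nothing was drained and the timeout has not expired, it calls throwProducerExceptionIfPresent() and metronome.pause(). The loop ends once timeout.expired() returns true or drainTo returns a non-zero count, and previousContext.restore() runs in the finally block. totalCapacity returns maxQueueSize; remainingCapacity returns queue.remainingCapacity().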
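The enqueue/poll pattern described above can be sketched without any Debezium dependency. The class BatchingQueueSketch and its constructor parameters are hypothetical and only for illustration: it replaces the Metronome with a plain Thread.sleep, replaces the Timer with a millisecond deadline, and leaves out logging and the interrupt check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Simplified, hypothetical re-creation of the pattern; this is not Debezium code.
class BatchingQueueSketch<T> {

    private final BlockingQueue<T> queue;
    private final int maxBatchSize;
    private final long pollIntervalMillis;
    private final long maxWaitMillis;
    private volatile RuntimeException producerException;

    BatchingQueueSketch(int maxQueueSize, int maxBatchSize, long pollIntervalMillis, long maxWaitMillis) {
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.maxBatchSize = maxBatchSize;
        this.pollIntervalMillis = pollIntervalMillis;
        this.maxWaitMillis = maxWaitMillis;
    }

    // Blocks while the queue is full; null records are silently dropped.
    void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }
        queue.put(record);
    }

    // Returns up to maxBatchSize records, or an empty list once maxWaitMillis has passed.
    List<T> poll() throws InterruptedException {
        List<T> records = new ArrayList<>();
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        while (System.currentTimeMillis() <= deadline && queue.drainTo(records, maxBatchSize) == 0) {
            // Surface a failure reported by the producer thread to the polling thread.
            if (producerException != null) {
                throw producerException;
            }
            Thread.sleep(pollIntervalMillis);
        }
        return records;
    }

    void producerException(RuntimeException e) {
        this.producerException = e;
    }

    public static void main(String[] args) throws InterruptedException {
        BatchingQueueSketch<Integer> queue = new BatchingQueueSketch<>(10, 3, 5, 50);
        for (int i = 0; i < 5; i++) {
            queue.enqueue(i);
        }
        System.out.println(queue.poll()); // first batch, at most 3 records
        System.out.println(queue.poll()); // remaining records
    }
}
```

Note that the producer exception is only checked when a drain attempt returned nothing, so records already in the queue are still handed out before the failure is raised.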
Threads
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Threads.java
public class Threads {

    //......

    public static interface TimeSince {
        /**
         * Reset the elapsed time to 0.
         */
        void reset();

        /**
         * Get the time that has elapsed since the last call to {@link #reset() reset}.
         *
         * @return the number of milliseconds
         */
        long elapsedTime();
    }

    public static interface Timer {

        /**
         * @return true if current time is greater than start time plus requested time period
         */
        boolean expired();

        Duration remaining();
    }

    public static Timer timer(Clock clock, Duration time) {
        final TimeSince start = timeSince(clock);
        start.reset();

        return new Timer() {

            @Override
            public boolean expired() {
                return start.elapsedTime() > time.toMillis();
            }

            @Override
            public Duration remaining() {
                return time.minus(start.elapsedTime(), ChronoUnit.MILLIS);
            }
        };
    }

    public static TimeSince timeSince(Clock clock) {
        return new TimeSince() {
            private long lastTimeInMillis;

            @Override
            public void reset() {
                lastTimeInMillis = clock.currentTimeInMillis();
            }

            @Override
            public long elapsedTime() {
                long elapsed = clock.currentTimeInMillis() - lastTimeInMillis;
                return elapsed <= 0L ? 0L : elapsed;
            }
        };
    }

    //......

}
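- Threads defines the Timer interface with the expired and remaining methods. The timer method first creates a TimeSince via timeSince and resets it, then returns an anonymous Timer whose expired method checks whether the elapsed time exceeds the requested duration and whose remaining method subtracts the elapsed time from that duration.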
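The timer's behaviour at the boundary is easier to see with a controllable clock. The following is a minimal sketch under that assumption: TimerSketch is a hypothetical class, and a LongSupplier stands in for Debezium's Clock.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical re-creation of the timer logic; the LongSupplier stands in for Debezium's Clock.
class TimerSketch {

    private final LongSupplier clockMillis;
    private final Duration time;
    private final long start;

    TimerSketch(LongSupplier clockMillis, Duration time) {
        this.clockMillis = clockMillis;
        this.time = time;
        this.start = clockMillis.getAsLong();
    }

    // Elapsed milliseconds since creation, never negative.
    private long elapsed() {
        return Math.max(0L, clockMillis.getAsLong() - start);
    }

    // Strictly greater-than: at exactly the requested period the timer has not expired yet.
    boolean expired() {
        return elapsed() > time.toMillis();
    }

    // Becomes negative once the timer has expired.
    Duration remaining() {
        return time.minus(elapsed(), ChronoUnit.MILLIS);
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(1_000);
        TimerSketch timer = new TimerSketch(now::get, Duration.ofMillis(500));
        now.set(1_500);
        System.out.println(timer.expired()); // elapsed == period, so still false
        now.set(1_501);
        System.out.println(timer.expired()); // elapsed > period, so true
    }
}
```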
LoggingContext
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/LoggingContext.java
public class LoggingContext {

    /**
     * The key for the connector type MDC property.
     */
    public static final String CONNECTOR_TYPE = "dbz.connectorType";
    /**
     * The key for the connector logical name MDC property.
     */
    public static final String CONNECTOR_NAME = "dbz.connectorName";
    /**
     * The key for the connector context name MDC property.
     */
    public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";

    private LoggingContext() {
    }

    /**
     * A snapshot of an MDC context that can be {@link #restore()}.
     */
    public static final class PreviousContext {
        private static final Map<String, String> EMPTY_CONTEXT = Collections.emptyMap();
        private final Map<String, String> context;

        protected PreviousContext() {
            Map<String, String> context = MDC.getCopyOfContextMap();
            this.context = context != null ? context : EMPTY_CONTEXT;
        }

        /**
         * Restore this logging context.
         */
        public void restore() {
            MDC.setContextMap(context);
        }
    }

    //......

}
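- LoggingContext defines PreviousContext. Its constructor copies the current MDC with MDC.getCopyOfContextMap(), falling back to an empty map when there is none, and its restore method sets that copy back into the MDC.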
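The snapshot-and-restore idea can be shown in isolation. In the sketch below a plain ThreadLocal map stands in for SLF4J's MDC so that the example has no external dependency; MdcSketch and Snapshot are hypothetical names and do not reflect how SLF4J stores its context internally.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the MDC: a per-thread map of logging properties.
class MdcSketch {

    private static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static String get(String key) {
        return CONTEXT.get().get(key);
    }

    static Map<String, String> copyOfContextMap() {
        return new HashMap<>(CONTEXT.get());
    }

    static void setContextMap(Map<String, String> map) {
        CONTEXT.set(new HashMap<>(map));
    }

    // Captures the context at construction time, like PreviousContext does.
    static final class Snapshot {
        private final Map<String, String> saved = copyOfContextMap();

        // Replaces the whole current context with the captured one.
        void restore() {
            setContextMap(saved);
        }
    }

    public static void main(String[] args) {
        put("dbz.connectorName", "inventory");
        Snapshot previous = new Snapshot();
        put("dbz.connectorName", "orders");
        previous.restore();
        System.out.println(get("dbz.connectorName"));
    }
}
```

Because restore replaces the whole map, keys that were added after the snapshot was taken disappear as well.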
Metronome
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Metronome.java
@FunctionalInterface
public interface Metronome {

    public void pause() throws InterruptedException;

    public static Metronome sleeper(Duration period, Clock timeSystem) {
        long periodInMillis = period.toMillis();
        return new Metronome() {
            private long next = timeSystem.currentTimeInMillis() + periodInMillis;

            @Override
            public void pause() throws InterruptedException {
                for (;;) {
                    final long now = timeSystem.currentTimeInMillis();
                    if (next <= now) {
                        break;
                    }
                    Thread.sleep(next - now);
                }
                next = next + periodInMillis;
            }

            @Override
            public String toString() {
                return "Metronome (sleep for " + periodInMillis + " ms)";
            }
        };
    }

    //......

}
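- The Metronome interface defines the pause method. Its static sleeper method creates an anonymous Metronome implementation whose pause method uses Thread.sleep to wait until the next scheduled tick and then advances that tick by one period.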
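A minimal sketch of the same pacing logic follows, with System.currentTimeMillis() standing in for Debezium's Clock. MetronomeSketch is a hypothetical class used only for illustration.

```java
// Hypothetical re-creation of the sleeper logic; System.currentTimeMillis() stands in for Clock.
class MetronomeSketch {

    private final long periodMillis;
    private long next;

    MetronomeSketch(long periodMillis) {
        this.periodMillis = periodMillis;
        this.next = System.currentTimeMillis() + periodMillis;
    }

    // Sleeps until the next scheduled tick, then moves the tick forward by one period.
    void pause() throws InterruptedException {
        for (;;) {
            long now = System.currentTimeMillis();
            if (next <= now) {
                break;
            }
            // Loop again after waking up in case the sleep returned early.
            Thread.sleep(next - now);
        }
        next = next + periodMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        MetronomeSketch metronome = new MetronomeSketch(20);
        for (int i = 0; i < 3; i++) {
            metronome.pause();
        }
        System.out.println("elapsed ms: " + (System.currentTimeMillis() - start));
    }
}
```

Because the next tick is derived from the previous tick rather than from the current time, a caller that falls behind gets pause calls that return immediately until it has caught up with the schedule.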
Summary
ChangeEventQueue implements ChangeEventQueueMetrics and wraps a bounded BlockingQueue. Its enqueue method blocks in queue.put(record) while the queue is full. Its poll method drains up to maxBatchSize records, pausing on the Metronome between attempts until records arrive or the timeout expires, and restores the previous logging context before returning. totalCapacity returns maxQueueSize, and remainingCapacity returns queue.remainingCapacity().
doc
- ChangeEventQueue