前言
在前面的文章中为大家详细介绍了Hollow的数据模型以及深入分析了Hollow的内存布局。本文将详细介绍Hollow的生产消费模型。其中,Producer和Consumer概念是整个项目的核心,而HollowProducer和HollowConsumer分别代表Hollow的生产者和消费者。
希望大家通过本文可以充分的了解Hollow的生产消费模型。本文中会涉及很多Hollow中定义的名词,可以参考【Netflix Hollow系列】Hollow的一些术语。
@空歌白石 原创。
生产者与消费者
在进程世界里,生产者就是生产数据的进程,消费者就是消费数据的进程。在多进程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这个问题于是引入了生产者和消费者模式。本小节,首先将通过介绍操作系统和Kafka来介绍典型的生产消费模型。
操作系统的生产者消费者模型
首先我们来看下操作系统中的生产者消费者模型,生产者消费者问题是多个进程因共享一个缓存区而产生的相互依赖的问题。生产者进程需要向缓冲区中存放数据,消费者进程从共享缓存中取数据。当缓存区满的时候,生产者进程需要等待消费者进程取走数据,当缓存区空的时候,消费者进程需要等待生产者放入数据。
上述进程间的合作和依赖关系在整个计算机系统中非常普遍,比较典型的就是磁盘驱动程序和上层应用通过共享缓存区来交换数据。具体来讲,磁盘驱动程序是生产者进程,要将磁盘中读取的数据存放在共享缓存区中,上层应用程序是消费者进程,要从共享缓存区中取走数据并交给更上层的应用或用户来处理。
生产者和消费者之间如何通信呢?是的,信号量,操作系统通过信号量来解决互斥和同步的问题。
- 互斥:有节缓冲区是一个临界资源,对临界资源得访问需要设置一个信号量mutex。
- 同步:设置两个同步信号量empty和full,其初值分别为n、0,用于临界资源得具体数目,同时也可以表示等待条件。
kafka的生产者消费者模型
kafka流行的消息中间件,最初由LinkedIn开源,生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统。从一个微观层面来说,这种需求也可理解为不同的系统之间如何传递消息。
kafka实现了两种消息订阅模式:
点对点模式
基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。 优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。
发布订阅模式
发布订阅模式是一个基于消息推送的消息模型,这种模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息!
但是各个消费者由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是3M/s、2M/s、1M/s,如果队列推送的速度为2M/s,则第三个消费者将无法承受!如果队列推送的速度为1M/s,则前两个消费者会出现资源的极大浪费!
生产者消费者模型实现方式
实现生产者消费者模型会有有两种方式:
wait—notify
方式实现生产者消费者模型,也就是上文中kafka中实现的点对点模式(注意这里需要加同步锁synchronized
)。阻塞队列
方式实现生产者消费者模式,也就是上文中kafka实现的发布订阅模型。
以上两种方式也称之为 pull
和 push
方式,在不同的系统中可能有不同的名臣或概念。
Hollow实现的生产者消费者
HollowProducer和HollowConsumer分别代表Hollow的生产者和消费者,可以直接使用HollowProducer和HollowConsumer完成生产和消费的任务。
其中HollowReadStateEngine和HollowWriteStateEngine分别是HollowProducer和HollowConsumer的底层数据处理类,如果觉得Hollow提供的Producer和Consumer不满足需求,可以基于HollowReadStateEngine和HollowWriteStateEngine实现自己的Producer和Consumer。
Producer与Consumer交互
通过上文介绍可以看出,Producer与Consumer之间的联系首先通过Announcer和AnnouncementWatcher相互关联的。但是具体的两者之间如何实现交互呢?
Producer通过Announcer发出通知,但是Hollow并没有限定如何通知,可以通过Producer主动推送消息到Consumer,也可以由Consumer通过不断的轮训获取通知。
Consumer借助于AnnouncementWatcher接收通知消息,Consumer接收消息的通知方式由Producer提供的消息发送方式决定,可以通过Pull或Push的方式完成。
如Hollow默认提供的HollowFilesystemAnnouncementWatcher就是通过定义一个名为Watch的Runnable,不断的扫描Producer存放文件的文件夹判断是否有新的Blob产生。我们也可以自己实现如S3AnnouncementWatcher、MySQLAnnouncementWatcher等,实现HollowConsumer.AnnouncementWatcher的subscribeToUpdates方法,并定义自己的通知机制即可。
Push方式
使用Push方式Producer和Consumer之间通信,通常会有使用消息队列的方式完成。其中流行的消息中间件包括:
Pull方式
使用Pull方式Producer和Consumer之间通信,在Consumer轮询Producer的特定接口。这里会涉及两部分中间件:
轮询方式
Consumer的轮询可以借助于定时器实现,比较典型的包括:
- JDK的
ScheduledExecutorService
和Timer
- 定时器中间件:quartz
Consumer使用定时器轮询Producer提供的RPC接口。
RPC
RPC的方式有很多,典型的直接使用HTTP、TCP访问,也可以使用中间件Dubbo等实现。
HollowProducer
HollowProducer
的负责数据的生产工作,Hollow在HollowProducer中定义了Producer职责,除了核心的Publisher外,还定义了额外的附加机制,如下:
- 发布者(BlobPublisher)
- 通知(Announcer)
- Validator
- Listener
- 文件存储目录
- BLOB压缩器
- 按照BLOB版本查询器
- 自定义Executor
- 全量SNAPSHOT版本保留的份数
- 清理机制
- 监控收集器
Producer按照一定周期生成全量以及增量数据,在每个周期结束前会创建一个数据集(可以称之为BLOB),并将这个数据集合发送到某种存储介质上(可以是本地磁盘、S3、DB、Redis等),最后发布一个通知,告知Consumer数据已经Ready,可以来获取数据了。
可以使用如下方式初始化Producer,除Publisher外,其他都是可选项。
HollowProducer producer = HollowProducer
.withPublisher(publisher) /// required: a BlobPublisher
.withAnnouncer(announcer) /// optional: an Announcer
.withValidators(validators) /// optional: one or more Validator
.withListeners(listeners) /// optional: one or more HollowProducerListeners
.withBlobStagingDir(dir) /// optional: a java.io.File
.withBlobCompressor(compressor) /// optional: a BlobCompressor
.withBlobStager(stager) /// optional: a BlobStager
.withSnapshotPublishExecutor(e) /// optional: a java.util.concurrent.Executor
.withNumStatesBetweenSnapshots(n) /// optional: an int
.withTargetMaxTypeShardSize(size) /// optional: a long
.withBlobStorageCleaner(blobStorageCleaner) /// optional: a BlobStorageCleaner
.withMetricsCollector(hollowMetricsCollector) /// optional: a HollowMetricsCollector<HollowProducerMetrics>
.build();
接下来我们详细介绍HollowProducer的属性
Properties
首先让我们看下HollowProducer的构造方法。可以看出属性在上文中已经有所阐述,还请大家特别留意HollowWriteStateEngine
,HollowWriteStateEngine
是HollowProducer
的底层核心类,负责数据的写入职责。本文中不再详细展开,后续会有专门文章进行分析,也包括下文中HollowConsumer
的底层类HollowReadStateEngine
。
private AbstractHollowProducer( HollowProducer.BlobStager blobStager, HollowProducer.Publisher publisher, HollowProducer.Announcer announcer, List<? extends HollowProducerEventListener> eventListeners, HollowProducer.VersionMinter versionMinter, Executor snapshotPublishExecutor, int numStatesBetweenSnapshots, long targetMaxTypeShardSize, boolean focusHoleFillInFewestShards, HollowMetricsCollector<HollowProducerMetrics> metricsCollector, HollowProducer.BlobStorageCleaner blobStorageCleaner, SingleProducerEnforcer singleProducerEnforcer, HollowObjectHashCodeFinder hashCodeFinder, boolean doIntegrityCheck) {
this.publisher = publisher;
this.announcer = announcer;
this.versionMinter = versionMinter;
this.blobStager = blobStager;
this.singleProducerEnforcer = singleProducerEnforcer;
this.snapshotPublishExecutor = snapshotPublishExecutor;
this.numStatesBetweenSnapshots = numStatesBetweenSnapshots;
this.hashCodeFinder = hashCodeFinder;
this.doIntegrityCheck = doIntegrityCheck;
HollowWriteStateEngine writeEngine = hashCodeFinder == null
? new HollowWriteStateEngine()
: new HollowWriteStateEngine(hashCodeFinder);
writeEngine.setTargetMaxTypeShardSize(targetMaxTypeShardSize);
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);
this.objectMapper = new HollowObjectMapper(writeEngine);
if (hashCodeFinder != null) {
objectMapper.doNotUseDefaultHashKeys();
}
this.readStates = ReadStateHelper.newDeltaChain();
this.blobStorageCleaner = blobStorageCleaner;
this.listeners = new ProducerListenerSupport(eventListeners.stream().distinct().collect(toList()));
this.metrics = new HollowProducerMetrics();
this.metricsCollector = metricsCollector;
}
HollowProducer
有很多的get或set方法,本文着重介绍两个重载的initializeDataModel
方法。当服务启动时,HollowProducer可以基于指定的HolowSchema初始化数据模型,也可以使用具体的class初始化数据模型。
这确保恢复可以正确地将生产者的当前数据模型与恢复的数据状态的数据模型进行比较,并管理这些模型中的任何差异(例如不恢复生产者当前数据模型中不存在的恢复数据模型中的任何类型的状态) )。初始化后,将向所有已注册的数据模型初始化侦听器DataModelInitializationListener
发出数据模型初始化事件。
@Override
public void initializeDataModel(Class<?>... classes) {
super.initializeDataModel(classes);
}
@Override
public void initializeDataModel(HollowSchema... schemas) {
super.initializeDataModel(schemas);
}
Populator
表示在 HollowProducer
循环中填充新数据状态的任务。
@FunctionalInterface
public interface Populator {
void populate(HollowProducer.WriteState newState) throws Exception;
}
WriteState
WriteState
负责向StateEngine写入数据,
/** * Representation of new write state. */
public interface WriteState {
int add(Object o) throws IllegalStateException;
HollowObjectMapper getObjectMapper() throws IllegalStateException;
HollowWriteStateEngine getStateEngine() throws IllegalStateException;
ReadState getPriorState() throws IllegalStateException;
long getVersion() throws IllegalStateException;
}
ReadState
从新写入状态的数量计算的读取状态表示。负责提供HollowProducer读取当前数据状态的接口。
public interface ReadState {
long getVersion();
HollowReadStateEngine getStateEngine();
}
BlobStager
BlobStager
提供了读取全量、增量、反向增量以及头数据的接口,参数是具体的版本号。
public interface BlobStager {
HollowProducer.Blob openSnapshot(long version);
HollowProducer.HeaderBlob openHeader(long version);
HollowProducer.Blob openDelta(long fromVersion, long toVersion);
HollowProducer.Blob openReverseDelta(long fromVersion, long toVersion);
}
BlobCompressor
public interface BlobCompressor {
BlobCompressor NO_COMPRESSION = new BlobCompressor() {
@Override
public OutputStream compress(OutputStream os) {
return os;
}
@Override
public InputStream decompress(InputStream is) {
return is;
}
};
/** * This method provides an opportunity to wrap the OutputStream used to write the blob (e.g. with a GZIPOutputStream). * * @param is the uncompressed output stream * @return the compressed output stream */
OutputStream compress(OutputStream is);
/** * This method provides an opportunity to wrap the InputStream used to write the blob (e.g. with a GZIPInputStream). * * @param is the compressed input stream * @return the uncompressed input stream */
InputStream decompress(InputStream is);
}
BlobPublisher(Publisher)
BlobPublisher负责将数据集打包并发布到存储中。Hollow提供的Publisher默认实现为HollowFilesystemPublisher。 举例来讲,通过BlobPublisher可以将一份全量的城市数据,写入到文件中,并将文件上传的S3.
public interface Publisher {
/** * Deprecated - Should use {@link Publisher#publish(PublishArtifact)} instead.<p> * Publish the blob specified to this publisher's blobstore. * <p> * It is guaranteed that {@code blob} was created by calling one of * {@link BlobStager#openSnapshot(long)}, {@link BlobStager#openDelta(long, long)}, or * {@link BlobStager#openReverseDelta(long, long)} on this publisher. * * @param blob the blob to publish */
@Deprecated
default void publish(HollowProducer.Blob blob) {
publish((PublishArtifact)blob);
}
/** * Publish the blob specified to this publisher's blobstore. * <p> * It is guaranteed that {@code blob} was created by calling one of * {@link BlobStager#openSnapshot(long)}, {@link BlobStager#openDelta(long, long)}, * {@link BlobStager#openReverseDelta(long, long)}, or * {@link BlobStager#openHeader(long)} on this publisher. * * @param publishArtifact the blob to publish */
default void publish(HollowProducer.PublishArtifact publishArtifact) {
if (publishArtifact instanceof HollowProducer.Blob) {
publish((HollowProducer.Blob)publishArtifact);
}
}
}
PublishArtifact
public interface PublishArtifact {
void cleanup();
void write(HollowBlobWriter blobWriter) throws IOException;
InputStream newInputStream() throws IOException;
@Deprecated
default File getFile() {
throw new UnsupportedOperationException("File is not available");
}
default Path getPath() {
throw new UnsupportedOperationException("Path is not available");
}
}
public static abstract class HeaderBlob implements PublishArtifact{
protected final long version;
protected HeaderBlob(long version) {
this.version = version;
}
public long getVersion() {
return version;
}
}
public static abstract class Blob implements PublishArtifact {
protected final long fromVersion;
protected final long toVersion;
protected final Blob.Type type;
protected final ProducerOptionalBlobPartConfig optionalPartConfig;
protected Blob(long fromVersion, long toVersion, Blob.Type type) {
this(fromVersion, toVersion, type, null);
}
protected Blob(long fromVersion, long toVersion, Blob.Type type, ProducerOptionalBlobPartConfig optionalPartConfig) {
this.fromVersion = fromVersion;
this.toVersion = toVersion;
this.type = type;
this.optionalPartConfig = optionalPartConfig;
}
public InputStream newOptionalPartInputStream(String partName) throws IOException {
throw new UnsupportedOperationException("The provided HollowProducer.Blob implementation does not support optional blob parts");
}
public Path getOptionalPartPath(String partName) {
throw new UnsupportedOperationException("The provided HollowProducer.Blob implementation does not support optional blob parts");
}
public Type getType() {
return this.type;
}
public long getFromVersion() {
return this.fromVersion;
}
public long getToVersion() {
return this.toVersion;
}
public ProducerOptionalBlobPartConfig getOptionalPartConfig() {
return optionalPartConfig;
}
/** * Hollow blob types are {@code SNAPSHOT}, {@code DELTA} and {@code REVERSE_DELTA}. */
public enum Type {
SNAPSHOT("snapshot"),
DELTA("delta"),
REVERSE_DELTA("reversedelta");
public final String prefix;
Type(String prefix) {
this.prefix = prefix;
}
}
}
Announcer
Announcer负责通知的职责,可以通过Announcer获取到当前最准确的数据集信息(包括全量版本、增量版本、反向增量版本)。
Announcer被定义为一个Interface,在使用过程中需要实现具体的Announcer的announce方法。Hollow提供的Announcer默认实现为HollowFilesystemAnnouncer。
举例来讲,可以借助QMQ实现一个Announcer,当数据集构建完成后,通过QMQ将消息发送给consumer;另外一个例子,也可以将Announcer具体实现为一个RPC,由consumer来调用。
public interface Announcer {
void announce(long stateVersion);
default void announce(long stateVersion, Map<String, String> metadata) {
announce(stateVersion);
}
}
HollowWriteStateEngine 在两个阶段之间来回循环:
- 添加记录(add HollowRecord)
- 写状态(write State)
Version
Hollow默认提供的版本管理是由Producer的VersionMinuter负责管理的。version = 时间戳 + 重复的 3 位序列号
public interface VersionMinter {
/** * Create a new state version. * <p> * State versions should be ascending -- later states have greater versions.<p> * * @return a new state version */
long mint();
}
BlobStorageCleaner
BlobStorageCleaner
负责BLOB文件的清理工作,清理的规则可以通过实现接口定义。
public static abstract class BlobStorageCleaner {
public void clean(Blob.Type blobType) {
switch (blobType) {
case SNAPSHOT:
cleanSnapshots();
break;
case DELTA:
cleanDeltas();
break;
case REVERSE_DELTA:
cleanReverseDeltas();
break;
}
}
/** * This method provides an opportunity to remove old snapshots. */
public abstract void cleanSnapshots();
/** * This method provides an opportunity to remove old deltas. */
public abstract void cleanDeltas();
/** * This method provides an opportunity to remove old reverse deltas. */
public abstract void cleanReverseDeltas();
}
Incremental
Incremental
是处理Hollow增量数据的核心接口,填充给定数据更改(添加、修改或删除)的空心数据的生产者,称为增量生产。 HollowProducer填充所有给定的数据,然后确定更改。
通常在其他方面HollowProducer
和 HollowProducer.Incremental
在Publish和Announcer的实现上具有相同的行为。如果需要特殊的指定,则可以通过实现Incremental
接口完成即可。
public static class Incremental extends AbstractHollowProducer {
protected Incremental(Builder<?> b) {
super(b);
}
@Override
public HollowProducer.ReadState restore(long versionDesired, HollowConsumer.BlobRetriever blobRetriever) {
return super.hardRestore(versionDesired, blobRetriever);
}
public long runIncrementalCycle(Incremental.IncrementalPopulator task) {
return runCycle(task, null);
}
@FunctionalInterface
public interface IncrementalPopulator {
void populate(IncrementalWriteState state) throws Exception;
}
public interface IncrementalWriteState {
void addOrModify(Object o);
void addIfAbsent(Object o);
void delete(Object o);
void delete(RecordPrimaryKey key);
}
}
至此,HollowProducer
的全部功能基本都已经介绍完了,接下来,将介绍HollowConsumer
的实现。
HollowConsumer
HollowConsumer
的负责数据的消费工作,Hollow在HollowConsumer中定义了Consumer职责,除了核心的Retriever外,还定义了额外的附加机制,如下:
- 文件接收器(BlobRetriever)
- 通知观察者,监听器(AnnouncementWatcher)
- 本地磁盘目录,接收到BLOB文件后,需要存储的路径
- RefreshListener
- generated client,这个是一个重点的概念,简单理解为本身HollowConsumer会提供HollowAPI,通过HollowAPI可以访问Hollow编码或压缩后的数据,无需自己再实现类似于Map的缓存结构。
- FilterConfig,过滤器
- DoubleSnapshotConfig,是否保存双份全量数据
- 自定义Executor
- 监控收集器
Consumer需要始终与Producer保持一定的联系,以便于让自身的数据保持与Producer同步。Consumer在初始化时需要从Producer获取一份全量数据,再初始化完成后,为保持数据的新鲜度,需要不断的消费增量数据。
可以使用如下方式初始化HollowConsumer,除BlobRetriever外,其他都是可选项。
HollowConsumer consumer = HollowConsumer
.withBlobRetriever(blobRetriever) /// required: a BlobRetriever
.withLocalBlobStore(localDiskDir) /// optional: a local disk location
.withAnnouncementWatcher(announcementWatcher) /// optional: a AnnouncementWatcher
.withRefreshListener(refreshListener) /// optional: a RefreshListener
.withGeneratedAPIClass(MyGeneratedAPI.class) /// optional: a generated client API class
.withFilterConfig(filterConfig) /// optional: a HollowFilterConfig
.withDoubleSnapshotConfig(doubleSnapshotCfg) /// optional: a DoubleSnapshotConfig
.withObjectLongevityConfig(objectLongevityCfg) /// optional: an ObjectLongevityConfig
.withObjectLongevityDetector(detector) /// optional: an ObjectLongevityDetector
.withRefreshExecutor(refreshExecutor) /// optional: an Executor
.withMetricsCollector(hollowMetricsCollector) /// optional: a HollowMetricsCollector<HollowConsumerMetrics>
.build();
Properties
HollowConsumer
的构造方法。
protected <B extends Builder<B>> HollowConsumer(B builder) {
// duplicated with HollowConsumer(...) constructor above. We cannot chain constructor calls because that
// constructor subscribes to the announcement watcher and we have more setup to do first
this.metrics = new HollowConsumerMetrics();
this.updater = new HollowClientUpdater(builder.blobRetriever,
builder.refreshListeners,
builder.apiFactory,
builder.doubleSnapshotConfig,
builder.hashCodeFinder,
builder.memoryMode,
builder.objectLongevityConfig,
builder.objectLongevityDetector,
metrics,
builder.metricsCollector);
updater.setFilter(builder.typeFilter);
if(builder.skipTypeShardUpdateWithNoAdditions)
updater.setSkipShardUpdateWithNoAdditions(true);
this.announcementWatcher = builder.announcementWatcher;
this.refreshExecutor = builder.refreshExecutor;
this.refreshLock = new ReentrantReadWriteLock();
this.memoryMode = builder.memoryMode;
if (announcementWatcher != null)
announcementWatcher.subscribeToUpdates(this);
}
HollowConsumer
也有众多的get和set方法,本文着重介绍getInitialLoad
方法,此方式实现类在Consumer端的异步刷新功能,对于消费者消费数据至关重要。
/** * Returns a {@code CompletableFuture} that completes after the initial data load succeeds. Also triggers the initial * load asynchronously, to avoid waiting on a polling interval for the initial load. * <p> * Callers can use methods like {@link CompletableFuture#join()} or {@link CompletableFuture#get(long, TimeUnit)} * to block until the initial load is complete. * <p> * A failure during the initial load <em>will not</em> cause the future to complete exceptionally; this allows * for a subsequent data version to eventually succeed. * <p> * In a consumer without published or announced versions – or one that always fails the initial load – the future * will remain incomplete indefinitely. * * @return a future which, when completed, has a value set to the data version that was initially loaded */
public CompletableFuture<Long> getInitialLoad() {
try {
triggerAsyncRefresh();
} catch (RejectedExecutionException | NullPointerException e) {
LOG.log(Level.INFO, "Refresh triggered by getInitialLoad() failed; future attempts might succeed", e);
}
return updater.getInitialLoad();
}
Refresh Trigger
Refresh Trigger
负责触发刷新到 HollowConsumer.AnnouncementWatcher
指定的最新版本。
- 如果已经在最新版本上,则此操作是无操作的。
- 如果
HollowConsumer.AnnouncementWatcher
不存在,则此调用会触发刷新到 Blob 存储中可用的最新版本。
特别注意:Refresh Trigger
都是阻塞的操作。
public void triggerRefresh() {
refreshLock.writeLock().lock();
try {
updater.updateTo(announcementWatcher == null ? Long.MAX_VALUE : announcementWatcher.getLatestVersion());
} catch (Error | RuntimeException e) {
throw e;
} catch (Throwable t) {
throw new RuntimeException(t);
} finally {
refreshLock.writeLock().unlock();
}
}
public void triggerAsyncRefresh() {
triggerAsyncRefreshWithDelay(0);
}
public void triggerAsyncRefreshWithDelay(int delayMillis) {
final long targetBeginTime = System.currentTimeMillis() + delayMillis;
refreshExecutor.execute(() -> {
try {
long delay = targetBeginTime - System.currentTimeMillis();
if (delay > 0)
Thread.sleep(delay);
} catch (InterruptedException e) {
// Interrupting, such as shutting down the executor pool,
// cancels the trigger
LOG.log(Level.INFO, "Async refresh interrupted before trigger, refresh cancelled", e);
return;
}
try {
triggerRefresh();
} catch (Error | RuntimeException e) {
// Ensure exceptions are propagated to the executor
LOG.log(Level.SEVERE, "Async refresh failed", e);
throw e;
}
});
}
public void triggerRefreshTo(long version) {
if (announcementWatcher != null)
throw new UnsupportedOperationException("Cannot trigger refresh to specified version when a HollowConsumer.AnnouncementWatcher is present");
try {
updater.updateTo(version);
} catch (Error | RuntimeException e) {
throw e;
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
BlobRetriever
BlobRetriever与BlobPublisher相对应,BlobRetriever负责从存储介质中接收数据,需要与BlobPublisher有相对应的读数据的能力,BlobRetriever的方法参数都是具体的数据集版本(可以是全量版本,也可以是增量版本)。Hollow提供的BlobRetriever默认实现为HollowFilesystemBlobRetriever。
举例来讲,我们将文件上传的S3后,将有BlobRetriever负责从S3上下载文件,并加载到内存中。
/** * An interface which defines the necessary interactions of Hollow with a blob data store. * <p> * Implementations will define how to retrieve blob data from a data store. */
public interface BlobRetriever {
/** * Returns the snapshot for the state with the greatest version identifier which is equal to or less than the desired version * @param desiredVersion the desired version * @return the blob of the snapshot */
HollowConsumer.Blob retrieveSnapshotBlob(long desiredVersion);
/** * Returns a delta transition which can be applied to the specified version identifier * @param currentVersion the current version * @return the blob of the delta */
HollowConsumer.Blob retrieveDeltaBlob(long currentVersion);
/** * Returns a reverse delta transition which can be applied to the specified version identifier * @param currentVersion the current version * @return the blob of the reverse delta */
HollowConsumer.Blob retrieveReverseDeltaBlob(long currentVersion);
default Set<String> configuredOptionalBlobParts() {
return null;
}
default HollowConsumer.HeaderBlob retrieveHeaderBlob(long currentVersion) {
throw new UnsupportedOperationException();
}
}
Blob
Blob
的全称是Binary large Object
。在Hollow中特指一个经过压缩编码后的数据集合。HollowConsumer
中特指的是带有版本号的BLOB文件,为什么要带版本号呢?作为消费端如果没有Version
概念,数据本身是没有意义的,以为我们不知道收到的这份数据是应该叠加在哪个已有数据版本的基础上。
protected interface VersionedBlob {
InputStream getInputStream() throws IOException;
default File getFile() throws IOException {
throw new UnsupportedOperationException();
}
}
public static abstract class HeaderBlob implements VersionedBlob{
private final long version;
protected HeaderBlob(long version) {
this.version = version;
}
public long getVersion() {
return this.version;
}
}
public static abstract class Blob implements VersionedBlob{
protected final long fromVersion;
protected final long toVersion;
private final BlobType blobType;
public Blob(long toVersion) {
this(HollowConstants.VERSION_NONE, toVersion);
}
public Blob(long fromVersion, long toVersion) {
this.fromVersion = fromVersion;
this.toVersion = toVersion;
if (this.isSnapshot())
this.blobType = BlobType.SNAPSHOT;
else if (this.isReverseDelta())
this.blobType = BlobType.REVERSE_DELTA;
else
this.blobType = BlobType.DELTA;
}
public abstract InputStream getInputStream() throws IOException;
public OptionalBlobPartInput getOptionalBlobPartInputs() throws IOException {
return null;
}
public enum BlobType {
SNAPSHOT("snapshot"),
DELTA("delta"),
REVERSE_DELTA("reversedelta");
private final String type;
BlobType(String type) {
this.type = type;
}
public String getType() {
return this.type;
}
}
public boolean isSnapshot() {
return fromVersion == HollowConstants.VERSION_NONE;
}
public boolean isReverseDelta() {
return toVersion < fromVersion;
}
public boolean isDelta() {
return !isSnapshot() && !isReverseDelta();
}
public long getFromVersion() {
return fromVersion;
}
public long getToVersion() {
return toVersion;
}
public BlobType getBlobType() {
return blobType;
}
}
AnnouncementWatcher
AnnouncementWatcher与Producer中介绍的Announcer相对应,Announcer负责发送通知,AnnouncementWatcher负责监控通知内容,一旦收到通知,即可调用BlobRetriever将数据接收到内存。Hollow提供的AnnouncementWatcher默认实现为HollowFilesystemAnnouncementWatcher。
举例来讲,AnnouncementWatcher的实现可以由QMQ的consumer来完成。
/** * Implementations of this class are responsible for two things: * <p> * 1) Tracking the latest announced data state version. * 2) Keeping the client up to date by calling triggerAsyncRefresh() on self when the latest version changes. * <p> * If an AnnouncementWatcher is provided to a HollowConsumer, then calling HollowConsumer#triggerRefreshTo() is unsupported. */
public interface AnnouncementWatcher {
long NO_ANNOUNCEMENT_AVAILABLE = HollowConstants.VERSION_NONE;
/** * @return the latest announced version. */
long getLatestVersion();
/** * Implementations of this method should subscribe a HollowConsumer to updates to announced versions. * <p> * When announcements are received via a push mechanism, or polling reveals a new version, a call should be placed to one * of the flavors of {@link HollowConsumer#triggerRefresh()} on the provided HollowConsumer. * * @param consumer the hollow consumer */
void subscribeToUpdates(HollowConsumer consumer);
}
DoubleSnapshotConfig
是否保留双份全量数据,以及最大增量版本数量配置。
public interface DoubleSnapshotConfig {
boolean allowDoubleSnapshot();
int maxDeltasBeforeDoubleSnapshot();
DoubleSnapshotConfig DEFAULT_CONFIG = new DoubleSnapshotConfig() {
@Override
public int maxDeltasBeforeDoubleSnapshot() {
return 32;
}
@Override
public boolean allowDoubleSnapshot() {
return true;
}
};
}
Longevity
Hollow池化的配置管理,此章节要结合前一篇文章【Netflix Hollow系列】深入分析Hollow内存布局一起看会更加容易理解些。
public interface ObjectLongevityConfig {
/** * @return whether or not long-lived object support is enabled. * <p> * Because Hollow reuses pooled memory, if references to Hollow records are held too long, the underlying data may * be overwritten. When long-lived object support is enabled, Hollow records referenced via a {@link HollowAPI} will, * after an update, be backed by a reserved copy of the data at the time the reference was created. This guarantees * that even if a reference is held for a long time, it will continue to return the same data when interrogated. * <p> * These reserved copies are backed by the {@link HollowHistory} data structure. */
boolean enableLongLivedObjectSupport();
boolean enableExpiredUsageStackTraces();
/** * @return if long-lived object support is enabled, the number of milliseconds before the {@link StaleHollowReferenceDetector} * will begin flagging usage of stale objects. */
long gracePeriodMillis();
/** * @return if long-lived object support is enabled, the number of milliseconds, after the grace period, during which * data is still available in stale references, but usage will be flagged by the {@link StaleHollowReferenceDetector}. * <p> * After the grace period + usage detection period have expired, the data from stale references will become inaccessible if * dropDataAutomatically() is enabled. */
long usageDetectionPeriodMillis();
/** * @return whether or not to drop data behind stale references after the grace period + usage detection period has elapsed, assuming * that no usage was detected during the usage detection period. */
boolean dropDataAutomatically();
/** * @return whether data is dropped even if flagged during the usage detection period. */
boolean forceDropData();
ObjectLongevityConfig DEFAULT_CONFIG = new ObjectLongevityConfig() {
@Override
public boolean enableLongLivedObjectSupport() {
return false;
}
@Override
public boolean dropDataAutomatically() {
return false;
}
@Override
public boolean forceDropData() {
return false;
}
@Override
public boolean enableExpiredUsageStackTraces() {
return false;
}
@Override
public long usageDetectionPeriodMillis() {
return 60 * 60 * 1000;
}
@Override
public long gracePeriodMillis() {
return 60 * 60 * 1000;
}
};
}
/** * Listens for stale Hollow object usage */
public interface ObjectLongevityDetector {
/** * Stale reference detection hint. This will be called every ~30 seconds. * <p> * If a nonzero value is reported, then stale references to Hollow objects may be cached somewhere in your codebase. * <p> * This signal can be noisy, and a nonzero value indicates that some reference to stale data exists somewhere. * * @param count the count of stale references */
void staleReferenceExistenceDetected(int count);
/** * Stale reference USAGE detection. This will be called every ~30 seconds. * <p> * If a nonzero value is reported, then stale references to Hollow objects are being accessed from somewhere in your codebase. * <p> * This signal is noiseless, and a nonzero value indicates that some reference to stale data is USED somewhere. * * @param count the count of stale references */
void staleReferenceUsageDetected(int count);
ObjectLongevityDetector DEFAULT_DETECTOR = new ObjectLongevityDetector() {
@Override
public void staleReferenceUsageDetected(int count) {
}
@Override
public void staleReferenceExistenceDetected(int count) {
}
};
}
RefreshListener
RefreshListener
将定义在更新Hollow的本地内存副本之前、期间和之后发生各种事件时要做什么。
public interface RefreshListener {
void refreshStarted(long currentVersion, long requestedVersion);
void snapshotUpdateOccurred(HollowAPI api, HollowReadStateEngine stateEngine, long version) throws Exception;
void deltaUpdateOccurred(HollowAPI api, HollowReadStateEngine stateEngine, long version) throws Exception;
void blobLoaded(HollowConsumer.Blob transition);
void refreshSuccessful(long beforeVersion, long afterVersion, long requestedVersion);
void refreshFailed(long beforeVersion, long afterVersion, long requestedVersion, Throwable failureCause);
}
TransitionAwareRefreshListener
是对RefreshListener
的丰富和扩展。
public interface TransitionAwareRefreshListener extends RefreshListener {
void snapshotApplied(HollowAPI api, HollowReadStateEngine stateEngine, long version) throws Exception;
void deltaApplied(HollowAPI api, HollowReadStateEngine stateEngine, long version) throws Exception;
default void transitionsPlanned(long beforeVersion, long desiredVersion, boolean isSnapshotPlan, List<HollowConsumer.Blob.BlobType> transitionSequence) {}
}
RefreshRegistrationListener
是监听器的监听器,负责刷新监听器添加和删除的监听器。
public interface RefreshRegistrationListener {
void onBeforeAddition(HollowConsumer c);
void onAfterRemoval(HollowConsumer c);
}
Consumer API
如果想要读取Hollow的Blob数据,需要使用借助于HollowAPI实现。
HollowAPI分为HollowObjectAPI和HollowTypeAPI,当Blob中数据量较大时,推荐直接使用HollowTypeAPI进行解析,性能要比HollowObjectAPI好的多。
在上文中分析HollowConsumer
的构造方法时,我看注意到一个generator client
,Hollow提供了API的代码生成工具类HollowAPIGenerator.java,使用HollowAPIGenerator可以方便的生成在DataClient需要的API类。API完全基于在Producer中定义的Schema生成。生成后的API,可以方便的使用包括query、index、hash等方法。
此外,Hollow还提供了一个通用的数据访问类GenericHollowObject.java,使用这个类,可以无需针对具体的数据模型生成API类,但是使用起来会相对较为繁琐,因为我们无法在IDE中通过“点”的方式准确的获取具体的类和方法。
总结
本文从原始的生产消费模型入手,详细阐述Hollow的生产消费模型。并结合Hollow源码,详细分析了HollowProducer
和HollowConsumer
。
通过上文可以看出,HollowProducer
和HollowConsumer
是基于File进行生产和消费管理的,但是这是唯一的方式吗?答案是否定的。我们可以借助于HollowWriteStateEngine
和HollowReadStateEngine
的能力直接使用InputStream
和OutputStream
操作数据。
提前和大家预告下,将在下一篇文章中详细探讨HollowWriteStateEngine
和HollowReadStateEngine
的使用。希望大家能够喜欢。
结束语
看了下时间已经很晚了,洗洗准备哄娃睡觉了,祝大家周末愉快。
今天的文章【Netflix Hollow系列】深入分析Hollow生产消费模型分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/16923.html