这一章想讲一下 Spark 的缓存是如何实现的。这个 persist 方法是在 RDD 里面的,所以我们直接打开 RDD 这个类。
def persist(newLevel: StorageLevel): this.type = {
// StorageLevel不能随意更改
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")
}
sc.persistRDD(this)
// Register the RDD with the ContextCleaner for automatic GC-based cleanup
// 注册清理方法
sc.cleaner.foreach(_.registerRDDForCleanup(this))
storageLevel = newLevel
this
}
它调用 SparkContext 去缓存这个 RDD,追杀下去。
private[spark] def persistRDD(rdd: RDD[_]) {
persistentRdds(rdd.id) = rdd
}
它居然是用一个 HashMap 来存的,具体看这个 map 的类型是 TimeStampedWeakValueHashMap[Int, RDD[_]] 类型。把存进去的值都隐式转换成 WeakReference,然后加到一个内部的一个 ConcurrentHashMap 里面。这里貌似也没干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。
CacheManager
现在并没有保存,等到真正运行 Task 运行的时候才会去缓存起来。入口在 Task 的 runTask 方法里面,具体的我们可以看 ResultTask,它调用了 RDD 的 iterator 方法。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
一旦设置了 StorageLevel,就要从 SparkEnv 的 cacheManager 取数据。
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, split.index)
blockManager.get(key) match {
case Some(values) =>
// 已经有了,直接返回就可以了
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
// loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里面取到了
loading.synchronized {
if (loading.contains(key)) {
while (loading.contains(key)) {
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
// 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次
blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
loading.add(key)
}
} else {
loading.add(key)
}
}
try {
// 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// 如果是本地运行的,就没必要缓存了,直接返回即可
if (context.runningLocally) {
return computedValues
}
// 跟踪blocks的更新状态
var updatedBlocks = Seq[(BlockId, BlockStatus)]()
val returnValue: Iterator[T] = {
if (storageLevel.useDisk && !storageLevel.useMemory) {
/* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager * 然后把结果直接返回,它不需要把结果一下子全部加载进内存 * 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给丢弃 */
updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(values) =>
values.asInstanceOf[Iterator[T]]
case None =>
throw new Exception("Block manager failed to return persisted valued")
}
} else {
// 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份
val elements = new ArrayBuffer[Any]
elements ++= computedValues
updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
}
}
// 更新task的监控参数
val metrics = context.taskMetrics
metrics.updatedBlocks = Some(updatedBlocks)
new InterruptibleIterator(context, returnValue)
} finally {
// 改完了,释放锁
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
1、如果 blockManager 当中有,直接从 blockManager 当中取。
2、如果 blockManager 没有,就先用 RDD 的 compute 函数得到出来一个 Iterable 接口。
3、如果 StorageLevel 是只保存在硬盘的话,就把值存在 blockManager 当中,然后从 blockManager 当中取出一个 Iterable 接口,这样的好处是不会一次把数据全部加载进内存。
4、如果 StorageLevel 是需要使用内存的情况,就把结果添加到一个 ArrayBuffer 当中一次返回,另外在 blockManager 存上一份,下次直接从 blockManager 取。
对 StorageLevel 说明一下吧,贴一下它的源码。
class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1)
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
大家注意看它那几个参数,useDisk_、useMemory_、useOffHeap_、deserialized_、replication_ 在具体的类型的时候是传的什么值。
下面我们的目标要放到 blockManager。
BlockManager
BlockManager 这个类比较大,我们从两方面开始看吧,putBytes 和 get 方法。先从 putBytes 说起,之前说过 Task 运行结束之后,结果超过 10M 的话,会用 BlockManager 缓存起来。
env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
putBytes 内部又掉了另外一个方法 doPut,方法很大呀,先折叠起来。
private def doPut(
blockId: BlockId,
data: Values,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
// 记录它的StorageLevel,以便我们可以在它加载进内存之后,可以按需写入硬盘。
  // 此外,在我们把调用BlockInfo的markReay方法之前,都没法通过get方法获得该部分内容
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// 如果不存在,就添加到blockInfo里面
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
if (oldBlockOpt.isDefined) {
// 如果已经存在了,就不需要重复添加了
if (oldBlockOpt.get.waitForReady()) {return updatedBlocks
}
// 存在于blockInfo当中->但是上一次保存失败了,拿出旧的信息,再试一遍
oldBlockOpt.get
} else {
tinfo
}
}
val startTimeMs = System.currentTimeMillis
// 当我们需要存储数据,并且要复制数据到别的机器,我们需要访问它的值,但是因为我们的put操作会读取整个iterator,
// 这就不会有任何的值留下。在我们保存序列化的数据的场景,我们可以记住这些bytes,但在其他场景,比如反序列化存储的
// 时候,我们就必须依赖返回一个Iterator
var valuesAfterPut: Iterator[Any] = null
// Ditto for the bytes after the put
var bytesAfterPut: ByteBuffer = null
// Size of the block in bytes
var size = 0L
// 在保存数据之前,我们要实例化,在数据已经序列化并且准备好发送的情况下,这个过程是很快的
val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
// duplicate并不是复制这些数据,只是做了一个包装
val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
Future {
// 把block复制到别的机器上去
replicate(blockId, bufferView, level)
}
} else {
null
}
putBlockInfo.synchronized {
var marked = false
try {
if (level.useMemory) {
// 首先是保存到内存里面,尽管它也使用硬盘,等内存不够的时候,才会写入硬盘
// 下面分了三种情况,但是Task的结果是ByteBufferValues这种情况,具体看putBytes方法
val res = data match {
case IteratorValues(iterator) =>
memoryStore.putValues(blockId, iterator, level, true)
case ArrayBufferValues(array) =>
memoryStore.putValues(blockId, array, level, true)
case ByteBufferValues(bytes) =>
bytes.rewind()
memoryStore.putBytes(blockId, bytes, level)
}
size = res.size
// 这里写得那么恶心,是跟data的类型有关系的,data: Either[Iterator[_], ByteBuffer],Left是Iterator,Right是ByteBuffer
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
// 把被置换到硬盘的blocks记录到updatedBlocks上
res.droppedBlocks.foreach { block => updatedBlocks += block }
} else if (level.useOffHeap) {
// 保存到Tachyon上.
val res = data match {
case IteratorValues(iterator) =>
tachyonStore.putValues(blockId, iterator, level, false)
case ArrayBufferValues(array) =>
tachyonStore.putValues(blockId, array, level, false)
case ByteBufferValues(bytes) =>
bytes.rewind()
tachyonStore.putBytes(blockId, bytes, level)
}
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
} else {
// 直接保存到硬盘,不要复制到其它节点的就别返回数据了.
val askForBytes = level.replication > 1
val res = data match {
case IteratorValues(iterator) =>
diskStore.putValues(blockId, iterator, level, askForBytes)
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
case ByteBufferValues(bytes) =>
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
}
     // 通过blockId获得当前的block状态
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// 成功了,把该block标记为ready,通知BlockManagerMaster
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
updatedBlocks += ((blockId, putBlockStatus))
}
} finally {
// 如果没有标记成功,就把该block信息清除
if (!marked) {
blockInfo.remove(blockId)
putBlockInfo.markFailure()
}
}
}
// 把数据发送到别的节点做备份
if (level.replication > 1) {
data match {
case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
case _ => {
val remoteStartTime = System.currentTimeMillis
// 把Iterator里面的数据序列化之后,发送到别的节点
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, level)
}
}
}
// 销毁bytesAfterPut
BlockManager.dispose(bytesAfterPut)
updatedBlocks
}
从上面的的来看:
1、存储的时候按照不同的存储级别分了 3 种情况来处理:存在内存当中(包括 MEMORY 字样的),存在 tachyon 上(OFF_HEAP),只存在硬盘上(DISK_ONLY)。
2、存储完成之后会根据存储级别决定是否发送到别的节点,在名字上最后带 2 字的都是这种,2 表示一个 block 会在两个节点上保存。
3、存储完毕之后,会向 BlockManagerMaster 汇报 block 的情况。
4、这里面的序列化其实是先压缩后序列化,默认使用的是 LZF 压缩,可以通过 spark.io.compression.codec 设定为 snappy 或者 lzo,序列化方式通过 spark.serializer 设置,默认是 JavaSerializer。
接下来我们再看 get 的情况。
val local = getLocal(blockId)
if (local.isDefined) return local
val remote = getRemote(blockId)
if (remote.isDefined) return remote
None
先从本地取,本地没有再去别的节点取,都没有,返回 None。从本地取就不说了,怎么进怎么出。讲一下怎么从别的节点去,它们是一个什么样子的关系?
我们先看 getRemote 方法
private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
val locations = Random.shuffle(master.getLocations(blockId))
for (loc <- locations) {
val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
if (asValues) {
return Some(dataDeserialize(blockId, data))
} else {
return Some(data)
}
}
}
None
}
这个方法包括两个步骤:
1、用 blockId 通过 master 的 getLocations 方法找到它的位置。
2、通过 BlockManagerWorker.syncGetBlock 到指定的节点获取数据。
ok,下面就重点讲 BlockManager 和 BlockManagerMaster 之间的关系,以及 BlockManager 之间是如何相互传输数据。
BlockManager 与 BlockManagerMaster 的关系
BlockManager 我们使用的时候是从 SparkEnv.get 获得的,我们观察了一下 SparkEnv,发现它包含了我们运行时候常用的那些东东。那它创建是怎么创建的呢,我们找到 SparkEnv 里面的 create 方法,右键 FindUsages,就会找到两个地方调用了,一个是 SparkContext,另一个是 Executor。在 SparkEnv 的 create 方法里面会实例化一个 BlockManager 和 BlockManagerMaster。这里我们需要注意看 BlockManagerMaster 的实例化方法,里面调用了 registerOrLookup 方法。
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
actorSystem.actorOf(Props(newActor), name = name)
} else {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
}
所以从这里可以看出来,除了 Driver 之后的 actor 都是,都是持有的 Driver 的引用 ActorRef。梳理一下,我们可以得出以下结论:
1、SparkContext 持有一个 BlockManager 和 BlockManagerMaster。
2、每一个 Executor 都持有一个 BlockManager 和 BlockManagerMaster。
3、Executor 和 SparkContext 的 BlockManagerMaster 通过 BlockManagerMasterActor 来通信。
接下来,我们看看 BlockManagerMasterActor 里的三组映射关系。
// 1、BlockManagerId和BlockManagerInfo的映射关系
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
// 2、Executor ID 和 Block manager ID的映射关系
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
// 3、BlockId和保存它的BlockManagerId的映射关系
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
看到这三组关系,前面的 getLocations 方法不用看它的实现,我们都应该知道是怎么找了。
BlockManager 相互传输数据
BlockManager 之间发送数据和接受数据是通过 BlockManagerWorker 的 syncPutBlock 和 syncGetBlock 方法来实现。看 BlockManagerWorker 的注释,说是 BlockManager 的网络接口,采用的是事件驱动模型。
再仔细看这两个方法,它传输的数据包装成 BlockMessage 之后,通过 ConnectionManager 的 sendMessageReliablySync 方法来传输。
接下来的故事就是 nio 之间的发送和接收了,就简单说几点吧:
1、ConnectionManager 内部实例化一个 selectorThread 线程来接收消息,具体请看 run 方法。
2、Connection 发送数据的时候,是一次把消息队列的 message 全部发送,不是一个一个 message 发送,具体看 SendConnection 的 write 方法,与之对应的接收看 ReceivingConnection 的 read 方法。
3、read 完了之后,调用回调函数 ConnectionManager 的 receiveMessage 方法,它又调用了 handleMessage 方法,handleMessage 又调用了 BlockManagerWorker 的 onBlockMessageReceive 方法。传说中的事件驱动又出现了。
def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
blockMessage.getType match {
case BlockMessage.TYPE_PUT_BLOCK => {
val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
putBlock(pB.id, pB.data, pB.level)
None
}
case BlockMessage.TYPE_GET_BLOCK => {
val gB = new GetBlock(blockMessage.getId)
val buffer = getBlock(gB.id)
Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
}
case _ => None
}
}
根据 BlockMessage 的类型进行处理,put 类型就保存数据,get 类型就从本地把 block 读出来返回给它。
注:BlockManagerMasterActor 是存在于 BlockManagerMaster 内部,画在外面只是因为它在通信的时候起了关键的作用的,Executor 上持有的 BlockManagerMasterActor 均是 Driver 节点的 Actor 的引用。
广播变量
先回顾一下怎么使用广播变量:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
看了一下实现调用的是 broadcastFactory 的 newBroadcast 方法。
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
默认的 broadcastFactory 是 HttpBroadcastFactory,内部还有另外一个实现 TorrentBroadcastFactory,先说 HttpBroadcastFactory 的 newBroadcast 方法。
它直接 new 了一个 HttpBroadcast。
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
if (!isLocal) {
HttpBroadcast.write(id, value_)
}
它的内部做了两个操作,把数据保存到 driver 端的 BlockManager 并且写入到硬盘。
TorrentBroadcast 和 HttpBroadcast 都把数据存进了 BlockManager 做备份,但是 TorrentBroadcast 接着并没有把数据写入文件,而是采用了下面这种方式:
def sendBroadcast() {
// 把数据给切分了,每4M一个分片
val tInfo = TorrentBroadcast.blockifyObject(value_)
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
hasBlocks = tInfo.totalBlocks
// 把分片的信息存到BlockManager,并通知Master
val metaId = BroadcastBlockId(id, "meta")
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
// 遍历所有分片,存到BlockManager上面,并通知Master
for (i <- 0 until totalBlocks) {
val pieceId = BroadcastBlockId(id, "piece" + i)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
}
}
1、把数据序列化之后,每 4M 切分一下。
2、切分完了之后,把所有分片写入 BlockManager。
但是找不到它们是怎么传播的??只是写入到 BlockManager,但是 tellMaster 为 false 的话,就相当于存在本地了,别的 BlockManager 是没法获取到的。
这时候我注意到它内部有两个方法,readObject 和 writeObject,会不会和这两个方法有关呢?它们做的操作就是给 value 赋值。
为了检验这个想法,我亲自调试了一下,在反序列化任务的时候,readObject 这个方法是被 ObjectInputStream 调用了。这块的知识大家可以百度下 ObjectInputStream 和 ObjectOutputStream。
具体操作如下:
1、打开 BroadcastSuite 这个类,找到下面这段代码,图中的地方原来是 512, 被我改成 256 了,之前一直运行不起来。
test("Accessing TorrentBroadcast variables in a local cluster") {
val numSlaves = 4
sc = new SparkContext("local-cluster[%d, 1, 256]".format(numSlaves), "test", torrentConf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
}
2、找到 TorrentBroadcast,在 readObject 方法上打上断点。
3、开始调试吧。
之前讲过,Task 是被序列化之后包装在消息里面发送给 Worker 去运行的,所以在运行之前必须把 Task 进行反序列化,具体在 TaskRunner 的 run 方法里面:
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
Ok,告诉大家入口了,剩下的大家去尝试吧。前面介绍了怎么切分的,到 TorrentBroadcast 的 readObject 里面就很容易理解了。
1、先通过 MetaId 从 BlockManager 里面取出来 Meta 信息。
2、通过 Meta 信息,构造分片 id,去 BlockManager 里面取。
3、获得分片之后,把分片写入到本地的 BlockManager 当中。
4、全部取完之后,通过下面的方法反向赋值。
if (receiveBroadcast()) {
value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
   SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
5、把 value_又顺手写入到 BlockManager 当中。(这里相当于写了两份进去,大家要注意了哈,内存消耗还是大大地。幸好是 MEMORY_AND_DISK 的)
这么做是有好处的,这是一种类似 BT 的做法,把数据切分成一小块一小块,容易传播,从不同的机器上获取一小块一小块的数据,最后组装成完整的。
把完整的 value 写入 BlockManager 是为了使用的时候方便,不需要再次组装。
相关参数
// BlockManager的最大内存
spark.storage.memoryFraction 默认值0.6
// 文件保存的位置
spark.local.dir 默认是系统变量java.io.tmpdir的值
// tachyon保存的地址
spark.tachyonStore.url 默认值tachyon://localhost:19998
// 默认不启用netty来传输shuffle的数据
spark.shuffle.use.netty 默认值是false
spark.shuffle.sender.port 默认值是0
// 一个reduce抓取map中间结果的最大的同时抓取数量大小(to avoid over-allocating memory for receiving shuffle outputs)
spark.reducer.maxMbInFlight 默认值是48*1024*1024
// TorrentBroadcast切分数据块的分片大小
spark.broadcast.blockSize 默认是4096
// 广播变量的工厂类
spark.broadcast.factory 默认是org.apache.spark.broadcast.HttpBroadcastFactory,也可以设置为org.apache.spark.broadcast.TorrentBroadcastFactory
// 压缩格式
spark.io.compression.codec 默认是LZF,可以设置成Snappy或者Lzo
今天的文章Spark 源码系列(五)分布式缓存分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/22794.html