numpy-ml\numpy_ml\preprocessing\nlp.py
# 导入必要的库和模块 import re import heapq import os.path as op from collections import Counter, OrderedDict, defaultdict import numpy as np # 定义英文停用词列表,来源于"Glasgow Information Retrieval Group" _STOP_WORDS = set( ).split(" "), ) # 定义用于匹配单词的正则表达式,用于分词 _WORD_REGEX = re.compile(r"(?u)\b\w\w+\b") # sklearn默认 _WORD_REGEX_W_PUNC = re.compile(r"(?u)\w+|[^a-zA-Z0-9\s]") _WORD_REGEX_W_PUNC_AND_WHITESPACE = re.compile(r"(?u)s?\w+\s?|\s?[^a-zA-Z0-9\s]\s?") # 定义用于匹配标点符号的正则表达式 _PUNC_BYTE_REGEX = re.compile( r"(33|34|35|36|37|38|39|40|41|42|43|44|45|" r"46|47|58|59|60|61|62|63|64|91|92|93|94|" r"95|96|123|124|125|126)", ) # 定义标点符号 _PUNCTUATION = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" # 创建用于去除标点符号的转换表 _PUNC_TABLE = str.maketrans("", "", _PUNCTUATION) # 定义函数,返回指定长度的n-gram序列 def ngrams(sequence, N): """Return all `N`-grams of the elements in `sequence`""" assert N >= 1 return list(zip(*[sequence[i:] for i in range(N)])) # 定义函数,将字符串按空格分词,可选择是否转为小写、过滤停用词和标点符号 def tokenize_whitespace( line, lowercase=True, filter_stopwords=True, filter_punctuation=True, kwargs, ): """ Split a string at any whitespace characters, optionally removing punctuation and stop-words in the process. """ line = line.lower() if lowercase else line words = line.split() line = [strip_punctuation(w) for w in words] if filter_punctuation else line return remove_stop_words(words) if filter_stopwords else words # 定义函数,将字符串按单词分词,可选择是否转为小写、过滤停用词和标点符号 def tokenize_words( line, lowercase=True, filter_stopwords=True, filter_punctuation=True, kwargs, ): """ Split a string into individual words, optionally removing punctuation and stop-words in the process. """ REGEX = _WORD_REGEX if filter_punctuation else _WORD_REGEX_W_PUNC words = REGEX.findall(line.lower() if lowercase else line) return remove_stop_words(words) if filter_stopwords else words # 定义函数,将字符串按字节分词 def tokenize_words_bytes( line, # 设置是否将文本转换为小写 lowercase=True, # 设置是否过滤停用词 filter_stopwords=True, # 设置是否过滤标点符号 filter_punctuation=True, # 设置文本编码格式为 UTF-8 encoding="utf-8", # kwargs 表示接受任意数量的关键字参数,这些参数会被传递给函数的其他部分进行处理 kwargs, # 将字符串拆分为单词,并在此过程中选择性地删除标点符号和停用词。将每个单词转换为字节列表。 def tokenize_words( line, lowercase=lowercase, filter_stopwords=filter_stopwords, filter_punctuation=filter_punctuation, kwargs, ): # 对单词进行分词处理,根据参数选择是否转换为小写、过滤停用词和标点符号 words = tokenize_words( line, lowercase=lowercase, filter_stopwords=filter_stopwords, filter_punctuation=filter_punctuation, kwargs, ) # 将单词转换为字节列表,每个字节用空格分隔 words = [" ".join([str(i) for i in w.encode(encoding)]) for w in words] # 返回字节列表 return words # 将字符串中的字符转换为字节集合。每个字节用0到255之间的整数表示。 def tokenize_bytes_raw(line, encoding="utf-8", splitter=None, kwargs): # 将字符串中的字符编码为字节,每个字节用空格分隔 byte_str = [" ".join([str(i) for i in line.encode(encoding)]) # 如果指定了分隔符为标点符号,则在编码为字节之前在标点符号处进行分割 if splitter == "punctuation": byte_str = _PUNC_BYTE_REGEX.sub(r"-\1-", byte_str[0]).split("-") return byte_str # 将字节(表示为0到255之间的整数)解码为指定编码的字符。 def bytes_to_chars(byte_list, encoding="utf-8"): # 将字节列表中的整数转换为十六进制字符串 hex_array = [hex(a).replace("0x", "") for a in byte_list] # 将十六进制字符串连接起来,并在需要时在前面补0 hex_array = " ".join([h if len(h) > 1 else f"0{h}" for h in hex_array]) # 将十六进制字符串转换为字节数组,再根据指定编码解码为字符 return bytearray.fromhex(hex_array).decode(encoding) # 将字符串中的字符转换为小写,并根据参数选择是否过滤标点符号。 def tokenize_chars(line, lowercase=True, filter_punctuation=True, kwargs): # 将字符串拆分为单个字符,可选择在此过程中删除标点符号和停用词 """ # 如果需要转换为小写,则将字符串转换为小写 line = line.lower() if lowercase else line # 如果需要过滤标点符号,则调用函数去除标点符号 line = strip_punctuation(line) if filter_punctuation else line # 使用正则表达式将连续多个空格替换为一个空格,并去除首尾空格,然后将结果转换为字符列表 chars = list(re.sub(" {2,}", " ", line).strip()) # 返回字符列表 return chars # 从单词字符串列表中移除停用词 def remove_stop_words(words): """Remove stop words from a list of word strings""" # 返回不在停用词列表中的单词 return [w for w in words if w.lower() not in _STOP_WORDS] # 从字符串中移除标点符号 def strip_punctuation(line): """Remove punctuation from a string""" # 使用_PUNC_TABLE来移除字符串中的标点符号,并去除首尾空格 return line.translate(_PUNC_TABLE).strip() # # Byte-Pair Encoder # # # 定义一个Byte-Pair编码器类 class BytePairEncoder(object): def __init__(self, max_merges=3000, encoding="utf-8"): """ A byte-pair encoder for sub-word embeddings. Notes ----- Byte-pair encoding [1][2] is a compression algorithm that iteratively replaces the most frequently ocurring byte pairs in a set of documents with a new, single token. It has gained popularity as a preprocessing step for many NLP tasks due to its simplicity and expressiveness: using a base coebook of just 256 unique tokens (bytes), any string can be encoded. References ---------- .. [1] Gage, P. (1994). A new algorithm for data compression. *C Users Journal, 12(2)*, 23–38. .. [2] Sennrich, R., Haddow, B., & Birch, A. (2015). Neural machine translation of rare words with subword units, *Proceedings of the 54th Annual Meeting of the Association for Computational Linguistics,* 1715-1725. Parameters ---------- max_merges : int The maximum number of byte pair merges to perform during the :meth:`fit` operation. Default is 3000. encoding : str The encoding scheme for the documents used to train the encoder. Default is `'utf-8'`. """ # 初始化参数字典 self.parameters = { "max_merges": max_merges, "encoding": encoding, } # 初始化字节到标记和标记到字节的有序字典。字节以十进制表示为0到255之间的整数。 # 在255之前,标记和字节表示之间存在一对一的对应关系。 self.byte2token = OrderedDict({i: i for i in range(256)}) self.token2byte = OrderedDict({v: k for k, v in self.byte2token.items()}) # 在给定语料库上训练一个字节对编码表 def fit(self, corpus_fps, encoding="utf-8"): """ Train a byte pair codebook on a set of documents. Parameters ---------- corpus_fps : str or list of strs The filepath / list of filepaths for the document(s) to be used to learn the byte pair codebook. encoding : str The text encoding for documents. Common entries are either 'utf-8' (no header byte), or 'utf-8-sig' (header byte). Default is 'utf-8'. """ # 创建一个词汇表对象,用于存储字节对编码表 vocab = ( Vocabulary( lowercase=False, min_count=None, max_tokens=None, filter_stopwords=False, filter_punctuation=False, tokenizer="bytes", ) # 在给定语料库上拟合词汇表 .fit(corpus_fps, encoding=encoding) # 获取词汇表中的计数信息 .counts ) # 迭代地合并跨文档中最常见的字节二组 for _ in range(self.parameters["max_merges"]): # 获取词汇表中的字节二组计数信息 pair_counts = self._get_counts(vocab) # 找到出现次数最多的字节二组 most_common_bigram = max(pair_counts, key=pair_counts.get) # 合并最常见的字节二组到词汇表中 vocab = self._merge(most_common_bigram, vocab) # 初始化一个空集合,用于存储字节标记 token_bytes = set() # 遍历词汇表中的键 for k in vocab.keys(): # 将键按空格分割,筛选包含"-"的字节标记 token_bytes = token_bytes.union([w for w in k.split(" ") if "-" in w]) # 遍历字节标记集合 for i, t in enumerate(token_bytes): # 将字节标记转换为组形式 byte_tuple = tuple(int(j) for j in t.split("-")) # 将字节标记映射到对应的标记索引 self.token2byte[256 + i] = byte_tuple # 将字节标记索引映射到对应的字节标记 self.byte2token[byte_tuple] = 256 + i # 返回当前对象 return self # 获取词汇表中的字节二组计数信息 def _get_counts(self, vocab): """Collect bigram counts for the tokens in vocab""" # 初始化一个默认字典,用于存储字节二组计数 pair_counts = defaultdict(int) # 遍历词汇表中的单词和计数信息 for word, count in vocab.items(): # 生成单词的二组 pairs = ngrams(word.split(" "), 2) # 遍历单词的二组 for p in pairs: # 更新字节二组计数信息 pair_counts[p] += count # 返回字节二组计数信息 return pair_counts # 将给定的二组替换为单个标记,并相应更新词汇表 def _merge(self, bigram, vocab): v_out = {} # 转义二组中的单词,用于正则表达式匹配 bg = re.escape(" ".join(bigram)) # 创建匹配二组的正则表达式 bigram_regex = re.compile(r"(?<!\S)" + bg + r"(?!\S)") # 遍历词汇表中的单词 for word in vocab.keys(): # 将匹配到的二组替换为连接符"-" w_out = bigram_regex.sub("-".join(bigram), word) v_out[w_out] = vocab[word] return v_out # 将文本中的单词转换为其字节对编码的标记ID def transform(self, text): """ Transform the words in `text` into their byte pair encoded token IDs. Parameters ---------- text: str or list of `N` strings The list of strings to encode Returns ------- codes : list of `N` lists A list of byte pair token IDs for each of the `N` strings in `text`. Examples -------- >>> B = BytePairEncoder(max_merges=100).fit("./example.txt") >>> encoded_tokens = B.transform("Hello! How are you 😁 ?") >>> encoded_tokens [[72, 879, 474, ...]] """ # 如果输入是字符串,则转换为列表 if isinstance(text, str): text = [text] # 对文本中的每个字符串进行转换 return [self._transform(string) for string in text] # 将单个文本字符串转换为字节对 ID 列表 def _transform(self, text): # 获取参数配置 P = self.parameters # 将文本字符串转换为原始字节流 _bytes = tokenize_bytes_raw(text, encoding=P["encoding"]) # 初始化编码结果列表 encoded = [] # 遍历每个字节对 for w in _bytes: l, r = 0, len(w) # 将字节对转换为整数列表 w = [int(i) for i in w.split(" ")] # 循环处理字节对 while l < len(w): candidate = tuple(w[l:r]) # 如果候选字节对长度大于1且在词汇表中 if len(candidate) > 1 and candidate in self.byte2token: # 将候选字节对的 ID 添加到编码结果列表中 encoded.append(self.byte2token[candidate]) l, r = r, len(w) # 如果候选字节对长度为1 elif len(candidate) == 1: # 将候选字节的 ID 添加到编码结果列表中 encoded.append(candidate[0]) l, r = r, len(w) else: # 如果候选字节对不在词汇表中,则减小上下文窗口大小并重试 r -= 1 # 返回编码结果列表 return encoded def inverse_transform(self, codes): """ Transform an encoded sequence of byte pair codeword IDs back into human-readable text. Parameters ---------- codes : list of `N` lists A list of `N` lists. Each sublist is a collection of integer byte-pair token IDs representing a particular text string. Returns ------- text: list of `N` strings The decoded strings corresponding to the `N` sublists in `codes`. Examples -------- >>> B = BytePairEncoder(max_merges=100).fit("./example.txt") >>> encoded_tokens = B.transform("Hello! How are you 😁 ?") >>> encoded_tokens [[72, 879, 474, ...]] >>> B.inverse_transform(encoded_tokens) ["Hello! How are you 😁 ?"] """ # 如果输入的codes是一个整数,将其转换为包含一个列表的形式 if isinstance(codes[0], int): codes = [codes] decoded = [] P = self.parameters # 遍历codes中的每个列表 for code in codes: # 将每个token转换为对应的字节 _bytes = [self.token2byte[t] if t > 255 else [t] for t in code] # 将字节列表展开为一维列表 _bytes = [b for blist in _bytes for b in blist] # 将字节转换为字符并添加到decoded列表中 decoded.append(bytes_to_chars(_bytes, encoding=P["encoding"])) return decoded @property def codebook(self): """ A list of the learned byte pair codewords, decoded into human-readable format """ # 返回学习到的字节对编码的人类可读形式 return [ self.inverse_transform(t)[0] for t in self.byte2token.keys() if isinstance(t, tuple) ] @property def tokens(self): """A list of the byte pair codeword IDs""" # 返回字节对编码的ID列表 return list(self.token2byte.keys()) # 定义节点类,用于构建哈夫曼树 class Node(object): def __init__(self, key, val): self.key = key self.val = val self.left = None self.right = None # 重载大于运算符 def __gt__(self, other): """Greater than""" if not isinstance(other, Node): return -1 return self.val > other.val # 重载大于等于运算符 def __ge__(self, other): """Greater than or equal to""" if not isinstance(other, Node): return -1 return self.val >= other.val # 重载小于运算符 def __lt__(self, other): """Less than""" if not isinstance(other, Node): return -1 return self.val < other.val # 重载小于等于运算符 def __le__(self, other): """Less than or equal to""" if not isinstance(other, Node): return -1 return self.val <= other.val # 定义哈夫曼编码器类 class HuffmanEncoder(object): # 为文本中的标记构建一个哈夫曼树,并计算每个标记的二进制编码。 # 在哈夫曼编码中,出现频率更高的标记通常使用较少的位表示。哈夫曼编码产生了所有方法中对单独编码标记的最小期望码字长度。 # 哈夫曼编码对应于通过二叉树的路径,其中1表示“向右移动”,0表示“向左移动”。与标准二叉树相反,哈夫曼树是自底向上构建的。构造始于初始化一个最小堆优先队列,其中包含语料库中的每个标记,优先级对应于标记频率。在每一步中,语料库中最不频繁的两个标记被移除,并成为一个父伪标记的子节点,其“频率”是其子节点频率的总和。将这个新的父伪标记添加到优先队列中,并递归重复这个过程,直到没有标记剩余。 # 参数 # text: 字符串列表或Vocabulary类的实例 # 标记化的文本或用于构建哈夫曼编码的预训练Vocabulary对象。 def fit(self, text): # 构建哈夫曼树 self._build_tree(text) # 生成编码 self._generate_codes() def transform(self, text): """ Transform the words in `text` into their Huffman-code representations. Parameters ---------- text: list of `N` strings The list of words to encode Returns ------- codes : list of `N` binary strings The encoded words in `text` """ # 如果输入的是字符串,则转换为包含该字符串的列表 if isinstance(text, str): text = [text] # 遍历文本中的每个单词 for token in set(text): # 如果单词不在 Huffman 树中,则抛出警告并跳过 if token not in self._item2code: raise Warning("Token '{}' not in Huffman tree. Skipping".format(token)) # 返回每个单词的 Huffman 编码 return [self._item2code.get(t, None) for t in text] def inverse_transform(self, codes): """ Transform an encoded sequence of bit-strings back into words. Parameters ---------- codes : list of `N` binary strings A list of encoded bit-strings, represented as strings. Returns ------- text: list of `N` strings The decoded text. """ # 如果输入的是字符串,则转换为包含该字符串的列表 if isinstance(codes, str): codes = [codes] # 遍历编码序列中的每个编码 for code in set(codes): # 如果编码不在 Huffman 树中,则抛出警告并跳过 if code not in self._code2item: raise Warning("Code '{}' not in Huffman tree. Skipping".format(code)) # 返回每个编码对应的单词 return [self._code2item.get(c, None) for c in codes] @property def tokens(self): """A list the unique tokens in `text`""" # 返回 Huffman 树中的所有唯一单词 return list(self._item2code.keys()) @property def codes(self): """A list with the Huffman code for each unique token in `text`""" # 返回 Huffman 树中每个唯一单词的 Huffman 编码 return list(self._code2item.keys()) def _counter(self, text): counts = {} # 统计文本中每个单词的出现次数 for item in text: counts[item] = counts.get(item, 0) + 1 return counts # 构建哈夫曼树 def _build_tree(self, text): """Construct Huffman Tree""" # 初始化优先队列 PQ = [] # 如果输入是 Vocabulary 对象,则使用其 counts 属性 if isinstance(text, Vocabulary): counts = text.counts else: # 否则使用 _counter 方法计算频率 counts = self._counter(text) # 将每个字符及其频率作为节点加入优先队列 for (k, c) in counts.items(): PQ.append(Node(k, c)) # 创建一个优先队列,优先级为频率 heapq.heapify(PQ) # 构建哈夫曼树 while len(PQ) > 1: node1 = heapq.heappop(PQ) # 弹出频率最小的节点 node2 = heapq.heappop(PQ) # 弹出频率第二小的节点 parent = Node(None, node1.val + node2.val) parent.left = node1 parent.right = node2 heapq.heappush(PQ, parent) self._root = heapq.heappop(PQ) # 生成编码 def _generate_codes(self): current_code = "" self._item2code = {} self._code2item = {} self._build_code(self._root, current_code) # 递归构建编码 def _build_code(self, root, current_code): if root is None: return if root.key is not None: # 将叶子节点的字符与编码对应存储 self._item2code[root.key] = current_code self._code2item[current_code] = root.key return # 0 = 向左移动,1 = 向右移动 self._build_code(root.left, current_code + "0") self._build_code(root.right, current_code + "1") # 定义 Token 类,用于表示一个单词的计数和内容 class Token: def __init__(self, word): # 初始化单词计数为 0 self.count = 0 # 初始化单词内容 self.word = word def __repr__(self): """A string representation of the token""" # 返回 Token 对象的字符串表示,包括单词内容和计数 return "Token(word='{}', count={})".format(self.word, self.count) # 定义 TFIDFEncoder 类,用于计算 TF-IDF 编码 class TFIDFEncoder: def __init__( self, vocab=None, lowercase=True, min_count=0, smooth_idf=True, max_tokens=None, input_type="files", filter_stopwords=True, filter_punctuation=True, tokenizer="words", ): # 初始化 TFIDFEncoder 对象的各种参数 # 定义内部方法 _encode_document,用于对文档进行编码 def _encode_document( self, doc, word2idx, idx2word, tokens, doc_count, bol_ix, eol_ix, ): """Perform tokenization and compute token counts for a single document""" # 获取超参数 H = self.hyperparameters # 是否转换为小写 lowercase = H["lowercase"] # 是否过滤停用词 filter_stop = H["filter_stopwords"] # 是否过滤标点符号 filter_punc = H["filter_punctuation"] # 如果输入类型为文件 if H["input_type"] == "files": # 打开文件并读取内容 with open(doc, "r", encoding=H["encoding"]) as handle: doc = handle.read() # 定义不同类型的分词器 tokenizer_dict = { "words": tokenize_words, "characters": tokenize_chars, "whitespace": tokenize_whitespace, "bytes": tokenize_bytes_raw, } # 根据超参数选择相应的分词器 tokenizer = tokenizer_dict[H["tokenizer"]] # 初始化单词数量 n_words = 0 # 将文档按行分割 lines = doc.split("\n") # 遍历每一行 for line in lines: # 对每一行进行分词 words = tokenizer( line, lowercase=lowercase, filter_stopwords=filter_stop, filter_punctuation=filter_punc, encoding=H["encoding"], ) # 过滤词汇表中不存在的词 words = self._filter_vocab(words) # 更新单词数量 n_words += len(words) # 遍历每个词 for ww in words: # 如果词不在 word2idx 中,则添加 if ww not in word2idx: word2idx[ww] = len(tokens) idx2word[len(tokens)] = ww tokens.append(Token(ww)) # 获取词的索引 t_idx = word2idx[ww] # 更新词频 tokens[t_idx].count += 1 # 更新文档中词的出现次数 doc_count[t_idx] = doc_count.get(t_idx, 0) + 1 # 在每行开头和结尾添加 <bol> 和 <eol> 标签 tokens[bol_ix].count += 1 tokens[eol_ix].count += 1 doc_count[bol_ix] = doc_count.get(bol_ix, 0) + 1 doc_count[eol_ix] = doc_count.get(eol_ix, 0) + 1 # 返回单词到索引的映射、索引到单词的映射、单词列表、文档中单词出现次数 return word2idx, idx2word, tokens, doc_count # 保留前 N 个最频繁出现的词汇 def _keep_top_n_tokens(self): # 获取最大词汇数 N = self.hyperparameters["max_tokens"] # 初始化词汇计数、词汇到索引、索引到词汇的字典 doc_counts, word2idx, idx2word = {}, {}, {} # 根据词汇出现次数排序词汇列表 tokens = sorted(self._tokens, key=lambda x: x.count, reverse=True) # 重新索引前 N 个词汇... unk_ix = None for idx, tt in enumerate(tokens[:N]): word2idx[tt.word] = idx idx2word[idx] = tt.word # 如果 <unk> 不在前 N 个词汇中,将其添加进去,替换第 N 个最频繁出现的词汇,并相应调整 <unk> 的计数... if tt.word == "<unk>": unk_ix = idx # ... 最后,将所有被删除的词汇重新编码为 "<unk>" for tt in tokens[N:]: tokens[unk_ix].count += tt.count # ... 最后,重新为每个文档重新索引词汇计数 for d_ix in self.term_freq.keys(): doc_counts[d_ix] = {} for old_ix, d_count in self.term_freq[d_ix].items(): word = self.idx2token[old_ix] new_ix = word2idx.get(word, unk_ix) doc_counts[d_ix][new_ix] = doc_counts[d_ix].get(new_ix, 0) + d_count # 更新词汇列表、词汇到索引、索引到词汇的字典以及文档词频 self._tokens = tokens[:N] self.token2idx = word2idx self.idx2token = idx2word self.term_freq = doc_counts # 断言词汇列表长度不超过 N assert len(self._tokens) <= N def _drop_low_freq_tokens(self): """ 替换所有出现次数少于 `min_count` 的标记为 `<unk>` 标记。 """ H = self.hyperparameters # 获取 `<unk>` 标记的索引 unk_token = self._tokens[self.token2idx["<unk>"]] # 获取 `<eol>` 标记的索引 eol_token = self._tokens[self.token2idx["<eol>"]] # 获取 `<bol>` 标记的索引 bol_token = self._tokens[self.token2idx["<bol>"]] # 初始化特殊标记列表 tokens = [unk_token, eol_token, bol_token] # 初始化 `<unk>` 标记的索引 unk_idx = 0 # 初始化特殊标记到索引的映射 word2idx = {"<unk>": 0, "<eol>": 1, "<bol>": 2} # 初始化索引到特殊标记的映射 idx2word = {0: "<unk>", 1: "<eol>", 2: "<bol>"} # 初始化特殊标记集合 special = {"<eol>", "<bol>", "<unk>"} # 遍历所有标记 for tt in self._tokens: # 如果标记不是特殊标记 if tt.word not in special: # 如果标记出现次数小于 `min_count` if tt.count < H["min_count"]: # 将出现次数加到 `<unk>` 标记上 tokens[unk_idx].count += tt.count else: # 更新标记到索引的映射 word2idx[tt.word] = len(tokens) # 更新索引到标记的映射 idx2word[len(tokens)] = tt.word # 添加标记到列表中 tokens.append(tt) # 重新索引文档计数 doc_counts = {} for d_idx in self.term_freq.keys(): doc_counts[d_idx] = {} for old_idx, d_count in self.term_freq[d_idx].items(): word = self.idx2token[old_idx] new_idx = word2idx.get(word, unk_idx) doc_counts[d_idx][new_idx] = doc_counts[d_idx].get(new_idx, 0) + d_count # 更新标记列表 self._tokens = tokens # 更新标记到索引的映射 self.token2idx = word2idx # 更新索引到标记的映射 self.idx2token = idx2word # 更新文档计数 self.term_freq = doc_counts # 对 tokens 进行排序,按字母顺序排序并重新编码 def _sort_tokens(self): # 初始化索引 ix = 0 # 初始化 token 到索引和索引到 token 的字典 token2idx, idx2token, = ( {}, {}, ) # 特殊 token 列表 special = ["<eol>", "<bol>", "<unk>"] # 对 token2idx 字典中的键进行排序 words = sorted(self.token2idx.keys()) # 初始化 term_freq 字典 term_freq = {d: {} for d in self.term_freq.keys()} # 遍历排序后的 tokens for w in words: # 如果当前 token 不在特殊 token 列表中 if w not in special: # 获取当前 token 的旧索引 old_ix = self.token2idx[w] # 更新 token2idx 和 idx2token 字典 token2idx[w], idx2token[ix] = ix, w # 更新 term_freq 字典 for d in self.term_freq.keys(): if old_ix in self.term_freq[d]: count = self.term_freq[d][old_ix] term_freq[d][ix] = count ix += 1 # 处理特殊 token for w in special: token2idx[w] = len(token2idx) idx2token[len(idx2token)] = w # 更新对象的 token2idx、idx2token、term_freq 和 vocab_counts 属性 self.token2idx = token2idx self.idx2token = idx2token self.term_freq = term_freq self.vocab_counts = Counter({t.word: t.count for t in self._tokens}) def _calc_idf(self): """ 计算语料库中每个标记的(平滑的)逆文档频率。 对于一个单词标记 `w`,IDF 简单地定义为 IDF(w) = log ( |D| / |{ d in D: w in d }| ) + 1 其中 D 是语料库中所有文档的集合, D = {d1, d2, ..., dD} 如果 `smooth_idf` 为 True,我们对包含给定单词的文档数量进行加法平滑处理,相当于假设存在第 D+1 个文档,其中包含语料库中的每个单词: SmoothedIDF(w) = log ( |D| + 1 / [1 + |{ d in D: w in d }|] ) + 1 """ inv_doc_freq = {} smooth_idf = self.hyperparameters["smooth_idf"] tf, doc_idxs = self.term_freq, self._idx2doc.keys() D = len(self._idx2doc) + int(smooth_idf) for word, w_ix in self.token2idx.items(): d_count = int(smooth_idf) d_count += np.sum([1 if w_ix in tf[d_ix] else 0 for d_ix in doc_idxs]) inv_doc_freq[w_ix] = 1 if d_count == 0 else np.log(D / d_count) + 1 self.inv_doc_freq = inv_doc_freq def transform(self, ignore_special_chars=True): """ 生成文本语料库的词频-逆文档频率编码。 Parameters ---------- ignore_special_chars : bool 是否从最终的tfidf编码中删除与"<eol>", "<bol>", "<unk>"标记对应的列。默认为True。 Returns ------- tfidf : numpy array of shape `(D, M [- 3])` 编码后的语料库,每行对应一个文档,每列对应一个标记ID。如果`ignore_special_chars`为False,则在`idx2token`属性中存储列号与标记之间的映射。否则,映射不准确。 """ D, N = len(self._idx2doc), len(self._tokens) # 初始化词频矩阵和逆文档频率矩阵 tf = np.zeros((D, N)) idf = np.zeros((D, N)) # 遍历文档索引 for d_ix in self._idx2doc.keys(): # 获取文档中的词和词频 words, counts = zip(*self.term_freq[d_ix].items()) # 创建文档索引数组 docs = np.ones(len(words), dtype=int) * d_ix # 更新词频矩阵 tf[docs, words] = counts # 获取所有词的排序列表 words = sorted(self.idx2token.keys()) # 根据词的逆文档频率创建矩阵 idf = np.tile(np.array([self.inv_doc_freq[w] for w in words]), (D, 1)) # 计算tfidf矩阵 tfidf = tf * idf # 如果忽略特殊字符 if ignore_special_chars: # 获取特殊字符的索引 idxs = [ self.token2idx["<unk>"], self.token2idx["<eol>"], self.token2idx["<bol>"], ] # 从tfidf矩阵中删除特殊字符列 tfidf = np.delete(tfidf, idxs, 1) # 返回tfidf矩阵 return tfidf # 定义一个名为 Vocabulary 的类 class Vocabulary: # 初始化方法,设置类的属性 def __init__( self, lowercase=True, # 是否将单词转换为小写,默认为True min_count=None, # 单词最小出现次数,默认为None max_tokens=None, # 最大单词数量,默认为None filter_stopwords=True, # 是否过滤停用词,默认为True filter_punctuation=True, # 是否过滤标点符号,默认为True tokenizer="words", # 分词器类型,默认为"words" ): """ 用于编译和编码文本语料库中唯一标记的对象。 参数 ---------- lowercase : bool 是否在标记化之前将每个字符串转换为小写。 默认为 True。 min_count : int 标记必须出现的最小次数才能包含在词汇表中。 如果为 `None`,则在词汇表中包含来自 `corpus_fp` 的所有标记。 默认为 None。 max_tokens : int 仅将出现次数超过 `min_count` 的前 `max_tokens` 个最常见标记添加到词汇表中。 如果为 None,则添加所有出现次数超过 `min_count` 的标记。 默认为 None。 filter_stopwords : bool 是否在对语料库中的单词进行编码之前删除停用词。 默认为 True。 filter_punctuation : bool 是否在对语料库中的单词进行编码之前删除标点符号。 默认为 True。 tokenizer : {'whitespace', 'words', 'characters', 'bytes'} 在将字符串映射到标记时要遵循的策略。 `'whitespace'` 标记化器在空格字符处拆分字符串。 `'words'` 标记化器使用“单词”正则表达式拆分字符串。 `'characters'` 标记化器将字符串拆分为单个字符。 `'bytes'` 标记化器将字符串拆分为一组单个字节。 """ self.hyperparameters = { "id": "Vocabulary", "encoding": None, "corpus_fps": None, "lowercase": lowercase, "min_count": min_count, "max_tokens": max_tokens, "filter_stopwords": filter_stopwords, "filter_punctuation": filter_punctuation, "tokenizer": tokenizer, } def __len__(self): """返回词汇表中标记的数量""" return len(self._tokens) # 返回一个迭代器,用于遍历词汇表中的标记 def __iter__(self): return iter(self._tokens) # 判断给定的单词是否是词汇表中的一个标记 def __contains__(self, word): return word in self.token2idx # 根据键返回词汇表中的标记(如果键是整数)或索引(如果键是字符串) def __getitem__(self, key): if isinstance(key, str): return self._tokens[self.token2idx[key]] if isinstance(key, int): return self._tokens[key] # 返回词汇表中唯一单词标记的数量 @property def n_tokens(self): return len(self.token2idx) # 返回语料库中单词的总数 @property def n_words(self): return sum(self.counts.values()) # 返回词汇表中唯一单词标记的形状 @property def shape(self): return self._tokens.shape # 返回语料库中出现频率最高的前n个标记 def most_common(self, n=5): return self.counts.most_common()[:n] # 返回在语料库中出现k次的所有标记 def words_with_count(self, k): return [w for w, c in self.counts.items() if c == k] def filter(self, words, unk=True): # noqa: A003 """ Filter (or replace) any word in `words` that is not present in `Vocabulary`. Parameters ---------- words : list of strs A list of words to filter unk : bool Whether to replace any out of vocabulary words in `words` with the ``<unk>`` token (True) or skip them entirely (False). Default is True. Returns ------- filtered : list of strs The list of words filtered against the words in Vocabulary. """ # 如果 unk 为 True,则将不在 Vocabulary 中的单词替换为 "<unk>",否则跳过 if unk: return [w if w in self else "<unk>" for w in words] # 如果 unk 为 False,则只保留在 Vocabulary 中的单词 return [w for w in words if w in self] def words_to_indices(self, words): """ Convert the words in `words` to their token indices. If a word is not in the vocabulary, return the index for the ``<unk>`` token Parameters ---------- words : list of strs A list of words to filter Returns ------- indices : list of ints The token indices for each word in `words` """ # 获取 "<unk>" 的索引 unk_ix = self.token2idx["<unk>"] # 获取是否转换为小写的设置 lowercase = self.hyperparameters["lowercase"] # 如果需要转换为小写,则将单词列表中的单词转换为小写 words = [w.lower() for w in words] if lowercase else words # 将单词转换为它们在词汇表中的索引,如果不在词汇表中,则返回 "<unk>" 的索引 return [self.token2idx[w] if w in self else unk_ix for w in words] def indices_to_words(self, indices): """ Convert the indices in `indices` to their word values. If an index is not in the vocabulary, return the ``<unk>`` token. Parameters ---------- indices : list of ints The token indices for each word in `words` Returns ------- words : list of strs The word strings corresponding to each token index in `indices` """ # 设置 "<unk>" 标记 unk = "<unk>" # 将索引转换为对应的单词,如果索引不在词汇表中,则返回 "<unk>" return [self.idx2token[i] if i in self.idx2token else unk for i in indices] # 保留词汇表中出现频率最高的前 N 个词的索引 def _keep_top_n_tokens(self): # 初始化空字典,用于存储词汇表中词语到索引的映射关系 word2idx, idx2word = {}, {} # 获取最大词汇量 N N = self.hyperparameters["max_tokens"] # 根据词频对词汇表中的词进行排序 tokens = sorted(self._tokens, key=lambda x: x.count, reverse=True) # 重新索引前 N 个词... unk_ix = None for idx, tt in enumerate(tokens[:N]): # 将词语和对应的索引存入字典中 word2idx[tt.word] = idx idx2word[idx] = tt.word # 如果词语是 "<unk>",记录其索引 if tt.word == "<unk>": unk_ix = idx # ... 如果 "<unk>" 不在前 N 个词中,将其添加进去,替换第 N 个最常见的词,并相应调整 "<unk>" 的计数 ... if unk_ix is None: unk_ix = self.token2idx["<unk>"] old_count = tokens[N - 1].count tokens[N - 1] = self._tokens[unk_ix] tokens[N - 1].count += old_count word2idx["<unk>"] = N - 1 idx2word[N - 1] = "<unk>" # ... 将所有被删除的词重新编码为 "<unk>" for tt in tokens[N:]: tokens[unk_ix].count += tt.count # 更新词汇表为前 N 个词 self._tokens = tokens[:N] self.token2idx = word2idx self.idx2token = idx2word # 断言词汇表长度不超过 N assert len(self._tokens) <= N def _drop_low_freq_tokens(self): """ Replace all tokens that occur less than `min_count` with the `<unk>` token. """ # 获取 `<unk>` token 的索引 unk_idx = 0 # 获取 `<unk>`、`<eol>`、`<bol>` token 对应的索引 unk_token = self._tokens[self.token2idx["<unk>"]] eol_token = self._tokens[self.token2idx["<eol>"]] bol_token = self._tokens[self.token2idx["<bol>"]] # 获取超参数 H = self.hyperparameters # 初始化特殊 token 列表 tokens = [unk_token, eol_token, bol_token] # 初始化特殊 token 到索引的映射 word2idx = {"<unk>": 0, "<eol>": 1, "<bol>": 2} # 初始化索引到特殊 token 的映射 idx2word = {0: "<unk>", 1: "<eol>", 2: "<bol>"} # 特殊 token 集合 special = {"<eol>", "<bol>", "<unk>"} # 遍历所有 token for tt in self._tokens: # 如果 token 不是特殊 token if tt.word not in special: # 如果 token 出现次数小于 min_count if tt.count < H["min_count"]: # 将出现次数小于 min_count 的 token 替换为 `<unk>` token tokens[unk_idx].count += tt.count else: # 更新 token 到索引的映射 word2idx[tt.word] = len(tokens) # 更新索引到 token 的映射 idx2word[len(tokens)] = tt.word # 添加当前 token 到 tokens 列表中 tokens.append(tt) # 更新 tokens 列表 self._tokens = tokens # 更新 token 到索引的映射 self.token2idx = word2idx # 更新索引到 token 的映射 self.idx2token = idx2word
Preprocessing
The preprocessing module implements common data preprocessing routines.
nlp.py
: Routines and objects for handling text data.- n-gram generators
- Word and character tokenization
- Punctuation and stop-word removal
- Vocabulary / unigram count objects
- Byte-pair encoding (Gage, 1994; Sennrich, Haddow, & Birch, 2015)
- Huffman tree encoding / decoding
- Term frequency-inverse document frequency (tf-idf) encoding
dsp.py
: Routines for handling audio and image data.- Signal windowing
- Signal autocorrelation
- Discrete Fourier transform
- Discrete cosine transform (type II)
- Signal resampling via (bi-)linear interpolation and nearest neighbor
- Mel-frequency cepstral coefficients (MFCCs) (Mermelstein, 1976; Davis & Mermelstein, 1980)
general.py
: General data preprocessing objects and functions.- Feature hashing (Moody, 1989)
- Mini-batch generators
- One-hot encoding / decoding
- Feature standardization
numpy-ml\numpy_ml\preprocessing\__init__.py
# 从当前目录中导入 general 模块 from . import general # 从当前目录中导入 nlp 模块 from . import nlp # 从当前目录中导入 dsp 模块 from . import dsp
Models
This repo includes code for the following models:
- Gaussian mixture model
- EM training
- Hidden Markov model
- Viterbi decoding
- Likelihood computation
- MLE parameter estimation via Baum-Welch/forward-backward algorithm
- Latent Dirichlet allocation (topic model)
- Standard model with MLE parameter estimation via variational EM
- Smoothed model with MAP parameter estimation via MCMC
- Neural networks
- Layers / Layer-wise ops
- Add
- Flatten
- Multiply
- Softmax
- Fully-connected/Dense
- Sparse evolutionary connections
- LSTM
- Elman-style RNN
- Max + average pooling
- Dot-product attention
- Embedding layer
- Restricted Boltzmann machine (w. CD-n training)
- 2D deconvolution (w. padding and stride)
- 2D convolution (w. padding, dilation, and stride)
- 1D convolution (w. padding, dilation, stride, and causality)
- Modules
- Bidirectional LSTM
- ResNet-style residual blocks (identity and convolution)
- WaveNet-style residual blocks with dilated causal convolutions
- Transformer-style multi-headed scaled dot product attention
- Regularizers
- Dropout
- Normalization
- Batch normalization (spatial and temporal)
- Layer normalization (spatial and temporal)
- Optimizers
- SGD w/ momentum
- AdaGrad
- RMSProp
- Adam
- Learning Rate Schedulers
- Constant
- Exponential
- Noam/Transformer
- Dlib scheduler
- Weight Initializers
- Glorot/Xavier uniform and normal
- He/Kaiming uniform and normal
- Standard and truncated normal
- Losses
- Cross entropy
- Squared error
- Bernoulli VAE loss
- Wasserstein loss with gradient penalty
- Noise contrastive estimation loss
- Activations
- ReLU
- Tanh
- Affine
- Sigmoid
- Leaky ReLU
- ELU
- SELU
- Exponential
- Hard Sigmoid
- Softplus
- Models
- Bernoulli variational autoencoder
- Wasserstein GAN with gradient penalty
- word2vec encoder with skip-gram and CBOW architectures
- Utilities
col2im
(MATLAB port)im2col
(MATLAB port)conv1D
conv2D
deconv2D
minibatch
- Layers / Layer-wise ops
- Tree-based models
- Decision trees (CART)
- [Bagging] Random forests
- [Boosting] Gradient-boosted decision trees
- Linear models
- Ridge regression
- Logistic regression
- Ordinary least squares
- Gaussian naive Bayes classifier
- Generalized linear model (identity, log, and logit links)
- Bayesian linear regression w/ conjugate priors
- Unknown mean, known variance (Gaussian prior)
- Unknown mean, unknown variance (Normal-Gamma / Normal-Inverse-Wishart prior)
- n-Gram sequence models
- Maximum likelihood scores
- Additive/Lidstone smoothing
- Simple Good-Turing smoothing
- Multi-armed bandit models
- UCB1
- LinUCB
- Epsilon-greedy
- Thompson sampling w/ conjugate priors
- Beta-Bernoulli sampler
- LinUCB
- Reinforcement learning models
- Cross-entropy method agent
- First visit on-policy Monte Carlo agent
- Weighted incremental importance sampling Monte Carlo agent
- Expected SARSA agent
- TD-0 Q-learning agent
- Dyna-Q / Dyna-Q+ with prioritized sweeping
- Nonparameteric models
- Nadaraya-Watson kernel regression
- k-Nearest neighbors classification and regression
- Gaussian process regression
- Matrix factorization
- Regularized alternating least-squares
- Non-negative matrix factorization
- Preprocessing
- Discrete Fourier transform (1D signals)
- Discrete cosine transform (type-II) (1D signals)
- Bilinear interpolation (2D signals)
- Nearest neighbor interpolation (1D and 2D signals)
- Autocorrelation (1D signals)
- Signal windowing
- Text tokenization
- Feature hashing
- Feature standardization
- One-hot encoding / decoding
- Huffman coding / decoding
- Byte pair encoding / decoding
- Term frequency-inverse document frequency (TF-IDF) encoding
- MFCC encoding
- Utilities
- Similarity kernels
- Distance metrics
- Priority queue
- Ball tree
- Discrete sampler
- Graph processing and generators
numpy-ml\numpy_ml\rl_models\agents.py
# 引入必要的库 from abc import ABC, abstractmethod from collections import defaultdict import numpy as np # 从自定义的 rl_utils 模块中引入 EnvModel, env_stats, tile_state_space from .rl_utils import EnvModel, env_stats, tile_state_space # 从自定义的 data_structures 模块中引入 Dict from ..utils.data_structures import Dict # 定义一个抽象基类 AgentBase class AgentBase(ABC): # 初始化 AgentBase 类 def __init__(self, env): super().__init__() self.env = env self.parameters = {
} self.hyperparameters = {
} self.derived_variables = {
} self.env_info = env_stats(env) # 创建观测和动作的映射字典 def _create_2num_dicts(self, obs_encoder=None, act_encoder=None): E = self.env_info n_states = np.prod(E["n_obs_per_dim"]) n_actions = np.prod(E["n_actions_per_dim"]) # 创建动作到标量的字典和标量到动作的字典 self._num2action = Dict() self._action2num = Dict(act_encoder) if n_actions != np.inf: self._action2num = {
act: i for i, act in enumerate(E["action_ids"])} self._num2action = {
i: act for act, i in self._action2num.items()} # 创建观测到标量的字典和标量到观测的字典 self._num2obs = Dict() self._obs2num = Dict(obs_encoder) if n_states != np.inf: self._obs2num = {
act: i for i, act in enumerate(E["obs_ids"])} self._num2obs = {
i: act for act, i in self._obs2num.items()} # 清空历史记录 def flush_history(self): """Clear the episode history""" for k, v in self.episode_history.items(): self.episode_history[k] = [] # 抽象方法,根据当前观测生成动作 @abstractmethod def act(self, obs): """Generate an action given the current observation""" raise NotImplementedError # 抽象方法,采取贪婪策略 @abstractmethod def greedy_policy(self, kwargs): """ Take a greedy action. Returns ------- total_reward : float The total reward on the episode. n_steps : float The total number of steps taken on the episode. """ raise NotImplementedError @abstractmethod # 定义一个方法,用于运行 agent 在一个单独的 episode 上 def run_episode(self, max_steps, render=False): """ Run the agent on a single episode. Parameters ---------- max_steps : int The maximum number of steps to run an episode render : bool Whether to render the episode during training Returns ------- reward : float The total reward on the episode, averaged over the theta samples. steps : float The total number of steps taken on the episode, averaged over the theta samples. """ # 抛出未实现的错误,需要在子类中实现该方法 raise NotImplementedError # 定义一个抽象方法,用于更新 agent 的参数根据当前 episode 上获得的奖励 @abstractmethod def update(self): r""" Update the agent parameters according to the rewards accrued on the current episode. Returns ------- avg_reward : float The average reward earned by the best `retain_prcnt` theta samples on the current episode. """ # 抛出未实现的错误,需要在子类中实现该方法 raise NotImplementedError class CrossEntropyAgent(AgentBase): # 定义交叉熵代理类,继承自AgentBase基类 def _init_params(self): # 初始化参数方法 E = self.env_info # 获取环境信息 assert not E["continuous_actions"], "Action space must be discrete" # 断言动作空间必须是离散的 self._create_2num_dicts() # 调用私有方法创建两个数字字典 b_len = np.prod(E["n_actions_per_dim"]) # 计算动作维度的乘积作为b_len W_len = b_len * np.prod(E["obs_dim"]) # 计算观测维度的乘积与b_len相乘作为W_len theta_dim = b_len + W_len # 计算theta的维度 # init mean and variance for mv gaussian with dimensions theta_dim # 初始化维度为theta_dim的多变量高斯分布的均值和方差 theta_mean = np.random.rand(theta_dim) # 生成theta_dim维度的随机均值 theta_var = np.ones(theta_dim) # 生成theta_dim维度的方差为1的数组 self.parameters = {
"theta_mean": theta_mean, "theta_var": theta_var} # 设置参数字典包含均值和方差 self.derived_variables = {
"b_len": b_len, "W_len": W_len, "W_samples": [], "b_samples": [], "episode_num": 0, "cumulative_rewards": [], } # 设置派生变量字典包含b_len、W_len、W_samples、b_samples、episode_num和cumulative_rewards self.hyperparameters = {
"agent": "CrossEntropyAgent", "retain_prcnt": self.retain_prcnt, "n_samples_per_episode": self.n_samples_per_episode, } # 设置超参数字典包含代理名称、保留百分比和每个episode的样本数 self.episode_history = {
"rewards": [], "state_actions": []} # 设置episode历史字典包含奖励和状态动作对 def act(self, obs): r""" Generate actions according to a softmax policy. Notes ----- The softmax policy assumes that the pmf over actions in state :math:`x_t` is given by: .. math:: \pi(a | x^{(t)}) = \text{softmax}( \text{obs}^{(t)} \cdot \mathbf{W}_i^{(t)} + \mathbf{b}_i^{(t)} ) where :math:`\mathbf{W}` is a learned weight matrix, `obs` is the observation at timestep `t`, and b is a learned bias vector. Parameters ---------- obs : int or :py:class:`ndarray <numpy.ndarray>` An observation from the environment. Returns ------- action : int, float, or :py:class:`ndarray <numpy.ndarray>` An action sampled from the distribution over actions defined by the softmax policy. """ E, P = self.env_info, self.parameters W, b = P["W"], P["b"] s = self._obs2num[obs] s = np.array([s]) if E["obs_dim"] == 1 else s # compute softmax # 计算 softmax 分布的分子部分 Z = s.T @ W + b # 对分子部分进行指数化,减去最大值以防止数值不稳定 e_Z = np.exp(Z - np.max(Z, axis=-1, keepdims=True)) # 计算 softmax 分布 action_probs = e_Z / e_Z.sum(axis=-1, keepdims=True) # sample action # 从 softmax 分布中采样一个动作 a = np.random.multinomial(1, action_probs).argmax() # 返回对应动作的编号 return self._num2action[a] # 运行智能体在单个 episode 上的操作 def run_episode(self, max_steps, render=False): """ Run the agent on a single episode. Parameters ---------- max_steps : int The maximum number of steps to run an episode render : bool Whether to render the episode during training Returns ------- reward : float The total reward on the episode, averaged over the theta samples. steps : float The total number of steps taken on the episode, averaged over the theta samples. """ # 从 theta 样本中采样 self._sample_thetas() # 获取环境信息和派生变量 E, D = self.env_info, self.derived_variables n_actions = np.prod(E["n_actions_per_dim"]) W_len, obs_dim = D["W_len"], E["obs_dim"] steps, rewards = [], [] # 遍历 theta 样本 for theta in D["theta_samples"]: W = theta[:W_len].reshape(obs_dim, n_actions) b = theta[W_len:] # 运行 episode,获取总奖励和步数 total_rwd, n_steps = self._episode(W, b, max_steps, render) rewards.append(total_rwd) steps.append(n_steps) # 返回当前 episode 所有样本的平均奖励和平均步数 D["episode_num"] += 1 D["cumulative_rewards"] = rewards return np.mean(D["cumulative_rewards"]), np.mean(steps) def _episode(self, W, b, max_steps, render): """ Run the agent for an episode. Parameters ---------- W : :py:class:`ndarray <numpy.ndarray>` of shape `(obs_dim, n_actions)` The weights for the softmax policy. b : :py:class:`ndarray <numpy.ndarray>` of shape `(bias_len, )` The bias for the softmax policy. max_steps : int The maximum number of steps to run the episode. render : bool Whether to render the episode during training. Returns ------- reward : float The total reward on the episode. steps : float The total number of steps taken on the episode. """ # 初始化奖励列表和状态-动作对列表 rwds, sa = [], [] # 获取当前 episode 的历史记录 H = self.episode_history # 初始化总奖励和步数 total_reward, n_steps = 0.0, 1 # 重置环境并获取初始观察 obs = self.env.reset() # 更新策略参数 self.parameters["W"] = W self.parameters["b"] = b # 循环执行每一步 for i in range(max_steps): # 如果需要渲染环境,则进行渲染 if render: self.env.render() # 增加步数计数 n_steps += 1 # 根据当前观察选择动作 action = self.act(obs) # 将观察和动作转换为数字编码 s, a = self._obs2num[obs], self._action2num[action] sa.append((s, a)) # 执行动作,获取下一个观察和奖励 obs, reward, done, _ = self.env.step(action) rwds.append(reward) total_reward += reward # 如果 episode 结束,则跳出循环 if done: break # 将奖励列表和状态-动作对列表添加到历史记录中 H["rewards"].append(rwds) H["state_actions"].append(sa) # 返回总奖励和步数 return total_reward, n_steps # 更新 mu 和 Sigma,根据当前 episode 中获得的奖励 def update(self): # 获取派生变量和参数 D, P = self.derived_variables, self.parameters # 计算需要保留的样本数量 n_retain = int(self.retain_prcnt * self.n_samples_per_episode) # 对每个 theta 样本的累积奖励进行排序,从大到小 sorted_y_val_idxs = np.argsort(D["cumulative_rewards"])[::-1] top_idxs = sorted_y_val_idxs[:n_retain] # 使用最佳 theta 值更新 theta_mean 和 theta_var P["theta_mean"] = np.mean(D["theta_samples"][top_idxs], axis=0) P["theta_var"] = np.var(D["theta_samples"][top_idxs], axis=0) # 从具有均值为 theta_mean 和协方差为 diag(theta_var) 的多高斯分布中采样 n_samples_per_episode 个 theta def _sample_thetas(self): P, N = self.parameters, self.n_samples_per_episode Mu, Sigma = P["theta_mean"], np.diag(P["theta_var"]) # 从多高斯分布中生成样本 samples = np.random.multivariate_normal(Mu, Sigma, N) # 将生成的样本保存在派生变量中 self.derived_variables["theta_samples"] = samples # 定义一个贪婪策略函数,使用当前代理参数执行 def greedy_policy(self, max_steps, render=True): """ Execute a greedy policy using the current agent parameters. Parameters ---------- max_steps : int The maximum number of steps to run the episode. render : bool Whether to render the episode during execution. Returns ------- total_reward : float The total reward on the episode. n_steps : float The total number of steps taken on the episode. """ # 获取环境信息、派生变量和参数 E, D, P = self.env_info, self.derived_variables, self.parameters # 获取参数中的均值和方差 Mu, Sigma = P["theta_mean"], np.diag(P["theta_var"]) # 从多正态分布中采样一个样本 sample = np.random.multivariate_normal(Mu, Sigma, 1) # 获取权重矩阵的长度和观测维度 W_len, obs_dim = D["W_len"], E["obs_dim"] # 计算动作空间的维度 n_actions = np.prod(E["n_actions_per_dim"]) # 从样本中提取权重矩阵和偏置向量 W = sample[0, :W_len].reshape(obs_dim, n_actions) b = sample[0, W_len:] # 执行一个 episode,返回总奖励和步数 total_reward, n_steps = self._episode(W, b, max_steps, render) # 返回总奖励和步数 return total_reward, n_steps class MonteCarloAgent(AgentBase): # 定义一个 Monte-Carlo 学习代理类,继承自 AgentBase 类 def __init__(self, env, off_policy=False, temporal_discount=0.9, epsilon=0.1): """ A Monte-Carlo learning agent trained using either first-visit Monte Carlo updates (on-policy) or incremental weighted importance sampling (off-policy). Parameters ---------- env : :class:`gym.wrappers` or :class:`gym.envs` instance The environment to run the agent on. off_policy : bool Whether to use a behavior policy separate from the target policy during training. If False, use the same epsilon-soft policy for both behavior and target policies. Default is False. temporal_discount : float between [0, 1] The discount factor used for downweighting future rewards. Smaller values result in greater discounting of future rewards. Default is 0.9. epsilon : float between [0, 1] The epsilon value in the epsilon-soft policy. Larger values encourage greater exploration during training. Default is 0.1. """ # 初始化 MonteCarloAgent 类的实例 super().__init__(env) # 设置 epsilon 值 self.epsilon = epsilon # 设置是否使用 off-policy self.off_policy = off_policy # 设置时间折扣因子 self.temporal_discount = temporal_discount # 初始化参数 self._init_params() # 初始化参数 def _init_params(self): # 获取环境信息 E = self.env_info # 确保动作空间是离散的 assert not E["continuous_actions"], "Action space must be discrete" # 确保观察空间是离散的 assert not E["continuous_observations"], "Observation space must be discrete" # 计算状态数量 n_states = np.prod(E["n_obs_per_dim"]) # 计算动作数量 n_actions = np.prod(E["n_actions_per_dim"]) # 创建状态和动作的映射字典 self._create_2num_dicts() # 行为策略是随机的,epsilon-soft策略 self.behavior_policy = self.target_policy = self._epsilon_soft_policy # 如果是离策略学习 if self.off_policy: # 初始化C矩阵 self.parameters["C"] = np.zeros((n_states, n_actions)) # 目标策略是确定性的,贪婪策略 self.target_policy = self._greedy # 初始化Q函数 self.parameters["Q"] = np.random.rand(n_states, n_actions) # 初始化每个状态-动作对的回报对象 self.derived_variables = {
"returns": {
(s, a): [] for s in range(n_states) for a in range(n_actions)}, "episode_num": 0, } # 设置超参数 self.hyperparameters = {
"agent": "MonteCarloAgent", "epsilon": self.epsilon, "off_policy": self.off_policy, "temporal_discount": self.temporal_discount, } # 初始化历史记录 self.episode_history = {
"state_actions": [], "rewards": []} # 定义一个贪婪行为策略函数,用于在离策略为真时使用 def _greedy(self, s, a=None): """ A greedy behavior policy. Notes ----- Only used when off-policy is True. Parameters ---------- s : int, float, or tuple The state number for the current observation, as returned by ``self._obs2num[obs]``. a : int, float, or tuple The action number in the current state, as returned by ``self._action2num[obs]``. If None, sample an action from the action probabilities in state `s`, otherwise, return the probability of action `a` under the greedy policy. Default is None. Returns ------- action : int, float, or :py:class:`ndarray <numpy.ndarray>` If `a` is None, this is an action sampled from the distribution over actions defined by the greedy policy. If `a` is not None, this is the probability of `a` under the greedy policy. """ # 根据状态 s 对应的 Q 值,找到最大值对应的动作 a_star = self.parameters["Q"][s, :].argmax() # 如果 a 为 None,则从贪婪策略中的动作概率分布中随机选择一个动作 if a is None: out = self._num2action[a_star] # 如果 a 不为 None,则返回 a 在贪婪策略下的概率 else: out = 1 if a == a_star else 0 # 返回结果 return out # 更新 Q 函数,使用基于策略的首次访问蒙特卡洛更新 def _on_policy_update(self): r""" Update the `Q` function using an on-policy first-visit Monte Carlo update. Notes ----- The on-policy first-visit Monte Carlo update is .. math:: Q'(s, a) \leftarrow \text{avg}(\text{reward following first visit to } (s, a) \text{ across all episodes}) RL agents seek to learn action values conditional on subsequent optimal behavior, but they need to behave non-optimally in order to explore all actions (to find the optimal actions). The on-policy approach is a compromise -- it learns action values not for the optimal policy, but for a *near*-optimal policy that still explores (the epsilon-soft policy). """ # 获取派生变量、参数和历史记录 D, P, HS = self.derived_variables, self.parameters, self.episode_history # 获取历史记录中的奖励和状态-动作对 ep_rewards = HS["rewards"] sa_tuples = set(HS["state_actions"]) # 找到每个状态-动作对第一次出现的位置 locs = [HS["state_actions"].index(sa) for sa in sa_tuples] # 计算每个状态-动作对的累积回报 cumulative_returns = [np.sum(ep_rewards[i:]) for i in locs] # 使用首次访问回报的平均值更新 Q 值 for (s, a), cr in zip(sa_tuples, cumulative_returns): # 将首次访问回报添加到返回值列表中 D["returns"][(s, a)].append(cr) # 更新 Q 值为返回值列表的平均值 P["Q"][s, a] = np.mean(D["returns"][(s, a)]) def _off_policy_update(self): """ Update `Q` using weighted importance sampling. Notes ----- In importance sampling updates, we account for the fact that we are updating a different policy from the one we used to generate behavior by weighting the accumulated rewards by the ratio of the probability of the trajectory under the target policy versus its probability under the behavior policies. This is known as the importance sampling weight. In weighted importance sampling, we scale the accumulated rewards for a trajectory by their importance sampling weight, then take the *weighted* average using the importance sampling weight. This weighted average then becomes the value for the trajectory. W = importance sampling weight G_t = total discounted reward from time t until episode end C_n = sum of importance weights for the first n rewards This algorithm converges to Q* in the limit. """ P = self.parameters HS = self.episode_history ep_rewards = HS["rewards"] T = len(ep_rewards) G, W = 0.0, 1.0 # 从最后一个时间步开始向前遍历 for t in reversed(range(T)): s, a = HS["state_actions"][t] # 计算从时间步 t 开始到结束的总折扣奖励 G = self.temporal_discount * G + ep_rewards[t] # 更新状态动作对 (s, a) 的重要性权重和 P["C"][s, a] += W # 使用加权重要性采样更新 Q(s, a) P["Q"][s, a] += (W / P["C"][s, a]) * (G - P["Q"][s, a]) # 将重要性采样比率乘以当前权重 W *= self.target_policy(s, a) / self.behavior_policy(s, a) # 如果权重为零,则终止循环 if W == 0.0: break # 定义一个方法,用于执行行为策略,生成训练过程中的动作 def act(self, obs): r""" Execute the behavior policy--an :math:`\epsilon`-soft policy used to generate actions during training. Parameters ---------- obs : int, float, or :py:class:`ndarray <numpy.ndarray>` as returned by ``env.step(action)`` An observation from the environment. Returns ------- action : int, float, or :py:class:`ndarray <numpy.ndarray>` An action sampled from the distribution over actions defined by the epsilon-soft policy. """ # noqa: E501 # 将观察值转换为数字 s = self._obs2num[obs] # 调用行为策略方法,返回动作 return self.behavior_policy(s) # 运行一个单独的 episode def run_episode(self, max_steps, render=False): """ Run the agent on a single episode. Parameters ---------- max_steps : int The maximum number of steps to run an episode. render : bool Whether to render the episode during training. Returns ------- reward : float The total reward on the episode. steps : float The number of steps taken on the episode. """ # 获取派生变量 D = self.derived_variables # 运行 episode,获取总奖励和步数 total_rwd, n_steps = self._episode(max_steps, render) # 更新 episode 数量 D["episode_num"] += 1 # 返回总奖励和步数 return total_rwd, n_steps def _episode(self, max_steps, render): """ Execute agent on an episode. Parameters ---------- max_steps : int The maximum number of steps to run the episode. render : bool Whether to render the episode during training. Returns ------- reward : float The total reward on the episode. steps : float The number of steps taken on the episode. """ # 重置环境并获取初始观察值 obs = self.env.reset() # 获取当前 episode 的历史记录 HS = self.episode_history # 初始化总奖励和步数 total_reward, n_steps = 0.0, 0 # 循环执行每一步直到达到最大步数 for i in range(max_steps): # 如果需要渲染,则显示环境 if render: self.env.render() # 增加步数计数 n_steps += 1 # 根据当前观察值选择动作 action = self.act(obs) # 将观察值和动作转换为数字 s = self._obs2num[obs] a = self._action2num[action] # 存储 (状态, 动作) 组 HS["state_actions"].append((s, a)) # 执行动作并获取奖励等信息 obs, reward, done, info = self.env.step(action) # 记录奖励 HS["rewards"].append(reward) total_reward += reward # 如果 episode 结束,则跳出循环 if done: break # 返回总奖励和步数 return total_reward, n_steps def update(self): """ Update the parameters of the model following the completion of an episode. Flush the episode history after the update is complete. """ # 获取超参数 H = self.hyperparameters # 如果是离线策略更新,则调用离线策略更新方法 if H["off_policy"]: self._off_policy_update() else: # 否则调用在线策略更新方法 self._on_policy_update() # 清空 episode 历史记录 self.flush_history() def greedy_policy(self, max_steps, render=True): """ Execute a greedy policy using the current agent parameters. Parameters ---------- max_steps : int The maximum number of steps to run the episode. render : bool Whether to render the episode during execution. Returns ------- total_reward : float The total reward on the episode. n_steps : float The total number of steps taken on the episode. """ # 获取当前的 episode 历史记录 H = self.episode_history # 重置环境并获取初始观察值 obs = self.env.reset() # 初始化总奖励和步数 total_reward, n_steps = 0.0, 0 # 循环执行最大步数 for i in range(max_steps): # 如果需要渲染环境,则进行渲染 if render: self.env.render() # 增加步数计数 n_steps += 1 # 根据当前观察值执行贪婪策略选择动作 action = self._greedy(obs) # 将观察值和动作转换为数字表示 s = self._obs2num[obs] a = self._action2num[action] # 存储 (状态, 动作) 组 H["state_actions"].append((s, a)) # 执行动作 obs, reward, done, info = self.env.step(action) # 记录奖励 H["rewards"].append(reward) total_reward += reward # 如果 episode 结束,则跳出循环 if done: break # 返回总奖励和步数 return total_reward, n_steps # 定义一个 TemporalDifferenceAgent 类,继承自 AgentBase 类 class TemporalDifferenceAgent(AgentBase): # 初始化函数,接受环境、学习率、探索率、瓦片数、观测最大值、观测最小值、网格维度、是否离线策略、时间折扣等参数 def __init__( self, env, lr=0.4, epsilon=0.1, n_tilings=8, obs_max=None, obs_min=None, grid_dims=[8, 8], off_policy=False, temporal_discount=0.99, ): # 初始化参数函数 def _init_params(self): # 获取环境信息 E = self.env_info # 断言动作空间必须是离散的 assert not E["continuous_actions"], "Action space must be discrete" obs_encoder = None # 如果观测空间是连续的 if E["continuous_observations"]: # 对观测空间进行编码 obs_encoder, _ = tile_state_space( self.env, self.env_info, self.n_tilings, state_action=False, obs_max=self.obs_max, obs_min=self.obs_min, grid_size=self.grid_dims, ) # 创建观测空间到数字的字典 self._create_2num_dicts(obs_encoder=obs_encoder) # 行为策略是随机的,epsilon-soft 策略 self.behavior_policy = self.target_policy = self._epsilon_soft_policy # 如果是离线策略 if self.off_policy: # 目标策略是确定性的,贪婪策略 self.target_policy = self._greedy # 初始化 Q 函数 self.parameters["Q"] = defaultdict(np.random.rand) # 初始化每个状态-动作对的回报对象 self.derived_variables = {
"episode_num": 0} # 超参数 self.hyperparameters = {
"agent": "TemporalDifferenceAgent", "lr": self.lr, "obs_max": self.obs_max, "obs_min": self.obs_min, "epsilon": self.epsilon, "n_tilings": self.n_tilings, "grid_dims": self.grid_dims, "off_policy": self.off_policy, "temporal_discount": self.temporal_discount, } # 记录每一集的历史数据 self.episode_history = {
"state_actions": [], "rewards": []} def run_episode(self, max_steps, render=False): """ Run the agent on a single episode without updating the priority queue or performing backups. Parameters ---------- max_steps : int The maximum number of steps to run an episode render : bool Whether to render the episode during training Returns ------- reward : float The total reward on the episode, averaged over the theta samples. steps : float The total number of steps taken on the episode, averaged over the theta samples. """ # 调用 _episode 方法运行一个单独的 episode,不更新优先级队列或执行备份 return self._episode(max_steps, render, update=False) def train_episode(self, max_steps, render=False): """ Train the agent on a single episode. Parameters ---------- max_steps : int The maximum number of steps to run an episode. render : bool Whether to render the episode during training. Returns ------- reward : float The total reward on the episode. steps : float The number of steps taken on the episode. """ # 获取派生变量 D = self.derived_variables # 调用 _episode 方法训练一个单独的 episode total_rwd, n_steps = self._episode(max_steps, render, update=True) # 更新 episode_num D["episode_num"] += 1 return total_rwd, n_steps # 定义一个方法,用于运行或训练智能体在一个 episode 上 def _episode(self, max_steps, render, update=True): """ Run or train the agent on an episode. Parameters ---------- max_steps : int The maximum number of steps to run the episode. render : bool Whether to render the episode during training. update : bool Whether to perform the Q function backups after each step. Default is True. Returns ------- reward : float The total reward on the episode. steps : float The number of steps taken on the episode. """ # 清空 episode 历史记录 self.flush_history() # 重置环境并获取初始观察 obs = self.env.reset() HS = self.episode_history # 根据当前观察选择动作 action = self.act(obs) s = self._obs2num[obs] a = self._action2num[action] # 存储初始的 (状态, 动作) 组 HS["state_actions"].append((s, a)) total_reward, n_steps = 0.0, 0 for i in range(max_steps): if render: self.env.render() # 执行动作 obs, reward, done, info = self.env.step(action) n_steps += 1 # 记录奖励 HS["rewards"].append(reward) total_reward += reward # 生成下一个状态和动作 action = self.act(obs) s_ = self._obs2num[obs] if not done else None a_ = self._action2num[action] # 存储下一个 (状态, 动作) 组 HS["state_actions"].append((s_, a_)) # 如果需要更新 Q 函数,则执行更新 if update: self.update() # 如果 episode 结束,则跳出循环 if done: break # 返回总奖励和步数 return total_reward, n_steps def _greedy(self, s, a=None): """ A greedy behavior policy. Only used when off-policy is true. Parameters ---------- s : int, float, or tuple The state number for the current observation, as returned by ``self._obs2num[obs]`` a : int, float, or tuple The action number in the current state, as returned by ``self._action2num[obs]``. If None, sample an action from the action probabilities in state `s`, otherwise, return the probability of action `a` under the greedy policy. Default is None. Returns ------- If `a` is None: action : int, float, or :py:class:`ndarray <numpy.ndarray>` as returned by ``self._num2action`` If `a` is None, returns an action sampled from the distribution over actions defined by the greedy policy. If `a` is not None: action_prob : float in range [0, 1] If `a` is not None, returns the probability of `a` under the greedy policy. """ # noqa: E501 # 获取参数和环境信息 P, E = self.parameters, self.env_info # 计算动作空间的总数 n_actions = np.prod(E["n_actions_per_dim"]) # 找到在当前状态下使得 Q 值最大的动作 a_star = np.argmax([P["Q"][(s, aa)] for aa in range(n_actions)]) # 如果 a 为 None,则从贪婪策略定义的动作分布中随机选择一个动作 if a is None: out = self._num2action[a_star] # 如果 a 不为 None,则返回在贪婪策略下动作 a 的概率 else: out = 1 if a == a_star else 0 return out def _on_policy_update(self, s, a, r, s_, a_): """ Update the Q function using the expected SARSA on-policy TD(0) update: Q[s, a] <- Q[s, a] + lr * [ r + temporal_discount * E[Q[s', a'] | s'] - Q[s, a] ] where E[ Q[s', a'] | s'] is the expected value of the Q function over all a_ given that we're in state s' under the current policy NB. the expected SARSA update can be used for both on- and off-policy methods. In an off-policy context, if the target policy is greedy and the expectation is taken wrt. the target policy then the expected SARSA update is exactly Q-learning. Parameters ---------- s : int as returned by `self._obs2num` The id for the state/observation at timestep t-1 a : int as returned by `self._action2num` The id for the action taken at timestep t-1 r : float The reward after taking action `a` in state `s` at timestep t-1 s_ : int as returned by `self._obs2num` The id for the state/observation at timestep t a_ : int as returned by `self._action2num` The id for the action taken at timestep t """ Q, E, pi = self.parameters["Q"], self.env_info, self.behavior_policy # TODO: this assumes that all actions are available in each state n_actions = np.prod(E["n_actions_per_dim"]) # compute the expected value of Q(s', a') given that we are in state s' E_Q = np.sum([pi(s_, aa) * Q[(s_, aa)] for aa in range(n_actions)]) if s_ else 0 # perform the expected SARSA TD(0) update qsa = Q[(s, a)] Q[(s, a)] = qsa + self.lr * (r + self.temporal_discount * E_Q - qsa) def _off_policy_update(self, s, a, r, s_): """ Update the `Q` function using the TD(0) Q-learning update: Q[s, a] <- Q[s, a] + lr * ( r + temporal_discount * max_a { Q[s', a] } - Q[s, a] ) Parameters ---------- s : int as returned by `self._obs2num` The id for the state/observation at timestep `t-1` a : int as returned by `self._action2num` The id for the action taken at timestep `t-1` r : float The reward after taking action `a` in state `s` at timestep `t-1` s_ : int as returned by `self._obs2num` The id for the state/observation at timestep `t` """ Q, E = self.parameters["Q"], self.env_info n_actions = np.prod(E["n_actions_per_dim"]) qsa = Q[(s, a)] Qs_ = [Q[(s_, aa)] for aa in range(n_actions)] if s_ else [0] Q[(s, a)] = qsa + self.lr * (r + self.temporal_discount * np.max(Qs_) - qsa) def update(self): """Update the parameters of the model online after each new state-action.""" H, HS = self.hyperparameters, self.episode_history (s, a), r = HS["state_actions"][-2], HS["rewards"][-1] s_, a_ = HS["state_actions"][-1] if H["off_policy"]: # 如果是离线策略更新,则调用_off_policy_update函数 self._off_policy_update(s, a, r, s_) else: # 如果是在线策略更新,则调用_on_policy_update函数 self._on_policy_update(s, a, r, s_, a_) # 定义一个方法,执行行为策略--一个用于在训练期间生成动作的 :math:`\epsilon`-soft 策略 def act(self, obs): r""" Execute the behavior policy--an :math:`\epsilon`-soft policy used to generate actions during training. Parameters ---------- obs : int, float, or :py:class:`ndarray <numpy.ndarray>` as returned by ``env.step(action)`` An observation from the environment. Returns ------- action : int, float, or :py:class:`ndarray <numpy.ndarray>` An action sampled from the distribution over actions defined by the epsilon-soft policy. """ # noqa: E501 # 将观察值转换为数字 s = self._obs2num[obs] # 调用行为策略方法,返回动作 return self.behavior_policy(s) # 定义一个方法,执行一个确定性贪婪策略,使用当前代理参数 def greedy_policy(self, max_steps, render=True): """ Execute a deterministic greedy policy using the current agent parameters. Parameters ---------- max_steps : int The maximum number of steps to run the episode. render : bool Whether to render the episode during execution. Returns ------- total_reward : float The total reward on the episode. n_steps : float The total number of steps taken on the episode. """ # 清空历史记录 self.flush_history() # 获取环境的初始观察值 H = self.episode_history obs = self.env.reset() total_reward, n_steps = 0.0, 0 # 循环执行最大步数 for i in range(max_steps): # 如果需要渲染环境,则渲染 if render: self.env.render() # 将观察值转换为数字 s = self._obs2num[obs] # 使用贪婪策略选择动作 action = self._greedy(s) # 将动作转换为数字 a = self._action2num[action] # 存储 (状态, 动作) 组 H["state_actions"].append((s, a)) # 执行动作 obs, reward, done, info = self.env.step(action) n_steps += 1 # 记录奖励 H["rewards"].append(reward) total_reward += reward # 如果完成了一个 episode,则跳出循环 if done: break return total_reward, n_steps # 定义一个名为DynaAgent的类,继承自AgentBase类 class DynaAgent(AgentBase): # 初始化方法,接受多个参数 def __init__( self, env, # 环境对象 lr=0.4, # 学习率,默认值为0.4 epsilon=0.1, # ε-greedy策略中的ε值,默认为0.1 n_tilings=8, # 瓦片编码中的瓦片数量,默认为8 obs_max=None, # 观测值的最大值,默认为None obs_min=None, # 观测值的最小值,默认为None q_plus=False, # 是否使用Q+学习算法,默认为False grid_dims=[8, 8], # 网格维度,默认为[8, 8] explore_weight=0.05, # 探索权重,默认为0.05 temporal_discount=0.9, # 时间折扣因子,默认为0.9 n_simulated_actions=50, # 模拟动作的数量,默认为50 # 初始化参数 def _init_params(self): # 获取环境信息 E = self.env_info # 确保动作空间是离散的 assert not E["continuous_actions"], "Action space must be discrete" # 初始化观测编码器 obs_encoder = None # 如果观测是连续的 if E["continuous_observations"]: # 对状态空间进行切片 obs_encoder, _ = tile_state_space( self.env, self.env_info, self.n_tilings, state_action=False, obs_max=self.obs_max, obs_min=self.obs_min, grid_size=self.grid_dims, ) # 创建状态编码器和动作编码器的字典 self._create_2num_dicts(obs_encoder=obs_encoder) # 设置行为策略和目标策略为 epsilon-soft 策略 self.behavior_policy = self.target_policy = self._epsilon_soft_policy # 初始化 Q 函数和模型 self.parameters["Q"] = defaultdict(np.random.rand) self.parameters["model"] = EnvModel() # 初始化每个状态-动作对的返回对象 self.derived_variables = {
"episode_num": 0, "sweep_queue": {
}, "visited": set(), "steps_since_last_visit": defaultdict(lambda: 0), } # 如果使用 Q+ 算法 if self.q_plus: self.derived_variables["steps_since_last_visit"] = defaultdict( np.random.rand, ) # 设置超参数 self.hyperparameters = {
"agent": "DynaAgent", "lr": self.lr, "q_plus": self.q_plus, "obs_max": self.obs_max, "obs_min": self.obs_min, "epsilon": self.epsilon, "n_tilings": self.n_tilings, "grid_dims": self.grid_dims, "explore_weight": self.explore_weight, "temporal_discount": self.temporal_discount, "n_simulated_actions": self.n_simulated_actions, } # 初始化每一集的历史记录 self.episode_history = {
"state_actions": [], "rewards": []} # 执行行为策略--一个用于在训练期间生成动作的ε-soft策略 def act(self, obs): # 将环境返回的观测转换为数字形式 s = self._obs2num[obs] # 从由ε-soft策略定义的动作分布中采样一个动作 return self.behavior_policy(s) # 定义一个贪婪的行为策略函数 def _greedy(self, s, a=None): """ A greedy behavior policy. Parameters ---------- s : int, float, or tuple The state number for the current observation, as returned by self._obs2num[obs] a : int, float, or tuple The action number in the current state, as returned by self._action2num[obs]. If None, sample an action from the action probabilities in state s, otherwise, return the probability of action `a` under the greedy policy. Default is None. Returns ------- If `a` is None: action : int, float, or :py:class:`ndarray <numpy.ndarray>` as returned by :meth:`_num2action` If `a` is None, returns an action sampled from the distribution over actions defined by the greedy policy. If `a` is not None: action_prob : float in range [0, 1] If `a` is not None, returns the probability of `a` under the greedy policy. """ # noqa: E501 # 获取环境信息和 Q 值 E, Q = self.env_info, self.parameters["Q"] # 计算动作空间的总数 n_actions = np.prod(E["n_actions_per_dim"]) # 找到在当前状态下使 Q 值最大的动作 a_star = np.argmax([Q[(s, aa)] for aa in range(n_actions)]) # 如果 a 为 None,则从贪婪策略定义的动作分布中随机选择一个动作 if a is None: out = self._num2action[a_star] # 如果 a 不为 None,则返回 a 在贪婪策略下的概率 else: out = 1 if a == a_star else 0 return out def update(self): """ Update the priority queue with the most recent (state, action) pair and perform random-sample one-step tabular Q-planning. Notes ----- The planning algorithm uses a priority queue to retrieve the state-action pairs from the agent's history which will result in the largest change to its `Q`-value if backed up. When the first pair in the queue is backed up, the effect on each of its predecessor pairs is computed. If the predecessor's priority is greater than a small threshold the pair is added to the queue and the process is repeated until either the queue is empty or we exceed `n_simulated_actions` updates. """ # 获取最近的 (state, action) 对 s, a = self.episode_history["state_actions"][-1] # 更新优先级队列 self._update_queue(s, a) # 模拟行为 self._simulate_behavior() def _update_queue(self, s, a): """ Update the priority queue by calculating the priority for (s, a) and inserting it into the queue if it exceeds a fixed (small) threshold. Parameters ---------- s : int as returned by `self._obs2num` The id for the state/observation a : int as returned by `self._action2num` The id for the action taken from state `s` """ # 获取派生变量中的优先级队列 sweep_queue = self.derived_variables["sweep_queue"] # TODO: what's a good threshold here? # 计算 (s, a) 的优先级 priority = self._calc_priority(s, a) # 如果优先级大于等于 0.001,则插入到优先级队列中 if priority >= 0.001: if (s, a) in sweep_queue: sweep_queue[(s, a)] = max(priority, sweep_queue[(s, a)]) else: sweep_queue[(s, a)] = priority def _calc_priority(self, s, a): """ 计算状态动作对 (s, a) 的“优先级”。优先级 P 定义为: P = sum_{s_} p(s_) * abs(r + temporal_discount * max_a {Q[s_, a]} - Q[s, a]) 这对应于 TD(0) Q-learning 对 (s, a) 的绝对值大小的备份。 Parameters ---------- s : int as returned by `self._obs2num` 状态/观察的 id a : int as returned by `self._action2num` 从状态 `s` 中采取的动作的 id Returns ------- priority : float (s, a) 的全备份 TD(0) Q-learning 更新的绝对值大小 """ priority = 0.0 E = self.env_info Q = self.parameters["Q"] env_model = self.parameters["model"] n_actions = np.prod(E["n_actions_per_dim"]) outcome_probs = env_model.outcome_probs(s, a) for (r, s_), p_rs_ in outcome_probs: max_q = np.max([Q[(s_, aa)] for aa in range(n_actions)]) P = p_rs_ * (r + self.temporal_discount * max_q - Q[(s, a)]) priority += np.abs(P) return priority def _simulate_behavior(self): """ Perform random-sample one-step tabular Q-planning with prioritized sweeping. Notes ----- This approach uses a priority queue to retrieve the state-action pairs from the agent's history with largest change to their Q-values if backed up. When the first pair in the queue is backed up, the effect on each of its predecessor pairs is computed. If the predecessor's priority is greater than a small threshold the pair is added to the queue and the process is repeated until either the queue is empty or we have exceeded a `n_simulated_actions` updates. """ # 获取环境模型和优先级队列 env_model = self.parameters["model"] sweep_queue = self.derived_variables["sweep_queue"] # 进行一定次数的模拟行为 for _ in range(self.n_simulated_actions): # 如果队列为空,则结束模拟 if len(sweep_queue) == 0: break # 从队列中选择具有最大更新(优先级)的(s, a)对 sq_items = list(sweep_queue.items()) (s_sim, a_sim), _ = sorted(sq_items, key=lambda x: x[1], reverse=True)[0] # 从队列中删除条目 del sweep_queue[(s_sim, a_sim)] # 使用完全备份版本的TD(0) Q-learning更新为(s_sim, a_sim)更新Q函数 self._update(s_sim, a_sim) # 获取导致s_sim的所有(_s, _a)对(即s_sim的前导状态) pairs = env_model.state_action_pairs_leading_to_outcome(s_sim) # 如果前导状态的优先级超过阈值,则将其添加到队列中 for (_s, _a) in pairs: self._update_queue(_s, _a) def _update(self, s, a): """ Update Q using a full-backup version of the TD(0) Q-learning update: Q(s, a) = Q(s, a) + lr * sum_{r, s'} [ p(r, s' | s, a) * (r + gamma * max_a { Q(s', a) } - Q(s, a)) ] Parameters ---------- s : int as returned by ``self._obs2num`` The id for the state/observation a : int as returned by ``self._action2num`` The id for the action taken from state `s` """ # 初始化更新值为0 update = 0.0 # 获取环境模型、环境信息、派生变量和Q值 env_model = self.parameters["model"] E, D, Q = self.env_info, self.derived_variables, self.parameters["Q"] # 计算动作空间的大小 n_actions = np.prod(E["n_actions_per_dim"]) # 从模型中采样奖励 outcome_probs = env_model.outcome_probs(s, a) for (r, s_), p_rs_ in outcome_probs: # 如果启用Q+算法,根据上次访问时间给奖励加上一个“奖励” if self.q_plus: r += self.explore_weight * np.sqrt(D["steps_since_last_visit"][(s, a)]) # 计算下一个状态的最大Q值 max_q = np.max([Q[(s_, a_)] for a_ in range(n_actions)]) # 更新值根据TD(0) Q-learning更新公式计算 update += p_rs_ * (r + self.temporal_discount * max_q - Q[(s, a)]) # 更新Q值 Q[(s, a)] += self.lr * update def run_episode(self, max_steps, render=False): """ Run the agent on a single episode without performing `Q`-function backups. Parameters ---------- max_steps : int The maximum number of steps to run an episode. render : bool Whether to render the episode during training. Returns ------- reward : float The total reward on the episode. steps : float The number of steps taken on the episode. """ # 运行一个不执行Q函数备份的单个episode return self._episode(max_steps, render, update=False) def train_episode(self, max_steps, render=False): """ Train the agent on a single episode. Parameters ---------- max_steps : int The maximum number of steps to run an episode. render : bool Whether to render the episode during training. Returns ------- reward : float The total reward on the episode. steps : float The number of steps taken on the episode. """ # 获取派生变量 D = self.derived_variables # 在一个 episode 上运行 _episode 方法,返回总奖励和步数 total_rwd, n_steps = self._episode(max_steps, render, update=True) # 增加 episode_num 计数 D["episode_num"] += 1 # 返回总奖励和步数 return total_rwd, n_steps def greedy_policy(self, max_steps, render=True): """ Execute a deterministic greedy policy using the current agent parameters. Parameters ---------- max_steps : int The maximum number of steps to run the episode. render : bool Whether to render the episode during execution. Returns ------- total_reward : float The total reward on the episode. n_steps : float The total number of steps taken on the episode. """ # 清空历史记录 self.flush_history() # 获取 episode_history H = self.episode_history # 重置环境并获取初始观察 obs = self.env.reset() total_reward, n_steps = 0.0, 0 for i in range(max_steps): # 如果需要渲染,显示环境 if render: self.env.render() # 将观察转换为数字 s = self._obs2num[obs] # 使用贪婪策略选择动作 action = self._greedy(s) # 将动作转换为数字 a = self._action2num[action] # 存储 (状态, 动作) 组 H["state_actions"].append((s, a)) # 执行动作 obs, reward, done, info = self.env.step(action) n_steps += 1 # 记录奖励 H["rewards"].append(reward) total_reward += reward # 如果 episode 结束,跳出循环 if done: break # 返回总奖励和步数 return total_reward, n_steps
RL Models
The agents.py
module implements a number of standard reinforcement learning (RL) agents that
can be run on OpenAI gym environments.
- Monte Carlo Methods
- First-visit Monte Carlo updates (on-policy)
- Incremental weighted importance sampling (off-policy)
- Cross-entropy method (Mannor, Rubinstein, & Gat, 2003)
- Temporal-Difference Methods
- SARSA (on-policy) (Rummery & Niranjan, 1994)
- Q-learning (off-policy) (Watkins, 1989)
- Model-Based Methods
- Dyna-Q/Dyna-Q+ with prioritized sweeping (Sutton, 1990; Moore & Atkeson, 1993)
Plots
numpy-ml\numpy_ml\rl_models\rl_utils.py
# 导入警告模块 import warnings # 导入 product 函数和 defaultdict 类 from itertools import product from collections import defaultdict # 导入 numpy 库 import numpy as np # 导入自定义的 DependencyWarning 类 from numpy_ml.utils.testing import DependencyWarning # 导入 tiles 和 IHT 函数 from numpy_ml.rl_models.tiles.tiles3 import tiles, IHT # 初始化 NO_PD 变量为 False NO_PD = False # 尝试导入 pandas 库,如果导入失败则将 NO_PD 设置为 True try: import pandas as pd except ModuleNotFoundError: NO_PD = True # 尝试导入 gym 库,如果导入失败则发出警告 try: import gym except ModuleNotFoundError: fstr = ( "Agents in `numpy_ml.rl_models` use the OpenAI gym for training. " "To install the gym environments, run `pip install gym`. For more" " information, see https://github.com/openai/gym." ) warnings.warn(fstr, DependencyWarning) # 定义一个简单的环境模型类 class EnvModel(object): """ A simple tabular environment model that maintains the counts of each reward-outcome pair given the state and action that preceded them. The model can be queried with >>> M = EnvModel() >>> M[(state, action, reward, next_state)] += 1 >>> M[(state, action, reward, next_state)] 1 >>> M.state_action_pairs() [(state, action)] >>> M.outcome_probs(state, action) [(next_state, 1)] """ # 初始化方法 def __init__(self): super(EnvModel, self).__init__() # 使用 defaultdict 创建一个嵌套字典作为环境模型 self._model = defaultdict(lambda: defaultdict(lambda: 0)) # 设置方法,设置环境模型中的值 def __setitem__(self, key, value): """Set self[key] to value""" s, a, r, s_ = key self._model[(s, a)][(r, s_)] = value # 获取方法,获取环境模型中的值 def __getitem__(self, key): """Return the value associated with key""" s, a, r, s_ = key return self._model[(s, a)][(r, s_)] # 包含方法,判断环境模型是否包含某个键 def __contains__(self, key): """True if EnvModel contains `key`, else False""" s, a, r, s_ = key # 判断状态-动作对和奖励-下一个状态对是否在环境模型中 p1 = (s, a) in self.state_action_pairs() p2 = (r, s_) in self.reward_outcome_pairs() return p1 and p2 # 返回环境模型中所有状态和动作对 def state_action_pairs(self): """Return all (state, action) pairs in the environment model""" return list(self._model.keys()) # 返回在状态`s`中采取动作`a`时关联的所有奖励和下一个状态对 def reward_outcome_pairs(self, s, a): """ Return all (reward, next_state) pairs associated with taking action `a` in state `s`. """ return list(self._model[(s, a)].keys()) # 返回在状态`s`中采取动作`a`后每个可能结果状态的环境模型概率 def outcome_probs(self, s, a): """ Return the probability under the environment model of each outcome state after taking action `a` in state `s`. Parameters ---------- s : int as returned by ``self._obs2num`` The id for the state/observation. a : int as returned by ``self._action2num`` The id for the action taken from state `s`. Returns ------- outcome_probs : list of (state, prob) tuples A list of each possible outcome and its associated probability under the model. """ items = list(self._model[(s, a)].items()) total_count = np.sum([c for (_, c) in items]) outcome_probs = [c / total_count for (_, c) in items] outcomes = [p for (p, _) in items] return list(zip(outcomes, outcome_probs)) # 返回所有具有在当前模型下产生`outcome`的非零概率的状态和动作对 def state_action_pairs_leading_to_outcome(self, outcome): """ Return all (state, action) pairs that have a nonzero probability of producing `outcome` under the current model. Parameters ---------- outcome : int The outcome state. Returns ------- pairs : list of (state, action) tuples A list of all (state, action) pairs with a nonzero probability of producing `outcome` under the model. """ pairs = [] for sa in self.state_action_pairs(): outcomes = [o for (r, o) in self.reward_outcome_pairs(*sa)] if outcome in outcomes: pairs.append(sa) return pairs # 定义一个函数,用于将环境生成的连续观测编码为状态空间的一组重叠瓦片 def tile_state_space( env, # 环境对象,openAI环境 env_stats, # 环境统计信息 n_tilings, # 使用的重叠瓦片数量,应为2的幂,决定离散化瓦片编码状态向量的维度 obs_max=None, # 观测空间的最大值,用于计算网格宽度,默认为None,使用env.observation_space.high obs_min=None, # 观测空间的最小值,用于计算网格宽度,默认为None,使用env.observation_space.low state_action=False, # 是否使用瓦片编码来编码状态-动作值(True)或仅状态值(False),默认为False grid_size=(4, 4), # 瓦片的粗糙度列表,每个瓦片由一个4x4的网格组成,默认为[4, 4] ): """ Return a function to encode the continous observations generated by `env` in terms of a collection of `n_tilings` overlapping tilings (each with dimension `grid_size`) of the state space. Arguments --------- env : ``gym.wrappers.time_limit.TimeLimit`` instance An openAI environment. n_tilings : int The number of overlapping tilings to use. Should be a power of 2. This determines the dimension of the discretized tile-encoded state vector. obs_max : float or np.ndarray The value to treat as the max value of the observation space when calculating the grid widths. If None, use ``env.observation_space.high``. Default is None. obs_min : float or np.ndarray The value to treat as the min value of the observation space when calculating the grid widths. If None, use ``env.observation_space.low``. Default is None. state_action : bool Whether to use tile coding to encode state-action values (True) or just state values (False). Default is False. grid_size : list of length 2 A list of ints representing the coarseness of the tilings. E.g., a `grid_size` of [4, 4] would mean each tiling consisted of a 4x4 tile grid. Default is [4, 4]. Returns ------- encode_obs_as_tile : function A function which takes as input continous observation vector and returns a set of the indices of the active tiles in the tile coded observation space. n_states : int An integer reflecting the total number of unique states possible under this tile coding regimen. """ # 如果obs_max为None,则将env.observation_space.high转换为数值,否则保持obs_max不变 obs_max = np.nan_to_num(env.observation_space.high) if obs_max is None else obs_max # 如果obs_min为None,则将env.observation_space.low转换为数值,否则保持obs_min不变 obs_min = np.nan_to_num(env.observation_space.low) if obs_min is None else obs_min # 如果状态动作存在 if state_action: # 如果环境统计中包含组动作 if env_stats["tuple_action"]: # 计算每个动作空间的数量 n = [space.n - 1.0 for space in env.action_spaces.spaces] else: # 获取环境动作空间的数量 n = [env.action_space.n] # 更新观测最大值和最小值 obs_max = np.concatenate([obs_max, n]) obs_min = np.concatenate([obs_min, np.zeros_like(n)]) # 计算观测范围 obs_range = obs_max - obs_min # 计算缩放比例 scale = 1.0 / obs_range # 定义缩放观测向量的函数 scale_obs = lambda obs: obs * scale # noqa: E731 # 计算总瓦片数和总状态数 n_tiles = np.prod(grid_size) * n_tilings n_states = np.prod([n_tiles - i for i in range(n_tilings)]) # 创建指示器哈希表 iht = IHT(16384) # 定义将观测编码为瓦片的函数 def encode_obs_as_tile(obs): # 缩放观测向量 obs = scale_obs(obs) return tuple(tiles(iht, n_tilings, obs)) # 返回编码观测为瓦片的函数和总状态数 return encode_obs_as_tile, n_states # 返回所有有效的 OpenAI ``gym`` 环境的 ID 列表 def get_gym_environs(): return [e.id for e in gym.envs.registry.all()] # 返回一个包含环境 ID 的 pandas DataFrame def get_gym_stats(): df = [] # 遍历所有 gym 环境 for e in gym.envs.registry.all(): # 打印环境 ID print(e.id) # 获取环境统计信息并添加到 DataFrame 中 df.append(env_stats(gym.make(e.id))) cols = [ "id", "continuous_actions", "continuous_observations", "action_dim", # "action_ids", "deterministic", "multidim_actions", "multidim_observations", "n_actions_per_dim", "n_obs_per_dim", "obs_dim", # "obs_ids", "seed", "tuple_actions", "tuple_observations", ] # 如果没有安装 pandas,则返回列表,否则返回 DataFrame return df if NO_PD else pd.DataFrame(df)[cols] # 检查环境的动作和观察空间是否为 ``gym.spaces.Tuple`` 或 ``gym.spaces.Dict`` def is_tuple(env): tuple_space, dict_space = gym.spaces.Tuple, gym.spaces.dict.Dict # 检查动作空间是否为 Tuple 或 Dict tuple_action = isinstance(env.action_space, (tuple_space, dict_space)) # 检查观察空间是否为 Tuple 或 Dict tuple_obs = isinstance(env.observation_space, (tuple_space, dict_space)) return tuple_action, tuple_obs # 检查环境的动作和观察空间是否为多维空间或 ``Tuple`` 空间 def is_multidimensional(env): # 多维空间是指动作/观察空间中有多个素的空间,包括 ``Tuple`` 空间 includes single action/observation spaces with several dimensions. Parameters ---------- env : ``gym.wrappers`` or ``gym.envs`` instance The environment to evaluate. Returns ------- md_action : bool Whether the `env`'s action space is multidimensional. md_obs : bool Whether the `env`'s observation space is multidimensional. tuple_action : bool Whether the `env`'s action space is a ``Tuple`` instance. tuple_obs : bool Whether the `env`'s observation space is a ``Tuple`` instance. """ # 初始化变量,假设环境的动作空间和观测空间都是多维的 md_action, md_obs = True, True # 检查环境的动作空间和观测空间是否为组类型 tuple_action, tuple_obs = is_tuple(env) # 如果动作空间不是组类型 if not tuple_action: # 从动作空间中随机采样一个动作 act = env.action_space.sample() # 判断采样的动作是否为列表、组或者 NumPy 数组,并且长度大于1 md_action = isinstance(act, (list, tuple, np.ndarray)) and len(act) > 1 # 如果观测空间不是组类型 if not tuple_obs: # 获取观测空间对象 OS = env.observation_space # 如果观测空间对象有 'low' 属性,则获取 'low' 属性值,否则随机采样一个观测 obs = OS.low if "low" in dir(OS) else OS.sample() # sample causes problems # 判断采样的观测是否为列表、组或者 NumPy 数组,并且长度大于1 md_obs = isinstance(obs, (list, tuple, np.ndarray)) and len(obs) > 1 # 返回动作空间是否多维、观测空间是否多维、动作空间是否为组、观测空间是否为组的结果 return md_action, md_obs, tuple_action, tuple_obs # 检查环境的观测和动作空间是否连续 def is_continuous(env, tuple_action, tuple_obs): # 导入 gym 库中的相关模块 Continuous = gym.spaces.box.Box # 如果观测空间是组类型 if tuple_obs: # 获取环境的观测空间 spaces = env.observation_space.spaces # 检查所有子空间是否为连续空间 cont_obs = all(isinstance(s, Continuous) for s in spaces) else: # 检查观测空间是否为连续空间 cont_obs = isinstance(env.observation_space, Continuous) # 如果动作空间是组类型 if tuple_action: # 获取环境的动作空间 spaces = env.action_space.spaces # 检查所有子空间是否为连续空间 cont_action = all(isinstance(s, Continuous) for s in spaces) else: # 检查动作空间是否为连续空间 cont_action = isinstance(env.action_space, Continuous) # 返回动作空间是否连续和观测空间是否连续的布尔值 return cont_action, cont_obs # 获取关于环境动作空间的信息 def action_stats(env, md_action, cont_action): # 参数 md_action 表示动作空间是否为多维的 # 参数 cont_action 表示动作空间是否为连续的 # 返回值 n_actions_per_dim 表示每个维度的动作空间可能的动作数量 # 返回值 action_ids 表示空间内所有有效动作的列表,如果 cont_action 为 True,则为 None # 返回值 action_dim 表示单个动作的维度数量 # 如果需要考虑动作,则初始化动作维度为1,动作ID为空,每个维度的动作数量为无穷大 if cont_action: action_dim = 1 action_ids = None n_actions_per_dim = [np.inf] # 如果需要考虑多维动作,则获取环境中动作空间的维度 if md_action: action_dim = env.action_space.shape[0] n_actions_per_dim = [np.inf for _ in range(action_dim)] # 如果不需要考虑动作 else: # 如果需要考虑多维动作 if md_action: # 获取每个维度的动作数量,如果动作空间有属性"n"则获取其值,否则为无穷大 n_actions_per_dim = [ space.n if hasattr(space, "n") else np.inf for space in env.action_space.spaces ] # 如果动作数量不为无穷大,则生成动作ID列表 action_ids = ( None if np.inf in n_actions_per_dim else list(product(*[range(i) for i in n_actions_per_dim])) ) # 动作维度为动作数量列表的长度 action_dim = len(n_actions_per_dim) # 如果不需要考虑多维动作 else: # 初始化动作维度为1,每个维度的动作数量为环境中动作空间的数量,生成动作ID列表 action_dim = 1 n_actions_per_dim = [env.action_space.n] action_ids = list(range(n_actions_per_dim[0])) # 返回每个维度的动作数量列表,动作ID列表,动作维度 return n_actions_per_dim, action_ids, action_dim # 获取环境的观测空间信息 def obs_stats(env, md_obs, cont_obs): """ Get information on the observation space for `env`. Parameters ---------- env : ``gym.wrappers`` or ``gym.envs`` instance The environment to evaluate. md_obs : bool Whether the `env`'s action space is multidimensional. cont_obs : bool Whether the `env`'s observation space is multidimensional. Returns ------- n_obs_per_dim : list of length (obs_dim,) The number of possible observation classes for each dimension of the observation space. obs_ids : list or None A list of all valid observations within the space. If `cont_obs` is True, this value will be None. obs_dim : int or None The number of dimensions in a single observation. """ # 如果观测空间是连续的 if cont_obs: # 观测空间的观测值列表设为 None obs_ids = None # 观测空间的维度设为观测空间的第一个维度 obs_dim = env.observation_space.shape[0] # 每个维度的可能观测类别数设为无穷大 n_obs_per_dim = [np.inf for _ in range(obs_dim)] else: # 如果观测空间不是连续的 if md_obs: # 对于每个子空间,获取可能的观测类别数 n_obs_per_dim = [ space.n if hasattr(space, "n") else np.inf for space in env.observation_space.spaces ] # 如果观测类别数中包含无穷大,则观测值列表设为 None,否则生成所有可能的观测值组合 obs_ids = ( None if np.inf in n_obs_per_dim else list(product(*[range(i) for i in n_obs_per_dim])) ) # 观测空间的维度为子空间的数量 obs_dim = len(n_obs_per_dim) else: # 如果观测空间是单维度的 obs_dim = 1 # 观测空间的可能观测类别数为观测空间的类别数 n_obs_per_dim = [env.observation_space.n] # 观测值列表为所有可能的观测值 obs_ids = list(range(n_obs_per_dim[0]) # 返回观测空间信息 return n_obs_per_dim, obs_ids, obs_dim # 计算当前环境的统计信息 def env_stats(env): """ Compute statistics for the current environment. Parameters ---------- env : ``gym.wrappers`` or ``gym.envs`` instance The environment to evaluate. Returns ------- env_info : dict A dictionary containing information about the action and observation spaces of `env`. """ # 检查环境是否是多维度的,获取动作和观测空间的信息 md_action, md_obs, tuple_action, tuple_obs = is_multidimensional(env) # 检查环境是否具有连续动作和连续观测 cont_action, cont_obs = is_continuous(env, tuple_action, tuple_obs) # 获取动作的统计信息,包括每个维度的动作数量、动作的 ID 和动作的维度 n_actions_per_dim, action_ids, action_dim = action_stats( env, md_action, cont_action, ) # 获取观测的统计信息,包括每个维度的观测数量、观测的 ID 和观测的维度 n_obs_per_dim, obs_ids, obs_dim = obs_stats(env, md_obs, cont_obs) # 构建环境信息字典,包括环境的 ID、种子、是否确定性环境、动作和观测的类型、维度等信息 env_info = {
"id": env.spec.id, "seed": env.spec.seed if "seed" in dir(env.spec) else None, "deterministic": bool(~env.spec.nondeterministic), "tuple_actions": tuple_action, "tuple_observations": tuple_obs, "multidim_actions": md_action, "multidim_observations": md_obs, "continuous_actions": cont_action, "continuous_observations": cont_obs, "n_actions_per_dim": n_actions_per_dim, "action_dim": action_dim, "n_obs_per_dim": n_obs_per_dim, "obs_dim": obs_dim, "action_ids": action_ids, "obs_ids": obs_ids, } # 返回环境信息字典 return env_info
numpy-ml\numpy_ml\rl_models\tiles\tiles3.py
""" Tile Coding Software version 3.0beta by Rich Sutton based on a program created by Steph Schaeffer and others External documentation and recommendations on the use of this code is available in the reinforcement learning textbook by Sutton and Barto, and on the web. These need to be understood before this code is. This software is for Python 3 or more. This is an implementation of grid-style tile codings, based originally on the UNH CMAC code (see http://www.ece.unh.edu/robots/cmac.htm), but by now highly changed. Here we provide a function, "tiles", that maps floating and integer variables to a list of tiles, and a second function "tiles-wrap" that does the same while wrapping some floats to provided widths (the lower wrap value is always 0). The float variables will be gridded at unit intervals, so generalization will be by approximately 1 in each direction, and any scaling will have to be done externally before calling tiles. Num-tilings should be a power of 2, e.g., 16. To make the offsetting work properly, it should also be greater than or equal to four times the number of floats. The first argument is either an index hash table of a given size (created by (make-iht size)), an integer "size" (range of the indices from 0), or nil (for testing, indicating that the tile coordinates are to be returned without being converted to indices). """ # 导入 math 模块中的 floor 函数和 itertools 模块中的 zip_longest 函数 from math import floor from itertools import zip_longest # 将 basehash 函数赋值给 basehash 变量 basehash = hash # 定义 IHT 类 class IHT: "Structure to handle collisions" # 初始化方法,接受一个参数 sizeval def __init__(self, sizeval): # 设置实例变量 size 为传入的 sizeval self.size = sizeval # 初始化 overfullCount 为 0 self.overfullCount = 0 # 初始化 dictionary 为空字典 self.dictionary = {
} # 定义 __str__ 方法,用于对象打印时返回字符串 def __str__(self): # 返回包含对象信息的字符串 return ( "Collision table:" + " size:" + str(self.size) + " overfullCount:" + str(self.overfullCount) + " dictionary:" + str(len(self.dictionary)) + " items" ) # 返回字典中键值对的数量 def count(self): return len(self.dictionary) # 检查字典是否已满 def fullp(self): return len(self.dictionary) >= self.size # 获取对象在字典中的索引,如果对象不存在且只读模式,则返回None def getindex(self, obj, readonly=False): # 获取字典引用 d = self.dictionary # 如果对象在字典中存在,则返回其索引 if obj in d: return d[obj] # 如果对象不存在且为只读模式,则返回None elif readonly: return None # 获取字典大小和当前键值对数量 size = self.size count = self.count() # 如果键值对数量大于等于字典大小 if count >= size: # 如果超出计数为0,则打印信息 if self.overfullCount == 0: print("IHT full, starting to allow collisions") # 增加超出计数 self.overfullCount += 1 # 返回对象的哈希值对字典大小取模作为索引 return basehash(obj) % self.size else: # 将对象添加到字典中,并返回其索引 d[obj] = count return count # 根据输入的坐标、哈希表或大小、只读标志,返回哈希索引 def hashcoords(coordinates, m, readonly=False): # 如果哈希表类型为IHT,则调用getindex方法获取索引 if type(m) == IHT: return m.getindex(tuple(coordinates), readonly) # 如果哈希表类型为整数,则对坐标进行哈希运算并取模 if type(m) == int: return basehash(tuple(coordinates)) % m # 如果哈希表为None,则直接返回坐标 if m == None: return coordinates # 返回num-tilings个瓦片索引,对应于浮点数和整数 def tiles(ihtORsize, numtilings, floats, ints=[], readonly=False): """returns num-tilings tile indices corresponding to the floats and ints""" # 将浮点数乘以numtilings并向下取整 qfloats = [floor(f * numtilings) for f in floats] Tiles = [] for tiling in range(numtilings): tilingX2 = tiling * 2 coords = [tiling] b = tiling for q in qfloats: coords.append((q + b) // numtilings) b += tilingX2 coords.extend(ints) Tiles.append(hashcoords(coords, ihtORsize, readonly)) return Tiles # 返回num-tilings个瓦片索引,对应于浮点数和整数,其中一些浮点数进行了包装 def tileswrap(ihtORsize, numtilings, floats, wrapwidths, ints=[], readonly=False): """returns num-tilings tile indices corresponding to the floats and ints, wrapping some floats""" qfloats = [floor(f * numtilings) for f in floats] Tiles = [] for tiling in range(numtilings): tilingX2 = tiling * 2 coords = [tiling] b = tiling for q, width in zip_longest(qfloats, wrapwidths): c = (q + b % numtilings) // numtilings coords.append(c % width if width else c) b += tilingX2 coords.extend(ints) Tiles.append(hashcoords(coords, ihtORsize, readonly)) return Tiles
numpy-ml\numpy_ml\rl_models\tiles\__init__.py
# 从当前目录中导入 tiles3 模块 from . import tiles3
numpy-ml\numpy_ml\rl_models\trainer.py
from time import time import numpy as np # 定义一个 Trainer 类,用于方便地进行 agent 的训练和评估 class Trainer(object): def __init__(self, agent, env): """ An object to facilitate agent training and evaluation. Parameters ---------- agent : :class:`AgentBase` instance The agent to train. env : ``gym.wrappers`` or ``gym.envs`` instance The environment to run the agent on. """ # 初始化 Trainer 对象,设置 agent 和 env 属性 self.env = env self.agent = agent # 初始化 rewards 字典,用于存储训练过程中的奖励和相关信息 self.rewards = {
"total": [], "smooth_total": [], "n_steps": [], "duration": []} def _train_episode(self, max_steps, render_every=None): # 记录当前时间 t0 = time() if "train_episode" in dir(self.agent): # 如果 agent 中有 train_episode 方法,则在线训练更新 reward, n_steps = self.agent.train_episode(max_steps) else: # 如果 agent 中没有 train_episode 方法,则离线训练更新 reward, n_steps = self.agent.run_episode(max_steps) # 更新 agent self.agent.update() # 计算训练时长 duration = time() - t0 return reward, duration, n_steps def train( self, n_episodes, max_steps, seed=None, plot=True, verbose=True, render_every=None, smooth_factor=0.05, def plot_rewards(self, rwd_greedy): """ Plot the cumulative reward per episode as a function of episode number. Notes ----- Saves plot to the file ``./img/<agent>-<env>.png`` Parameters ---------- rwd_greedy : float The cumulative reward earned with a final execution of a greedy target policy. """ try: import matplotlib.pyplot as plt import seaborn as sns # 设置 seaborn 库的样式为白色 sns.set_style("white") # 设置 seaborn 库的上下文为 notebook,字体大小为 1 sns.set_context("notebook", font_scale=1) except: fstr = "Error importing `matplotlib` and `seaborn` -- plotting functionality is disabled" # 如果导入 matplotlib 和 seaborn 失败,则抛出 ImportError 异常 raise ImportError(fstr) # 获取累积奖励数据 R = self.rewards # 创建图形和轴对象 fig, ax = plt.subplots() # 创建 x 轴数据,表示每一轮的序号 x = np.arange(len(R["total"])) # 创建 y 轴数据,表示平滑后的累积奖励 y = R["smooth_total"] # 创建 y_raw 轴数据,表示原始的累积奖励 y_raw = R["total"] # 绘制平滑后的累积奖励曲线 ax.plot(x, y, label="smoothed") # 绘制原始的累积奖励曲线,透明度为 0.5 ax.plot(x, y_raw, alpha=0.5, label="raw") # 添加一条虚线,表示最终贪婪策略的累积奖励 ax.axhline(y=rwd_greedy, xmin=min(x), xmax=max(x), ls=":", label="final greedy") # 添加图例 ax.legend() # 移除图形的上边界和右边界 sns.despine() # 获取环境名称和智能体名称 env = self.agent.env_info["id"] agent = self.agent.hyperparameters["agent"] # 设置 x 轴标签为 "Episode" ax.set_xlabel("Episode") # 设置 y 轴标签为 "Cumulative reward" ax.set_ylabel("Cumulative reward") # 设置图形标题为智能体名称和环境名称的组合 ax.set_title("{} on '{}'".format(agent, env)) # 保存图形到文件 img/<agent>-<env>.png plt.savefig("img/{}-{}.png".format(agent, env)) # 关闭所有图形 plt.close("all")
numpy-ml\numpy_ml\rl_models\__init__.py
# 从当前目录中导入 rl_utils 模块 from . import rl_utils # 从当前目录中导入 agents 模块 from . import agents # 从当前目录中导入 trainer 模块 from . import trainer # 从当前目录中导入 tiles 模块 from . import tiles
numpy-ml\numpy_ml\tests\nn_torch_models.py
# 禁用 flake8 检查 # 导入 PyTorch 库 import torch import torch.nn as nn import torch.nn.functional as F # 导入 TensorFlow 库 import tensorflow as tf # 导入 NumPy 库 import numpy as np # # 用于测试自定义层的黄金标准实现 # # (需要 PyTorch) # # # 将输入转换为 PyTorch 变量 def torchify(var, requires_grad=True): return torch.autograd.Variable(torch.FloatTensor(var), requires_grad=requires_grad) # 生成 PyTorch 梯度计算器 def torch_gradient_generator(fn, kwargs): def get_grad(z): z1 = torch.autograd.Variable(torch.FloatTensor(z), requires_grad=True) z2 = fn(z1, kwargs).sum() z2.backward() grad = z1.grad.numpy() return grad return get_grad # 计算交叉熵损失函数的梯度 def torch_xe_grad(y, z): z = torch.autograd.Variable(torch.FloatTensor(z), requires_grad=True) y = torch.LongTensor(y.argmax(axis=1)) loss = F.cross_entropy(z, y, reduction="sum") loss.backward() grad = z.grad.numpy() return grad # 计算均方误差损失函数的梯度 def torch_mse_grad(y, z, act_fn): y = torch.FloatTensor(y) z = torch.autograd.Variable(torch.FloatTensor(z), requires_grad=True) y_pred = act_fn(z) loss = F.mse_loss(y_pred, y, reduction="sum") # size_average=False).sum() loss.backward() grad = z.grad.numpy() return grad # PyTorch VAE 损失函数类 class TorchVAELoss(nn.Module): def __init__(self): super(TorchVAELoss, self).__init__() # 从输入数据中提取梯度信息 def extract_grads(self, X, X_recon, t_mean, t_log_var): # 定义一个极小的浮点数,用于处理梯度计算中的数值稳定性 eps = np.finfo(float).eps # 将输入数据转换为 Torch 张量,并设置不需要梯度信息 X = torchify(X, requires_grad=False) # 将重构后的输入数据转换为 Torch 张量,并进行数值裁剪,避免出现极端值 X_recon = torchify(np.clip(X_recon, eps, 1 - eps)) # 将均值数据转换为 Torch 张量 t_mean = torchify(t_mean) # 将对数方差数据转换为 Torch 张量 t_log_var = torchify(t_log_var) # 计算重构误差,使用二交叉熵损失函数 BCE = torch.sum(F.binary_cross_entropy(X_recon, X, reduction="none"), dim=1) # 计算 KL 散度,参考 VAE 论文的附录 B # Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014 # https://arxiv.org/abs/1312.6114 # 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2) KLD = -0.5 * torch.sum(1 + t_log_var - t_mean.pow(2) - t_log_var.exp(), dim=1) # 计算总损失,包括重构误差和 KL 散度 loss = torch.mean(BCE + KLD) # 反向传播计算梯度 loss.backward() # 将损失值和各个梯度信息保存到字典中并返回 grads = {
"loss": loss.detach().numpy(), "dX_recon": X_recon.grad.numpy(), "dt_mean": t_mean.grad.numpy(), "dt_log_var": t_log_var.grad.numpy(), } return grads # 定义一个 TorchWGANGPLoss 类,继承自 nn.Module class TorchWGANGPLoss(nn.Module): # 初始化函数,接受一个 lambda_ 参数,默认值为 10 def __init__(self, lambda_=10): # 将 lambda_ 转换为张量形式 self.lambda_ = torchify([lambda_]) # 调用父类的初始化函数 super(TorchWGANGPLoss, self).__init__() # 前向传播函数,接受 Y_real, Y_fake, gradInterp 三个参数 def forward(self, Y_real, Y_fake, gradInterp): # 复制 Y_fake 到 GY_fake GY_fake = Y_fake.copy() # 将 Y_real, Y_fake, GY_fake, gradInterp 转换为张量形式 self.Y_real = torchify(Y_real) self.Y_fake = torchify(Y_fake) self.GY_fake = torchify(GY_fake) self.gradInterp = torchify(gradInterp) # 计算梯度惩罚 norm = self.gradInterp.norm(2, dim=1) self.norm1 = torch.sqrt(torch.sum(self.gradInterp.pow(2), dim=1)) # 断言两种计算方式得到的结果应该非常接近 assert torch.allclose(norm, self.norm1) # 计算梯度惩罚项 self.gpenalty = self.lambda_ * ((self.norm1 - 1).pow(2)).mean() # 计算 C_loss 和 G_loss self.C_loss = self.Y_fake.mean() - self.Y_real.mean() + self.gpenalty self.G_loss = -self.GY_fake.mean() # 提取梯度信息函数,接受 Y_real, Y_fake, gradInterp 三个参数 def extract_grads(self, Y_real, Y_fake, gradInterp): # 调用前向传播函数 self.forward(Y_real, Y_fake, gradInterp) # 计算 C_loss 和 G_loss 的梯度 self.C_loss.backward() self.G_loss.backward() # 将各个梯度信息转换为 numpy 数组形式,存储在字典中并返回 grads = {
"Y_real": self.Y_real.detach().numpy(), "Y_fake": self.Y_fake.detach().numpy(), "gradInterp": self.gradInterp.detach().numpy(), "GP": self.gpenalty.detach().numpy(), "C_loss": self.C_loss.detach().numpy(), "G_loss": self.G_loss.detach().numpy(), "C_dY_real": self.Y_real.grad.numpy(), "C_dGradInterp": self.gradInterp.grad.numpy(), "C_dY_fake": self.Y_fake.grad.numpy(), "G_dY_fake": self.GY_fake.grad.numpy(), } return grads # 定义一个 TorchLinearActivation 类,继承自 nn.Module class TorchLinearActivation(nn.Module): # 初始化函数 def __init__(self): # 调用父类的初始化函数 super(TorchLinearActivation, self).__init__() pass # 静态方法,实现前向传播 @staticmethod def forward(input): return input # 静态方法,实现反向传播 @staticmethod def backward(grad_output): return torch.ones_like(grad_output) # 定义一个 TorchBatchNormLayer 类,继承自 nn.Module class TorchBatchNormLayer(nn.Module): # 初始化批量归一化层对象 def __init__(self, n_in, params, mode, momentum=0.9, epsilon=1e-5): # 调用父类的初始化方法 super(TorchBatchNormLayer, self).__init__() # 从参数中获取缩放因子和截距 scaler = params["scaler"] intercept = params["intercept"] # 根据模式选择不同维度的批量归一化层 if mode == "1D": self.layer1 = nn.BatchNorm1d( num_features=n_in, momentum=1 - momentum, eps=epsilon, affine=True ) elif mode == "2D": self.layer1 = nn.BatchNorm2d( num_features=n_in, momentum=1 - momentum, eps=epsilon, affine=True ) # 设置批量归一化层的权重和偏置 self.layer1.weight = nn.Parameter(torch.FloatTensor(scaler)) self.layer1.bias = nn.Parameter(torch.FloatTensor(intercept)) # 前向传播函数 def forward(self, X): # 调整输入张量的维度顺序,从(N, H, W, C)到(N, C, H, W) if X.ndim == 4: X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3]) # 如果输入不是torch张量,则转换为torch张量 if not isinstance(X, torch.Tensor): X = torchify(X) # 保存输入张量和经过批量归一化层后的输出张量 self.X = X self.Y = self.layer1(self.X) # 保留输出张量的梯度信息 self.Y.retain_grad() # 从神经网络中提取梯度信息 def extract_grads(self, X, Y_true=None): # 进行前向传播 self.forward(X) # 如果真实标签是 NumPy 数组 if isinstance(Y_true, np.ndarray): # 调整真实标签的维度顺序 Y_true = np.moveaxis(Y_true, [0, 1, 2, 3], [0, -2, -1, -3]) # 计算损失函数 self.loss1 = ( 0.5 * F.mse_loss(self.Y, torchify(Y_true), size_average=False).sum() ) else: # 如果没有真实标签,直接将输出求和作为损失 self.loss1 = self.Y.sum() # 反向传播计算梯度 self.loss1.backward() # 将张量转换为 NumPy 数组 X_np = self.X.detach().numpy() Y_np = self.Y.detach().numpy() dX_np = self.X.grad.numpy() dY_np = self.Y.grad.numpy() # 如果输入数据的维度为4 if self.X.dim() == 4: orig, X_swap = [0, 1, 2, 3], [0, -1, -3, -2] # 调整真实标签的维度顺序 if isinstance(Y_true, np.ndarray): Y_true = np.moveaxis(Y_true, orig, X_swap) X_np = np.moveaxis(X_np, orig, X_swap) Y_np = np.moveaxis(Y_np, orig, X_swap) dX_np = np.moveaxis(dX_np, orig, X_swap) dY_np = np.moveaxis(dY_np, orig, X_swap) # 构建梯度字典 grads = {
"loss": self.loss1.detach().numpy(), "X": X_np, "momentum": 1 - self.layer1.momentum, "epsilon": self.layer1.eps, "intercept": self.layer1.bias.detach().numpy(), "scaler": self.layer1.weight.detach().numpy(), "running_mean": self.layer1.running_mean.detach().numpy(), "running_var": self.layer1.running_var.detach().numpy(), "y": Y_np, "dLdy": dY_np, "dLdIntercept": self.layer1.bias.grad.numpy(), "dLdScaler": self.layer1.weight.grad.numpy(), "dLdX": dX_np, } # 如果真实标签是 NumPy 数组,将其加入梯度字典 if isinstance(Y_true, np.ndarray): grads["Y_true"] = Y_true # 返回梯度字典 return grads # 定义一个继承自 nn.Module 的 TorchLayerNormLayer 类 class TorchLayerNormLayer(nn.Module): # 初始化方法,接受特征维度、参数、模式和 epsilon 参数 def __init__(self, feat_dims, params, mode, epsilon=1e-5): super(TorchLayerNormLayer, self).__init__() # 创建 LayerNorm 层,指定特征维度、epsilon 值和是否启用 elementwise_affine self.layer1 = nn.LayerNorm( normalized_shape=feat_dims, eps=epsilon, elementwise_affine=True ) # 从参数中获取 scaler 和 intercept scaler = params["scaler"] intercept = params["intercept"] # 如果模式为 "2D",则调整 scaler 和 intercept 的维度 if mode == "2D": scaler = np.moveaxis(scaler, [0, 1, 2], [-2, -1, -3]) intercept = np.moveaxis(intercept, [0, 1, 2], [-2, -1, -3]) # 断言 scaler 和 intercept 的形状与 LayerNorm 层的权重和偏置形状相同 assert scaler.shape == self.layer1.weight.shape assert intercept.shape == self.layer1.bias.shape # 将 scaler 和 intercept 转换为 nn.Parameter 类型,并赋值给 LayerNorm 层的权重和偏置 self.layer1.weight = nn.Parameter(torch.FloatTensor(scaler)) self.layer1.bias = nn.Parameter(torch.FloatTensor(intercept)) # 前向传播方法,接受输入 X def forward(self, X): # 如果输入 X 的维度为 4,则调整维度顺序为 (N, C, H, W) if X.ndim == 4: X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3]) # 如果输入 X 不是 torch.Tensor 类型,则转换为 torch.Tensor if not isinstance(X, torch.Tensor): X = torchify(X) # 将输入 X 保存在 self.X 中,并通过 LayerNorm 层得到输出 self.Y self.X = X self.Y = self.layer1(self.X) # 保留 self.Y 的梯度信息 self.Y.retain_grad() # 从输入数据 X 中提取梯度信息,如果提供了真实标签 Y_true,则计算损失 def extract_grads(self, X, Y_true=None): # 进行前向传播 self.forward(X) # 如果 Y_true 是 numpy 数组,则调整其维度顺序 if isinstance(Y_true, np.ndarray): Y_true = np.moveaxis(Y_true, [0, 1, 2, 3], [0, -2, -1, -3]) # 计算损失函数 self.loss1 = ( 0.5 * F.mse_loss(self.Y, torchify(Y_true), size_average=False).sum() ) else: # 如果没有提供 Y_true,则将损失设为 Y 的总和 self.loss1 = self.Y.sum() # 反向传播计算梯度 self.loss1.backward() # 将张量转换为 numpy 数组 X_np = self.X.detach().numpy() Y_np = self.Y.detach().numpy() dX_np = self.X.grad.numpy() dY_np = self.Y.grad.numpy() intercept_np = self.layer1.bias.detach().numpy() scaler_np = self.layer1.weight.detach().numpy() dIntercept_np = self.layer1.bias.grad.numpy() dScaler_np = self.layer1.weight.grad.numpy() # 如果输入数据 X 的维度为 4,则调整维度顺序 if self.X.dim() == 4: orig, X_swap = [0, 1, 2, 3], [0, -1, -3, -2] orig_p, p_swap = [0, 1, 2], [-1, -3, -2] if isinstance(Y_true, np.ndarray): Y_true = np.moveaxis(Y_true, orig, X_swap) X_np = np.moveaxis(X_np, orig, X_swap) Y_np = np.moveaxis(Y_np, orig, X_swap) dX_np = np.moveaxis(dX_np, orig, X_swap) dY_np = np.moveaxis(dY_np, orig, X_swap) scaler_np = np.moveaxis(scaler_np, orig_p, p_swap) intercept_np = np.moveaxis(intercept_np, orig_p, p_swap) dScaler_np = np.moveaxis(dScaler_np, orig_p, p_swap) dIntercept_np = np.moveaxis(dIntercept_np, orig_p, p_swap) # 构建梯度字典 grads = {
"loss": self.loss1.detach().numpy(), "X": X_np, "epsilon": self.layer1.eps, "intercept": intercept_np, "scaler": scaler_np, "y": Y_np, "dLdy": dY_np, "dLdIntercept": dIntercept_np, "dLdScaler": dScaler_np, "dLdX": dX_np, } # 如果提供了 Y_true,则将其加入梯度字典 if isinstance(Y_true, np.ndarray): grads["Y_true"] = Y_true # 返回梯度字典 return grads class TorchAddLayer(nn.Module): # 定义 TorchAddLayer 类,继承自 nn.Module def __init__(self, act_fn, kwargs): # 初始化函数,接受激活函数 act_fn 和其他关键字参数 super(TorchAddLayer, self).__init__() # 调用父类的初始化函数 self.act_fn = act_fn # 设置实例变量 act_fn 为传入的激活函数 def forward(self, Xs): # 前向传播函数,接受输入 Xs self.Xs = [] # 初始化实例变量 Xs 为空列表 x = Xs[0].copy() # 复制输入列表中的第一个素 if not isinstance(x, torch.Tensor): # 如果 x 不是 torch.Tensor 类型 x = torchify(x) # 将 x 转换为 torch.Tensor 类型 self.sum = x.clone() # 克隆 x 并赋值给实例变量 sum x.retain_grad() # 保留 x 的梯度信息 self.Xs.append(x) # 将 x 添加到 Xs 列表中 for i in range(1, len(Xs)): # 遍历输入列表中的其他素 x = Xs[i] # 获取当前素 if not isinstance(x, torch.Tensor): # 如果 x 不是 torch.Tensor 类型 x = torchify(x) # 将 x 转换为 torch.Tensor 类型 x.retain_grad() # 保留 x 的梯度信息 self.Xs.append(x) # 将 x 添加到 Xs 列表中 self.sum += x # 将 x 加到 sum 中 self.sum.retain_grad() # 保留 sum 的梯度信息 self.Y = self.act_fn(self.sum) # 计算 sum 的激活值并赋值给实例变量 Y self.Y.retain_grad() # 保留 Y 的梯度信息 return self.Y # 返回 Y def extract_grads(self, X): # 提取梯度信息函数,接受输入 X self.forward(X) # 调用前向传播函数 self.loss = self.Y.sum() # 计算损失值并赋值给实例变量 loss self.loss.backward() # 反向传播计算梯度 grads = {
# 定义梯度字典 "Xs": X, # 输入 X "Sum": self.sum.detach().numpy(), # sum 的值 "Y": self.Y.detach().numpy(), # Y 的值 "dLdY": self.Y.grad.numpy(), # Y 的梯度 "dLdSum": self.sum.grad.numpy(), # sum 的梯度 } grads.update( # 更新梯度字典 {
"dLdX{}".format(i + 1): xi.grad.numpy() for i, xi in enumerate(self.Xs)} # 遍历 Xs 列表,获取每个素的梯度信息 ) return grads # 返回梯度字典 class TorchMultiplyLayer(nn.Module): # 定义 TorchMultiplyLayer 类,继承自 nn.Module def __init__(self, act_fn, kwargs): # 初始化函数,接受激活函数 act_fn 和其他关键字参数 super(TorchMultiplyLayer, self).__init__() # 调用父类的初始化函数 self.act_fn = act_fn # 设置实例变量 act_fn 为传入的激活函数 def forward(self, Xs): # 前向传播函数,接受输入 Xs self.Xs = [] # 初始化实例变量 Xs 为空列表 x = Xs[0].copy() # 复制输入列表中的第一个素 if not isinstance(x, torch.Tensor): # 如果 x 不是 torch.Tensor 类型 x = torchify(x) # 将 x 转换为 torch.Tensor 类型 self.prod = x.clone() # 克隆 x 并赋值给实例变量 prod x.retain_grad() # 保留 x 的梯度信息 self.Xs.append(x) # 将 x 添加到 Xs 列表中 for i in range(1, len(Xs)): # 遍历输入列表中的其他素 x = Xs[i] # 获取当前素 if not isinstance(x, torch.Tensor): # 如果 x 不是 torch.Tensor 类型 x = torchify(x) # 将 x 转换为 torch.Tensor 类型 x.retain_grad() # 保留 x 的梯度信息 self.Xs.append(x) # 将 x 添加到 Xs 列表中 self.prod *= x # 将 x 乘到 prod 中 self.prod.retain_grad() # 保留 prod 的梯度信息 self.Y = self.act_fn(self.prod) # 计算 prod 的激活值并赋值给实例变量 Y self.Y.retain_grad() # 保留 Y 的梯度信息 return self.Y # 返回 Y # 定义一个方法用于提取梯度信息 def extract_grads(self, X): # 调用神经网络的前向传播方法 self.forward(X) # 计算损失值,将所有素求和 self.loss = self.Y.sum() # 反向传播计算梯度 self.loss.backward() # 构建包含各个梯度信息的字典 grads = {
"Xs": X, # 输入数据 "Prod": self.prod.detach().numpy(), # 中间变量 prod 的值 "Y": self.Y.detach().numpy(), # 神经网络输出的值 "dLdY": self.Y.grad.numpy(), # 损失函数对 Y 的梯度 "dLdProd": self.prod.grad.numpy(), # 损失函数对 prod 的梯度 } # 更新字典,包含每个输入数据对应的梯度信息 grads.update( {
"dLdX{}".format(i + 1): xi.grad.numpy() for i, xi in enumerate(self.Xs)} ) # 返回包含梯度信息的字典 return grads class TorchSkipConnectionIdentity(nn.Module): # 定义一个 TorchSkipConnectionIdentity 类,继承自 nn.Module def forward(self, X): # 定义 forward 方法,接受输入 X if not isinstance(X, torch.Tensor): # 如果 X 不是 torch.Tensor 类型 # 将 X 的维度从 (N, H, W, C) 调整为 (N, C, H, W) X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3]) # 将调整后的 X 转换为 torch.Tensor 类型 X = torchify(X) self.X = X # 保留 X 的梯度信息 self.X.retain_grad() self.conv1_out = self.conv1(self.X) # 保留 conv1_out 的梯度信息 self.conv1_out.retain_grad() self.act_fn1_out = self.act_fn(self.conv1_out) # 保留 act_fn1_out 的梯度信息 self.act_fn1_out.retain_grad() self.batchnorm1_out = self.batchnorm1(self.act_fn1_out) # 保留 batchnorm1_out 的梯度信息 self.batchnorm1_out.retain_grad() self.conv2_out = self.conv2(self.batchnorm1_out) # 保留 conv2_out 的梯度信息 self.conv2_out.retain_grad() self.batchnorm2_out = self.batchnorm2(self.conv2_out) # 保留 batchnorm2_out 的梯度信息 self.batchnorm2_out.retain_grad() self.layer3_in = self.batchnorm2_out + self.X # 保留 layer3_in 的梯度信息 self.layer3_in.retain_grad() self.Y = self.act_fn(self.layer3_in) # 保留 Y 的梯度信息 self.Y.retain_grad() class TorchCausalConv1d(torch.nn.Conv1d): """https://github.com/pytorch/pytorch/issues/1333 NB: this is only ensures that the convolution out length is the same as the input length IFF stride = 1. Otherwise, in/out lengths will differ. """ # 定义 TorchCausalConv1d 类,继承自 torch.nn.Conv1d def __init__( self, in_channels, out_channels, kernel_size, stride=1, dilation=1, groups=1, bias=True, ): # 初始化方法,接受输入参数 self.__padding = (kernel_size - 1) * dilation super(TorchCausalConv1d, self).__init__( in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=self.__padding, dilation=dilation, groups=groups, bias=bias, ) def forward(self, input): # 定义 forward 方法,接受输入 input result = super(TorchCausalConv1d, self).forward(input) if self.__padding != 0: return result[:, :, : -self.__padding] return result class TorchWavenetModule(nn.Module): # 定义 TorchWavenetModule 类,继承自 nn.Module # 初始化 TorchWavenetModule 类,接受参数 params, hparams, conv_1x1_pad def __init__(self, params, hparams, conv_1x1_pad): # 调用父类的初始化方法 super(TorchWavenetModule, self).__init__() # 创建 TorchCausalConv1d 对象,用于实现膨胀卷积 self.conv_dilation = TorchCausalConv1d( in_channels=hparams["components"]["conv_dilation"]["in_ch"], out_channels=hparams["components"]["conv_dilation"]["out_ch"], kernel_size=hparams["components"]["conv_dilation"]["kernel_width"], stride=hparams["components"]["conv_dilation"]["stride"], dilation=hparams["components"]["conv_dilation"]["dilation"] + 1, bias=True, ) # 创建 nn.Conv1d 对象,用于实现 1x1 卷积 self.conv_1x1 = nn.Conv1d( in_channels=hparams["components"]["conv_1x1"]["in_ch"], out_channels=hparams["components"]["conv_1x1"]["out_ch"], kernel_size=hparams["components"]["conv_1x1"]["kernel_width"], stride=hparams["components"]["conv_1x1"]["stride"], padding=conv_1x1_pad, dilation=hparams["components"]["conv_1x1"]["dilation"] + 1, bias=True, ) # 初始化膨胀卷积的权重和偏置 W = params["components"]["conv_dilation"]["W"] b = params["components"]["conv_dilation"]["b"] W = np.moveaxis(W, [0, 1, 2], [-1, -2, -3]) # 调整权重的维度顺序 self.conv_dilation.weight = nn.Parameter(torch.FloatTensor(W)) self.conv_dilation.bias = nn.Parameter(torch.FloatTensor(b.flatten())) assert self.conv_dilation.weight.shape == W.shape assert self.conv_dilation.bias.shape == b.flatten().shape # 初始化 1x1 卷积的权重和偏置 W = params["components"]["conv_1x1"]["W"] b = params["components"]["conv_1x1"]["b"] W = np.moveaxis(W, [0, 1, 2], [-1, -2, -3]) # 调整权重的维度顺序 self.conv_1x1.weight = nn.Parameter(torch.FloatTensor(W)) self.conv_1x1.bias = nn.Parameter(torch.FloatTensor(b.flatten())) assert self.conv_1x1.weight.shape == W.shape assert self.conv_1x1.bias.shape == b.flatten().shape def forward(self, X_main, X_skip): # 将输入数据的维度顺序从(N, W, C)转换为(N, C, W) self.X_main = np.moveaxis(X_main, [0, 1, 2], [0, -1, -2]) # 将转换后的数据转换为torch张量 self.X_main = torchify(self.X_main) # 保留梯度信息 self.X_main.retain_grad() # 使用卷积扩张操作处理转换后的数据 self.conv_dilation_out = self.conv_dilation(self.X_main) self.conv_dilation_out.retain_grad() # 对卷积扩张输出进行tanh和sigmoid激活函数处理 self.tanh_out = torch.tanh(self.conv_dilation_out) self.sigm_out = torch.sigmoid(self.conv_dilation_out) # 保留梯度信息 self.tanh_out.retain_grad() self.sigm_out.retain_grad() # 将tanh和sigmoid输出相乘 self.multiply_gate_out = self.tanh_out * self.sigm_out self.multiply_gate_out.retain_grad() # 使用1x1卷积处理相乘结果 self.conv_1x1_out = self.conv_1x1(self.multiply_gate_out) self.conv_1x1_out.retain_grad() # 初始化X_skip为与conv_1x1_out相同形状的全零张量 self.X_skip = torch.zeros_like(self.conv_1x1_out) # 如果X_skip不为空,则将其转换为torch张量 if X_skip is not None: self.X_skip = torchify(np.moveaxis(X_skip, [0, 1, 2], [0, -1, -2])) self.X_skip.retain_grad() # 计算Y_skip和Y_main self.Y_skip = self.X_skip + self.conv_1x1_out self.Y_main = self.X_main + self.conv_1x1_out # 保留梯度信息 self.Y_skip.retain_grad() self.Y_main.retain_grad() class TorchSkipConnectionConv(nn.Module): def __init__( self, act_fn, pad1, pad2, pad_skip, params, hparams, momentum=0.9, epsilon=1e-5 # 初始化函数,定义了跳跃连接卷积层的参数和超参数 def forward(self, X): # 检查输入是否为 torch.Tensor 类型,如果不是则进行转换 if not isinstance(X, torch.Tensor): # 将输入的维度顺序从 (N, H, W, C) 调整为 (N, C, H, W) X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3]) X = torchify(X) self.X = X self.X.retain_grad() # 对输入进行第一次卷积操作 self.conv1_out = self.conv1(self.X) self.conv1_out.retain_grad() # 对第一次卷积结果应用激活函数 self.act_fn1_out = self.act_fn(self.conv1_out) self.act_fn1_out.retain_grad() # 对激活函数输出进行批归一化 self.batchnorm1_out = self.batchnorm1(self.act_fn1_out) self.batchnorm1_out.retain_grad() # 对批归一化结果进行第二次卷积操作 self.conv2_out = self.conv2(self.batchnorm1_out) self.conv2_out.retain_grad() # 对第二次卷积结果进行批归一化 self.batchnorm2_out = self.batchnorm2(self.conv2_out) self.batchnorm2_out.retain_grad() # 对输入进行跳跃连接卷积操作 self.c_skip_out = self.conv_skip(self.X) self.c_skip_out.retain_grad() # 对跳跃连接卷积结果进行批归一化 self.bn_skip_out = self.batchnorm_skip(self.c_skip_out) self.bn_skip_out.retain_grad() # 将第二次卷积结果和跳跃连接卷积结果相加作为第三层的输入 self.layer3_in = self.batchnorm2_out + self.bn_skip_out self.layer3_in.retain_grad() # 对第三层的输入应用激活函数 self.Y = self.act_fn(self.layer3_in) self.Y.retain_grad() class TorchBidirectionalLSTM(nn.Module): def forward(self, X): # 将输入的维度顺序从 (batch, input_size, seq_len) 调整为 (seq_len, batch, input_size) self.X = np.moveaxis(X, [0, 1, 2], [-2, -1, -3]) # 检查输入是否为 torch.Tensor 类型,如果不是则进行转换 if not isinstance(self.X, torch.Tensor): self.X = torchify(self.X) self.X.retain_grad() # 初始化隐藏状态为 0 n_ex, n_in, n_timesteps = self.X.shape n_out, n_out = self.layer1.weight_hh_l0.shape # 前向传播 self.A, (At, Ct) = self.layer1(self.X) self.A.retain_grad() return self.A class TorchPool2DLayer(nn.Module): # 初始化 TorchPool2DLayer 类,设置输入通道数和超参数 def __init__(self, in_channels, hparams, kwargs): # 调用父类的初始化方法 super(TorchPool2DLayer, self).__init__() # 根据超参数中的模式选择不同的池化层 if hparams["mode"] == "max": self.layer1 = nn.MaxPool2d( kernel_size=hparams["kernel_shape"], padding=hparams["pad"], stride=hparams["stride"], ) elif hparams["mode"] == "average": self.layer1 = nn.AvgPool2d( kernel_size=hparams["kernel_shape"], padding=hparams["pad"], stride=hparams["stride"], ) # 前向传播函数 def forward(self, X): # 将输入数据的维度顺序从 (N, H, W, C) 调整为 (N, C, H, W) self.X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3]) # 如果输入数据不是 torch.Tensor 类型,则转换为 torch.Tensor if not isinstance(self.X, torch.Tensor): self.X = torchify(self.X) # 保留输入数据的梯度 self.X.retain_grad() # 对输入数据进行池化操作,得到输出数据 self.Y = self.layer1(self.X) # 保留输出数据的梯度 self.Y.retain_grad() # 返回输出数据 return self.Y # 提取梯度信息函数 def extract_grads(self, X): # 运行前向传播函数得到输出数据 self.forward(X) # 计算损失函数为输出数据的和 self.loss = self.Y.sum() # 反向传播计算梯度 self.loss.backward() # 调整梯度信息的维度顺序,以适应不同的表示方式 orig, X_swap = [0, 1, 2, 3], [0, -1, -3, -2] grads = {
"X": np.moveaxis(self.X.detach().numpy(), orig, X_swap), "y": np.moveaxis(self.Y.detach().numpy(), orig, X_swap), "dLdY": np.moveaxis(self.Y.grad.numpy(), orig, X_swap), "dLdX": np.moveaxis(self.X.grad.numpy(), orig, X_swap), } # 返回梯度信息字典 return grads # 定义一个 TorchConv2DLayer 类,继承自 nn.Module 类 class TorchConv2DLayer(nn.Module): # 初始化函数,接受输入通道数、输出通道数、激活函数、参数、超参数等参数 def __init__(self, in_channels, out_channels, act_fn, params, hparams, kwargs): # 调用父类的初始化函数 super(TorchConv2DLayer, self).__init__() # 从参数中获取权重 W 和偏置 b W = params["W"] b = params["b"] # 保存激活函数 self.act_fn = act_fn # 创建一个卷积层,设置输入通道数、输出通道数、卷积核形状、填充、步长、膨胀等参数 self.layer1 = nn.Conv2d( in_channels, out_channels, hparams["kernel_shape"], padding=hparams["pad"], stride=hparams["stride"], dilation=hparams["dilation"] + 1, bias=True, ) # 调整权重 W 的维度顺序,使其与卷积层的权重维度匹配 W = np.moveaxis(W, [0, 1, 2, 3], [-2, -1, -3, -4]) # 断言卷积层的权重形状与调整后的 W 的形状相同 assert self.layer1.weight.shape == W.shape # 断言卷积层的偏置形状与展平后的 b 的形状相同 assert self.layer1.bias.shape == b.flatten().shape # 将调整后的 W 转换为 PyTorch 的参数形式,并赋值给卷积层的权重 self.layer1.weight = nn.Parameter(torch.FloatTensor(W)) # 将展平后的 b 转换为 PyTorch 的参数形式,并赋值给卷积层的偏置 self.layer1.bias = nn.Parameter(torch.FloatTensor(b.flatten())) # 前向传播函数,接受输入 X,进行卷积操作和激活函数操作,并返回结果 def forward(self, X): # 调整输入 X 的维度顺序,使其与卷积层的输入维度匹配 self.X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3]) # 如果输入 X 不是 torch.Tensor 类型,则转换为 torch.Tensor 类型 if not isinstance(self.X, torch.Tensor): self.X = torchify(self.X) # 保留输入 X 的梯度信息 self.X.retain_grad() # 对输入 X 进行卷积操作,保存结果并保留梯度信息 self.Z = self.layer1(self.X) self.Z.retain_grad() # 对卷积结果进行激活函数操作,保存结果并保留梯度信息 self.Y = self.act_fn(self.Z) self.Y.retain_grad() # 返回激活函数的结果 return self.Y # 提取梯度信息 def extract_grads(self, X): # 进行前向传播 self.forward(X) # 计算损失值 self.loss = self.Y.sum() # 反向传播计算梯度 self.loss.backward() # 定义坐标转换规则 orig, X_swap, W_swap = [0, 1, 2, 3], [0, -1, -3, -2], [-1, -2, -4, -3] # 提取各个梯度信息并进行坐标转换 grads = {
"X": np.moveaxis(self.X.detach().numpy(), orig, X_swap), "W": np.moveaxis(self.layer1.weight.detach().numpy(), orig, W_swap), "b": self.layer1.bias.detach().numpy().reshape(1, 1, 1, -1), "y": np.moveaxis(self.Y.detach().numpy(), orig, X_swap), "dLdY": np.moveaxis(self.Y.grad.numpy(), orig, X_swap), "dLdZ": np.moveaxis(self.Z.grad.numpy(), orig, X_swap), "dLdW": np.moveaxis(self.layer1.weight.grad.numpy(), orig, W_swap), "dLdB": self.layer1.bias.grad.numpy().reshape(1, 1, 1, -1), "dLdX": np.moveaxis(self.X.grad.numpy(), orig, X_swap), } # 返回梯度信息字典 return grads class TorchConv1DLayer(nn.Module): # 定义一个继承自 nn.Module 的 TorchConv1DLayer 类 def __init__(self, in_channels, out_channels, act_fn, params, hparams, kwargs): # 初始化函数,接受输入通道数、输出通道数、激活函数、参数、超参数等参数 # 调用父类的初始化函数 super(TorchConv1DLayer, self).__init__() # 从参数中获取权重 W 和偏置 b W = params["W"] b = params["b"] self.act_fn = act_fn # 创建一个一维卷积层,设置输入通道数、输出通道数、卷积核宽度、填充、步长、膨胀等参数 self.layer1 = nn.Conv1d( in_channels, out_channels, hparams["kernel_width"], padding=hparams["pad"], stride=hparams["stride"], dilation=hparams["dilation"] + 1, bias=True, ) # 调整权重 W 的维度顺序 W = np.moveaxis(W, [0, 1, 2], [-1, -2, -3]) # 断言卷积层的权重形状与调整后的 W 的形状相同 assert self.layer1.weight.shape == W.shape # 断言卷积层的偏置形状与展平后的 b 的形状相同 assert self.layer1.bias.shape == b.flatten().shape # 将调整后的 W 赋值给卷积层的权重 self.layer1.weight = nn.Parameter(torch.FloatTensor(W)) # 将展平后的 b 赋值给卷积层的偏置 self.layer1.bias = nn.Parameter(torch.FloatTensor(b.flatten())) def forward(self, X): # 前向传播函数,接受输入 X # 调整输入 X 的维度顺序 self.X = np.moveaxis(X, [0, 1, 2], [0, -1, -2]) # 如果输入 X 不是 torch.Tensor 类型,则转换为 torch.Tensor 类型 if not isinstance(self.X, torch.Tensor): self.X = torchify(self.X) # 保留输入 X 的梯度信息 self.X.retain_grad() # 对输入 X 进行卷积操作,得到 Z self.Z = self.layer1(self.X) # 保留 Z 的梯度信息 self.Z.retain_grad() # 对 Z 应用激活函数,得到 Y self.Y = self.act_fn(self.Z) # 保留 Y 的梯度信息 self.Y.retain_grad() # 返回 Y return self.Y # 提取梯度信息 def extract_grads(self, X): # 进行前向传播 self.forward(X) # 计算损失值 self.loss = self.Y.sum() # 反向传播计算梯度 self.loss.backward() # 定义坐标转换规则 orig, X_swap, W_swap = [0, 1, 2], [0, -1, -2], [-1, -2, -3] # 提取各个梯度信息并进行坐标转换 grads = {
"X": np.moveaxis(self.X.detach().numpy(), orig, X_swap), "W": np.moveaxis(self.layer1.weight.detach().numpy(), orig, W_swap), "b": self.layer1.bias.detach().numpy().reshape(1, 1, -1), "y": np.moveaxis(self.Y.detach().numpy(), orig, X_swap), "dLdY": np.moveaxis(self.Y.grad.numpy(), orig, X_swap), "dLdZ": np.moveaxis(self.Z.grad.numpy(), orig, X_swap), "dLdW": np.moveaxis(self.layer1.weight.grad.numpy(), orig, W_swap), "dLdB": self.layer1.bias.grad.numpy().reshape(1, 1, -1), "dLdX": np.moveaxis(self.X.grad.numpy(), orig, X_swap), } # 返回梯度信息字典 return grads # 定义一个 TorchDeconv2DLayer 类,继承自 nn.Module class TorchDeconv2DLayer(nn.Module): # 初始化函数,接受输入通道数、输出通道数、激活函数、参数、超参数等参数 def __init__(self, in_channels, out_channels, act_fn, params, hparams, kwargs): # 调用父类的初始化函数 super(TorchDeconv2DLayer, self).__init__() # 从参数中获取权重和偏置 W = params["W"] b = params["b"] self.act_fn = act_fn # 创建一个反卷积层,设置输入通道数、输出通道数、卷积核形状、填充、步幅等参数 self.layer1 = nn.ConvTranspose2d( in_channels, out_channels, hparams["kernel_shape"], padding=hparams["pad"], stride=hparams["stride"], dilation=1, bias=True, ) # 调整权重的维度顺序,使其与反卷积层的权重维度匹配 W = np.moveaxis(W, [0, 1, 2, 3], [-2, -1, -4, -3]) # 断言反卷积层的权重形状与调整后的权重形状相同 assert self.layer1.weight.shape == W.shape # 断言反卷积层的偏置形状与调整后的偏置形状相同 assert self.layer1.bias.shape == b.flatten().shape # 将调整后的权重设置为反卷积层的权重参数 self.layer1.weight = nn.Parameter(torch.FloatTensor(W)) # 将调整后的偏置设置为反卷积层的偏置参数 self.layer1.bias = nn.Parameter(torch.FloatTensor(b.flatten())) # 前向传播函数,接受输入数据 X,返回激活后的输出数据 Y def forward(self, X): # 调整输入数据的维度顺序,使其与反卷积层的输入数据维度匹配 self.X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3]) # 如果输入数据不是 torch.Tensor 类型,则转换为 torch.Tensor 类型 if not isinstance(self.X, torch.Tensor): self.X = torchify(self.X) # 保留输入数据的梯度信息 self.X.retain_grad() # 将输入数据传入反卷积层,得到输出数据 Z,并保留输出数据的梯度信息 self.Z = self.layer1(self.X) self.Z.retain_grad() # 对输出数据 Z 应用激活函数,得到最终输出数据 Y,并保留输出数据的梯度信息 self.Y = self.act_fn(self.Z) self.Y.retain_grad() # 返回最终输出数据 Y return self.Y # 提取梯度信息 def extract_grads(self, X): # 进行前向传播 self.forward(X) # 计算损失值 self.loss = self.Y.sum() # 反向传播计算梯度 self.loss.backward() # 定义坐标转换规则 orig, X_swap, W_swap = [0, 1, 2, 3], [0, -1, -3, -2], [-2, -1, -4, -3] # 提取各个梯度信息并进行坐标转换 grads = {
"X": np.moveaxis(self.X.detach().numpy(), orig, X_swap), "W": np.moveaxis(self.layer1.weight.detach().numpy(), orig, W_swap), "b": self.layer1.bias.detach().numpy().reshape(1, 1, 1, -1), "y": np.moveaxis(self.Y.detach().numpy(), orig, X_swap), "dLdY": np.moveaxis(self.Y.grad.numpy(), orig, X_swap), "dLdZ": np.moveaxis(self.Z.grad.numpy(), orig, X_swap), "dLdW": np.moveaxis(self.layer1.weight.grad.numpy(), orig, W_swap), "dLdB": self.layer1.bias.grad.numpy().reshape(1, 1, 1, -1), "dLdX": np.moveaxis(self.X.grad.numpy(), orig, X_swap), } # 返回梯度信息字典 return grads # 定义一个继承自 nn.Module 的 TorchLSTMCell 类 class TorchLSTMCell(nn.Module): # 初始化方法,接受输入维度、输出维度、参数字典和其他关键字参数 def __init__(self, n_in, n_out, params, kwargs): # 调用父类的初始化方法 super(TorchLSTMCell, self).__init__() # 从参数字典中获取权重矩阵,并转置 Wiu = params["Wu"][n_out:, :].T Wif = params["Wf"][n_out:, :].T Wic = params["Wc"][n_out:, :].T Wio = params["Wo"][n_out:, :].T # 将权重矩阵堆叠成输入权重矩阵 W_ih = np.vstack([Wiu, Wif, Wic, Wio]) # 从参数字典中获取权重矩阵,并转置 Whu = params["Wu"][:n_out, :].T Whf = params["Wf"][:n_out, :].T Whc = params["Wc"][:n_out, :].T Who = params["Wo"][:n_out, :].T # 将权重矩阵堆叠成隐藏状态权重矩阵 W_hh = np.vstack([Whu, Whf, Whc, Who]) # 创建一个 LSTMCell 层,设置输入维度、输出维度和是否包含偏置 self.layer1 = nn.LSTMCell(input_size=n_in, hidden_size=n_out, bias=True) # 断言输入权重矩阵的形状与 LSTMCell 层的输入权重矩阵形状相同 assert self.layer1.weight_ih.shape == W_ih.shape # 断言隐藏状态权重矩阵的形状与 LSTMCell 层的隐藏状态权重矩阵形状相同 assert self.layer1.weight_hh.shape == W_hh.shape # 将输入权重矩阵转换为可训练参数并赋值给 LSTMCell 层的输入权重矩阵 self.layer1.weight_ih = nn.Parameter(torch.FloatTensor(W_ih)) # 将隐藏状态权重矩阵转换为可训练参数并赋值给 LSTMCell 层的隐藏状态权重矩阵 self.layer1.weight_hh = nn.Parameter(torch.FloatTensor(W_hh)) # 将偏置参数从参数字典中提取并拼接成一个一维数组 b = np.concatenate( [params["bu"], params["bf"], params["bc"], params["bo"]], axis=-1 ).flatten() # 断言输入偏置参数的形状与 LSTMCell 层的输入偏置参数形状相同 assert self.layer1.bias_ih.shape == b.shape # 断言隐藏状态偏置参数的形状与 LSTMCell 层的隐藏状态偏置参数形状相同 assert self.layer1.bias_hh.shape == b.shape # 将偏置参数转换为可训练参数并赋值给 LSTMCell 层的输入偏置参数 self.layer1.bias_ih = nn.Parameter(torch.FloatTensor(b)) # 将偏置参数转换为可训练参数并赋值给 LSTMCell 层的隐藏状态偏置参数 self.layer1.bias_hh = nn.Parameter(torch.FloatTensor(b)) # 定义一个前向传播函数,接受输入 X def forward(self, X): # 将输入 X 存储在对象中 self.X = X # 如果输入 X 不是 torch.Tensor 类型,则将其转换为 torch.Tensor 类型 if not isinstance(self.X, torch.Tensor): self.X = torchify(self.X) # 保留输入 X 的梯度信息 self.X.retain_grad() # 初始化隐藏状态为 0 n_ex, n_in, n_timesteps = self.X.shape n_out, n_out = self.layer1.weight_hh.shape # 初始化隐藏状态 a0 和 c0 a0 = torchify(np.zeros((n_ex, n_out))) c0 = torchify(np.zeros((n_ex, n_out))) a0.retain_grad() c0.retain_grad() # 执行前向传播 A, C = [], [] at = a0 ct = c0 for t in range(n_timesteps): A.append(at) C.append(ct) at1, ct1 = self.layer1(self.X[:, :, t], (at, ct)) at.retain_grad() ct.retain_grad() at = at1 ct = ct1 at.retain_grad() ct.retain_grad() A.append(at) C.append(ct) # 不包括 a0 在输出中 self.A = A[1:] self.C = C[1:] # 返回隐藏状态 A 和 C return self.A, self.C class TorchRNNCell(nn.Module): # 定义 TorchRNNCell 类,继承自 nn.Module def __init__(self, n_in, n_hid, params, kwargs): # 初始化方法 super(TorchRNNCell, self).__init__() # 创建一个 RNNCell 层,输入维度为 n_in,隐藏层维度为 n_hid,包含偏置,激活函数为 tanh self.layer1 = nn.RNNCell(n_in, n_hid, bias=True, nonlinearity="tanh") # 设置权重和偏置以匹配 RNNCell 的权重和偏置 # 注意:我们将 RNNCell 的权重和偏置的转置传递给 pytorch,这意味着我们需要针对权重的转置检查我们的输出的转置 self.layer1.weight_ih = nn.Parameter(torch.FloatTensor(params["Wax"].T)) self.layer1.weight_hh = nn.Parameter(torch.FloatTensor(params["Waa"].T)) self.layer1.bias_ih = nn.Parameter(torch.FloatTensor(params["bx"].T)) self.layer1.bias_hh = nn.Parameter(torch.FloatTensor(params["ba"].T)) def forward(self, X): # 前向传播方法 self.X = X if not isinstance(self.X, torch.Tensor): self.X = torchify(self.X) self.X.retain_grad() # 初始隐藏状态为 0 n_ex, n_in, n_timesteps = self.X.shape n_out, n_out = self.layer1.weight_hh.shape # 初始化隐藏状态 a0 = torchify(np.zeros((n_ex, n_out))) a0.retain_grad() # 前向传播 A = [] at = a0 for t in range(n_timesteps): A += [at] at1 = self.layer1(self.X[:, :, t], at) at.retain_grad() at = at1 at.retain_grad() A += [at] # 不包括 a0 在我们的输出中 self.A = A[1:] return self.A # 定义一个方法用于提取梯度信息 def extract_grads(self, X): # 运行前向传播 self.forward(X) # 计算损失值并将所有损失值叠加在一起 self.loss = torch.stack(self.A).sum() # 反向传播计算梯度 self.loss.backward() # 提取并保存各个参数的梯度信息到字典中 grads = {
"X": self.X.detach().numpy(), "ba": self.layer1.bias_hh.detach().numpy(), "bx": self.layer1.bias_ih.detach().numpy(), "Wax": self.layer1.weight_ih.detach().numpy(), "Waa": self.layer1.weight_hh.detach().numpy(), "y": torch.stack(self.A).detach().numpy(), "dLdA": np.array([a.grad.numpy() for a in self.A]), "dLdWaa": self.layer1.weight_hh.grad.numpy(), "dLdWax": self.layer1.weight_ih.grad.numpy(), "dLdBa": self.layer1.bias_hh.grad.numpy(), "dLdBx": self.layer1.bias_ih.grad.numpy(), "dLdX": self.X.grad.numpy(), } # 返回保存梯度信息的字典 return grads class TorchFCLayer(nn.Module): # 定义一个全连接层的类 def __init__(self, n_in, n_hid, act_fn, params, kwargs): # 初始化函数,接受输入维度、隐藏层维度、激活函数、参数等参数 super(TorchFCLayer, self).__init__() # 调用父类的初始化函数 self.layer1 = nn.Linear(n_in, n_hid) # 创建一个线性层,输入维度为n_in,输出维度为n_hid # explicitly set weights and bias # 明确设置权重和偏置 # NB: we pass the *transpose* of the weights to pytorch, meaning # we'll need to check against the *transpose* of our outputs for # any function of the weights # 注意:我们将权重的转置传递给pytorch,这意味着我们需要检查权重的输出的转置 self.layer1.weight = nn.Parameter(torch.FloatTensor(params["W"].T)) # 设置权重为参数中W的转置 self.layer1.bias = nn.Parameter(torch.FloatTensor(params["b"])) # 设置偏置为参数中b self.act_fn = act_fn # 设置激活函数 self.model = nn.Sequential(self.layer1, self.act_fn) # 创建一个包含线性层和激活函数的序列模型 def forward(self, X): # 前向传播函数 self.X = X # 保存输入数据 if not isinstance(X, torch.Tensor): self.X = torchify(X) # 如果输入数据不是torch张量,则转换为torch张量 self.z1 = self.layer1(self.X) # 计算线性层的输出 self.z1.retain_grad() # 保留梯度信息 self.out1 = self.act_fn(self.z1) # 计算激活函数的输出 self.out1.retain_grad() # 保留梯度信息 def extract_grads(self, X): # 提取梯度信息的函数 self.forward(X) # 调用前向传播函数 self.loss1 = self.out1.sum() # 计算损失值 self.loss1.backward() # 反向传播计算梯度 grads = {
"X": self.X.detach().numpy(), "b": self.layer1.bias.detach().numpy(), "W": self.layer1.weight.detach().numpy(), "y": self.out1.detach().numpy(), "dLdy": self.out1.grad.numpy(), "dLdZ": self.z1.grad.numpy(), "dLdB": self.layer1.bias.grad.numpy(), "dLdW": self.layer1.weight.grad.numpy(), "dLdX": self.X.grad.numpy(), } # 保存梯度信息到字典中 return grads # 返回梯度信息字典 class TorchEmbeddingLayer(nn.Module): # 定义一个嵌入层的类 def __init__(self, vocab_size, n_out, params, kwargs): # 初始化函数,接受词汇表大小、输出维度、参数等参数 super(TorchEmbeddingLayer, self).__init__() # 调用父类的初始化函数 self.layer1 = nn.Embedding(vocab_size, n_out) # 创建一个嵌入层,词汇表大小为vocab_size,输出维度为n_out # explicitly set embedding weights # 明确设置嵌入权重 self.layer1.weight = nn.Parameter(torch.FloatTensor(params["W"])) # 设置嵌入层的权重为参数中的W self.model = nn.Sequential(self.layer1) # 创建一个包含嵌入层的序列模型 # 定义一个前向传播函数,接受输入 X def forward(self, X): # 将输入 X 存储在对象中 self.X = X # 如果输入 X 不是 torch.Tensor 类型,则将其转换为 torch.Tensor 类型 if not isinstance(X, torch.Tensor): self.X = torch.from_numpy(X) # 将输入 X 传递给第一层神经网络,并存储输出 self.out1 = self.layer1(self.X) # 保留输出的梯度信息 self.out1.retain_grad() # 定义一个提取梯度信息的函数,接受输入 X def extract_grads(self, X): # 调用前向传播函数 self.forward(X) # 计算损失函数 loss1,为输出的和 self.loss1 = self.out1.sum() # 反向传播计算梯度 self.loss1.backward() # 提取并返回梯度信息 grads = {
"X": self.X.detach().numpy(), "W": self.layer1.weight.detach().numpy(), "y": self.out1.detach().numpy(), "dLdy": self.out1.grad.numpy(), "dLdW": self.layer1.weight.grad.numpy(), } return grads class TorchSDPAttentionLayer(nn.Module): # 定义一个基于PyTorch的自注意力层 def __init__(self): super(TorchSDPAttentionLayer, self).__init__() def forward(self, Q, K, V, mask=None): # 将输入的查询、键、值保存到对象中 self.Q = Q self.K = K self.V = V # 如果查询、键、值不是PyTorch张量,则转换为张量 if not isinstance(self.Q, torch.Tensor): self.Q = torchify(self.Q) if not isinstance(self.K, torch.Tensor): self.K = torchify(self.K) if not isinstance(self.V, torch.Tensor): self.V = torchify(self.V) # 保留查询、键、值的梯度信息 self.Q.retain_grad() self.K.retain_grad() self.V.retain_grad() # 获取键值对应的维度 self.d_k = self.Q.size(-1) # 计算注意力分数 self.scores = torch.matmul(self.Q, self.K.transpose(-2, -1)) / np.sqrt(self.d_k) # 如果存在掩码,则将分数中对应位置的值替换为负无穷 if mask is not None: self.scores = self.scores.masked_fill(mask == 0, -1e9) self.scores.retain_grad() # 计算注意力权重 self.weights = F.softmax(self.scores, dim=-1) self.weights.retain_grad() # 计算加权后的值 self.Y = torch.matmul(self.weights, self.V) self.Y.retain_grad() # 返回加权后的值和注意力权重 return self.Y, self.weights def extract_grads(self, Q, K, V, mask=None): # 调用前向传播计算梯度 self.forward(Q, K, V, mask=mask) # 计算损失值 self.loss1 = self.Y.sum() # 反向传播计算梯度 self.loss1.backward() # 提取并返回各个参数的梯度信息 grads = {
"Q": self.Q.detach().numpy(), "K": self.K.detach().numpy(), "V": self.V.detach().numpy(), "d_k": self.d_k, "scores": self.scores.detach().numpy(), "weights": self.weights.detach().numpy(), "Y": self.Y.detach().numpy(), "dLdV": self.V.grad.numpy(), "dWeights": self.weights.grad.numpy(), "dScores": self.scores.grad.numpy(), "dLdQ": self.Q.grad.numpy(), "dLdK": self.K.grad.numpy(), } return grads class TorchMultiHeadedAttentionModule(nn.Module): # 初始化多头注意力模块,接受参数和超参数 def __init__(self, params, hparams): # 调用父类的初始化方法 super(TorchMultiHeadedAttentionModule, self).__init__() # 确保每个头的维度能够整除总维度 assert hparams["kqv_dim"] % hparams["n_heads"] == 0 # 设置头的数量 self.n_heads = hparams["n_heads"] # 计算每个头的潜在维度 self.latent_dim = hparams["kqv_dim"] // hparams["n_heads"] # 设置丢弃概率 self.p_dropout = hparams["dropout_p"] # 初始化投影矩阵 self.projections = {
"Q": nn.Linear(hparams["kqv_dim"], hparams["kqv_dim"]), "K": nn.Linear(hparams["kqv_dim"], hparams["kqv_dim"]), "V": nn.Linear(hparams["kqv_dim"], hparams["kqv_dim"]), "O": nn.Linear(hparams["kqv_dim"], hparams["kqv_dim"]), } # 设置投影矩阵的权重和偏置 self.projections["Q"].weight = nn.Parameter( torch.FloatTensor(params["components"]["Q"]["W"].T) ) self.projections["Q"].bias = nn.Parameter( torch.FloatTensor(params["components"]["Q"]["b"]) ) self.projections["K"].weight = nn.Parameter( torch.FloatTensor(params["components"]["K"]["W"].T) ) self.projections["K"].bias = nn.Parameter( torch.FloatTensor(params["components"]["K"]["b"]) ) self.projections["V"].weight = nn.Parameter( torch.FloatTensor(params["components"]["V"]["W"].T) ) self.projections["V"].bias = nn.Parameter( torch.FloatTensor(params["components"]["V"]["b"]) ) self.projections["O"].weight = nn.Parameter( torch.FloatTensor(params["components"]["O"]["W"].T) ) self.projections["O"].bias = nn.Parameter( torch.FloatTensor(params["components"]["O"]["b"]) ) # 初始化注意力和丢弃层 self.attn = None self.dropout = nn.Dropout(p=hparams["dropout_p"]) # 定义前向传播函数,接收查询(Q)、键(K)、值(V)和掩码(mask)作为输入 def forward(self, Q, K, V, mask=None): # 将输入的查询(Q)、键(K)、值(V)保存到当前对象中 self.Q = Q self.K = K self.V = V # 如果查询(Q)不是torch.Tensor类型,则将其转换为torch.Tensor类型 if not isinstance(self.Q, torch.Tensor): self.Q = torchify(self.Q) # 如果键(K)不是torch.Tensor类型,则将其转换为torch.Tensor类型 if not isinstance(self.K, torch.Tensor): self.K = torchify(self.K) # 如果值(V)不是torch.Tensor类型,则将其转换为torch.Tensor类型 if not isinstance(self.V, torch.Tensor): self.V = torchify(self.V) # 保留查询(Q)、键(K)、值(V)的梯度信息 self.Q.retain_grad() self.K.retain_grad() self.V.retain_grad() # 如果存在掩码(mask),则将其扩展维度 if mask is not None: mask = mask.unsqueeze(1) # 获取输入查询(Q)的样本数量 n_ex = self.Q.size(0) # 对查询(Q)、键(K)、值(V)进行线性变换并重塑维度,然后转置 self.Q_proj = ( self.projections["Q"](self.Q) .view(n_ex, -1, self.n_heads, self.latent_dim) .transpose(1, 2) ) self.K_proj = ( self.projections["K"](self.K) .view(n_ex, -1, self.n_heads, self.latent_dim) .transpose(1, 2) ) self.V_proj = ( self.projections["V"](self.V) .view(n_ex, -1, self.n_heads, self.latent_dim) .transpose(1, 2) ) # 保留查询(Q)、键(K)、值(V)的梯度信息 self.Q_proj.retain_grad() self.K_proj.retain_grad() self.V_proj.retain_grad() # 2) 在批处理中对所有投影向量应用注意力机制 self.attn_out, self.attn = TorchSDPAttentionLayer().forward( self.Q_proj, self.K_proj, self.V_proj, mask=mask ) # 保留注意力权重和输出的梯度信息 self.attn.retain_grad() self.attn_out.retain_grad() # 3) 使用视图(view)进行“连接”并应用最终的线性变换 self.attn_out_reshaped = ( self.attn_out.transpose(1, 2) .contiguous() .view(n_ex, -1, self.n_heads * self.latent_dim) ) # 保留连接后的输出的梯度信息 self.attn_out_reshaped.retain_grad() print(self.attn_out_reshaped.shape) # 对连接后的输出应用最终的线性变换 self.Y = self.projections["O"](self.attn_out_reshaped) print(self.Y.shape) # 保留最终输出的梯度信息 self.Y.retain_grad() # 定义全局变量_params和_param_aliases,用于存储参数和参数别名 _params = {
} _param_aliases = {
} # 定义param函数,用于创建共享参数变量 def param(name, *args, kwargs): """ A wrapper for `tf.Variable` which enables parameter sharing in models. Creates and returns theano shared variables similarly to `tf.Variable`, except if you try to create a param with the same name as a previously-created one, `param(...)` will just return the old one instead of making a new one. This constructor also adds a `param` attribute to the shared variables it creates, so that you can easily search a graph for all params. """ # 如果参数名不在_params中,则创建新的参数并添加到_params中 if name not in _params: kwargs["name"] = name param = tf.Variable(*args, kwargs) param.param = True _params[name] = param # 如果参数名已存在于_params中,则直接返回已存在的参数 result = _params[name] i = 0 # 处理参数别名 while result in _param_aliases: i += 1 result = _param_aliases[result] return result # 根据参数名查找所有包含该名称的参数 def params_with_name(name): return [p for n, p in _params.items() if name in n] # 定义ReLULayer函数,实现ReLU激活函数的全连接层 def ReLULayer(name, n_in, n_out, inputs, w_initialization): if isinstance(w_initialization, np.ndarray): weight_values = w_initialization.astype("float32") # 创建权重参数W,并进行矩阵乘法运算 W = param(name + ".W", weight_values) result = tf.matmul(inputs, W) # 添加偏置并进行ReLU激活 output = tf.nn.bias_add( result, param(name + ".b", np.zeros((n_out,), dtype="float32")) ) output = tf.nn.relu(output) return output, W # 定义LinearLayer函数,实现线性全连接层 def LinearLayer(name, n_in, n_out, inputs, w_initialization): if isinstance(w_initialization, np.ndarray): weight_values = w_initialization.astype("float32") # 创建权重参数W,并进行矩阵乘法运算 W = param(name + ".W", weight_values) result = tf.matmul(inputs, W) # 添加偏置 output = tf.nn.bias_add( result, param(name + ".b", np.zeros((n_out,), dtype="float32")) ) # 返回 output 和 W 两个变量 return output, W # 生成器函数,用于生成数据 def Generator(n_samples, X_real, params=None): # 设置特征数为2 n_feats = 2 # 初始化权重矩阵 W1 = W2 = W3 = W4 = "he" # 生成噪声数据 noise = tf.random.normal([n_samples, 2]) # 如果参数不为空,则使用参数中的值 if params is not None: # 转换噪声数据为张量 noise = tf.convert_to_tensor(params["noise"], dtype="float32") # 获取生成器的权重矩阵 W1 = params["generator"]["FC1"]["W"] W2 = params["generator"]["FC2"]["W"] W3 = params["generator"]["FC3"]["W"] W4 = params["generator"]["FC4"]["W"] # 获取隐藏层维度和输入特征数 DIM = params["g_hidden"] n_feats = params["n_in"] # 初始化输出字典和权重字典 outs = {
} weights = {
} # 第一层全连接层 output, W = ReLULayer("Generator.1", n_feats, DIM, noise, w_initialization=W1) outs["FC1"] = output weights["FC1"] = W # 第二层全连接层 output, W = ReLULayer("Generator.2", DIM, DIM, output, w_initialization=W2) outs["FC2"] = output weights["FC2"] = W # 第三层全连接层 output, W = ReLULayer("Generator.3", DIM, DIM, output, w_initialization=W3) outs["FC3"] = output weights["FC3"] = W # 第四层全连接层 output, W = LinearLayer("Generator.4", DIM, n_feats, output, w_initialization=W4) outs["FC4"] = output weights["FC4"] = W # 返回输出、输出字典和权重字典 return output, outs, weights # 判别器函数,用于判别数据真伪 def Discriminator(inputs, params=None): # 设置特征数为2 n_feats = 2 # 初始化权重矩阵 W1 = W2 = W3 = W4 = "he" # 如果参数不为空,则使用参数中的值 if params is not None: # 获取判别器的权重矩阵 W1 = params["critic"]["FC1"]["W"] W2 = params["critic"]["FC2"]["W"] W3 = params["critic"]["FC3"]["W"] W4 = params["critic"]["FC4"]["W"] # 获取隐藏层维度和输入特征数 DIM = params["g_hidden"] n_feats = params["n_in"] # 初始化输出字典和权重字典 outs = {
} weights = {
} # 第一层全连接层 output, W = ReLULayer("Discriminator.1", n_feats, DIM, inputs, w_initialization=W1) outs["FC1"] = output weights["FC1"] = W # 第二层全连接层 output, W = ReLULayer("Discriminator.2", DIM, DIM, output, w_initialization=W2) outs["FC2"] = output weights["FC2"] = W # 第三层全连接层 output, W = ReLULayer("Discriminator.3", DIM, DIM, output, w_initialization=W3) outs["FC3"] = output weights["FC3"] = W # 第四层全连接层 output, W = LinearLayer("Discriminator.4", DIM, 1, output, w_initialization=W4) outs["FC4"] = output weights["FC4"] = W # 获取偏置项 # 遍历参数列表中包含名称为"Discriminator"的参数 for var in params_with_name("Discriminator"): # 如果参数名称中包含"1.b:",将该参数存入权重字典中的"FC1_b"键 if "1.b:" in var.name: weights["FC1_b"] = var # 如果参数名称中包含"2.b:",将该参数存入权重字典中的"FC2_b"键 elif "2.b:" in var.name: weights["FC2_b"] = var # 如果参数名称中包含"3.b:",将该参数存入权重字典中的"FC3_b"键 elif "3.b:" in var.name: weights["FC3_b"] = var # 如果参数名称中包含"4.b:",将该参数存入权重字典中的"FC4_b"键 elif "4.b:" in var.name: weights["FC4_b"] = var # 将输出结果进行重塑,将其形状变为一维数组 return tf.reshape(output, [-1]), outs, weights # 定义 WGAN-GP 模型的 TensorFlow 函数 def WGAN_GP_tf(X, lambda_, params, batch_size): # 禁用即时执行模式 tf.compat.v1.disable_eager_execution() # 获取输入数据的批量大小 batch_size = X.shape[0] # 获取超参数 n_steps = params["n_steps"] c_updates_per_epoch = params["c_updates_per_epoch"] alpha = tf.convert_to_tensor(params["alpha"], dtype="float32") # 定义真实数据的占位符 X_real = tf.compat.v1.placeholder(tf.float32, shape=[None, params["n_in"]]) # 生成器生成假数据,获取生成器输出和权重 X_fake, G_out_X_fake, G_weights = Generator(batch_size, X_real, params) # 判别器对真实数据进行判别,获取判别器输出和权重 Y_real, C_out_Y_real, C_Y_real_weights = Discriminator(X_real, params) # 判别器对假数据进行判别,获取判别器输出和权重 Y_fake, C_out_Y_fake, C_Y_fake_weights = Discriminator(X_fake, params) # 计算 WGAN 损失 mean_fake = tf.reduce_mean(Y_fake) mean_real = tf.reduce_mean(Y_real) C_loss = tf.reduce_mean(Y_fake) - tf.reduce_mean(Y_real) G_loss = -tf.reduce_mean(Y_fake) # 计算 WGAN 梯度惩罚 X_interp = alpha * X_real + ((1 - alpha) * X_fake) Y_interp, C_out_Y_interp, C_Y_interp_weights = Discriminator(X_interp, params) gradInterp = tf.gradients(Y_interp, [X_interp])[0] norm_gradInterp = tf.sqrt( tf.compat.v1.reduce_sum(tf.square(gradInterp), reduction_indices=[1]) ) gradient_penalty = tf.reduce_mean((norm_gradInterp - 1) 2) C_loss += lambda_ * gradient_penalty # 提取判别器对插值数据的梯度 C_bwd_Y_interp = {
} for k, v in C_out_Y_interp.items(): C_bwd_Y_interp[k] = tf.gradients(Y_interp, [v])[0] # 提取判别器权重的梯度 C_bwd_W = {
} for k, v in C_Y_interp_weights.items(): C_bwd_W[k] = tf.gradients(C_loss, [v])[0] # 获取梯度 dC_Y_fake = tf.gradients(C_loss, [Y_fake])[0] dC_Y_real = tf.gradients(C_loss, [Y_real])[0] dC_gradInterp = tf.gradients(C_loss, [gradInterp])[0] dG_Y_fake = tf.gradients(G_loss, [Y_fake])[0] # 返回梯度 return grads # 定义 TensorFlow 的负采样交叉熵损失函数 def TFNCELoss(X, target_word, L): from tensorflow.python.ops.nn_impl import _compute_sampled_logits from tensorflow.python.ops.nn_impl import sigmoid_cross_entropy_with_logits # 禁用 TensorFlow 2.x 中的即时执行模式 tf.compat.v1.disable_eager_execution() # 创建占位符,用于接收输入数据 in_embed = tf.compat.v1.placeholder(tf.float32, shape=X.shape) in_bias = tf.compat.v1.placeholder(tf.float32, shape=L.parameters["b"].flatten().shape) in_weights = tf.compat.v1.placeholder(tf.float32, shape=L.parameters["W"].shape) in_target_word = tf.compat.v1.placeholder(tf.int64) in_neg_samples = tf.compat.v1.placeholder(tf.int32) in_target_prob = tf.compat.v1.placeholder(tf.float32) in_neg_samp_prob = tf.compat.v1.placeholder(tf.float32) # 创建 feed 字典,将输入数据传入对应的占位符 feed = {
in_embed: X, in_weights: L.parameters["W"], in_target_word: target_word, in_bias: L.parameters["b"].flatten(), in_neg_samples: L.derived_variables["noise_samples"][0], in_target_prob: L.derived_variables["noise_samples"][1], in_neg_samp_prob: L.derived_variables["noise_samples"][2], } # 使用负采样计算 NCE 损失 nce_unreduced = tf.nn.nce_loss( weights=in_weights, biases=in_bias, labels=in_target_word, inputs=in_embed, sampled_values=(in_neg_samples, in_target_prob, in_neg_samp_prob), num_sampled=L.num_negative_samples, num_classes=L.n_classes, ) # 计算总损失 loss = tf.reduce_sum(nce_unreduced) # 计算损失对权重的梯度 dLdW = tf.gradients(loss, [in_weights])[0] # 计算损失对偏置的梯度 dLdb = tf.gradients(loss, [in_bias])[0] # 计算损失对输入数据的梯度 dLdX = tf.gradients(loss, [in_embed])[0] # 计算采样后的logits和labels sampled_logits, sampled_labels = _compute_sampled_logits( weights=in_weights, # 输入权重 biases=in_bias, # 输入偏置 labels=in_target_word, # 目标词标签 inputs=in_embed, # 输入嵌入 sampled_values=(in_neg_samples, in_target_prob, in_neg_samp_prob), # 采样值 num_sampled=L.num_negative_samples, # 负采样数量 num_classes=L.n_classes, # 类别数量 num_true=1, # 真实样本数量 subtract_log_q=True, # 是否减去log(q) ) # 计算采样后的损失 sampled_losses = sigmoid_cross_entropy_with_logits( labels=sampled_labels, # 采样标签 logits=sampled_logits # 采样logits ) # 创建一个会话 with tf.compat.v1.Session() as session: # 初始化全局变量 session.run(tf.compat.v1.global_variables_initializer()) # 运行会话,获取损失和相关变量 ( _final_loss, _nce_unreduced, _dLdW, _dLdb, _dLdX, _sampled_logits, _sampled_labels, _sampled_losses, ) = session.run( [ loss, nce_unreduced, dLdW, dLdb, dLdX, sampled_logits, sampled_labels, sampled_losses, ], feed_dict=feed, # 喂入数据 ) # 重置默认图 tf.compat.v1.reset_default_graph() # 返回结果字典 return {
"final_loss": _final_loss, # 最终损失 "nce_unreduced": _nce_unreduced, # 未减少的nce "dLdW": _dLdW, # dL/dW "dLdb": _dLdb, # dL/db "dLdX": _dLdX, # dL/dX "out_logits": _sampled_logits, # 输出logits "out_labels": _sampled_labels, # 输出标签 "sampled_loss": _sampled_losses, # 采样损失 }
今天的文章
NumPyML 源码解析(五)分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/100327.html