All Articles

AntiVirus を支える情報検索アルゴリズム 1 - Aho–Corasick アルゴリズム

前回の記事 では、clamscan で Eicar ファイルをスキャンする際に cli_scan_fmap 関数が呼び出されるまでの処理を簡単に追っていきました。

perf_start(ctx, PERFT_RAW);
ret = cli_scan_fmap(ctx,
                    type == CL_TYPE_TEXT_ASCII ? CL_TYPE_ANY : type,
                    false,
                    &ftoffset,
                    acmode,
                    NULL);
perf_stop(ctx, PERFT_RAW);

今回は、ClamAV におけるファイルスキャンのコア関数である cli_scan_fmap 関数の実装から、AntiVirus のシグネチャマッチングに使用される Aho–Corasick アルゴリズムの概要についてまとめることとします。

もくじ

(前提) ClamAV におけるパターンマッチングの処理

ClamAV におけるファイルスキャンのコア関数である cli_scan_fmap 関数は、ざっくり言うと前回の記事で紹介した cl_fmap 構造体として抽象化されたメモリマップのスキャンを行う関数です。

この中では、ハッシュベースのチェックや、ロードされているシグネチャデータベースとのパターンマッチングなどの処理が行われます。

cli_scan_fmap 関数は、スキャンのための初期化を行った後にファイルマップから最大 SCANBUFF(0x20000 バイト) 分のバッファを 1 チャンクとして読み込み、 matcher_run 関数によるスキャンを行うようです。

image-20251214111115528

この matcher_run 関数は以下のパラメーターと共に呼び出されます。

static inline cl_error_t matcher_run(
    const struct cli_matcher *root,
    const unsigned char *buffer, uint32_t length,
    const char **virname, struct cli_ac_data *mdata,
    uint32_t offset,
    const struct cli_target_info *tinfo,
    cli_file_t ftype,
    struct cli_matched_type **ftoffset,
    unsigned int acmode,
    unsigned int pcremode,
    struct cli_ac_result **acres,
    fmap_t *map,
    struct cli_bm_off *offdata,
    struct cli_pcre_off *poffdata,
    cli_ctx *ctx
)

matcher_run 関数は、受け取ったバッファのスキャンを行う関数で、初めに cli_bm_scanbuff 関数にて Boyer-Moore 法によるスキャンを実施した後に、cli_ac_scanbuff 関数で Aho-Corasick アルゴリズムによるパターンマッチングを行うようです。

Aho-Corasick アルゴリズムとは

Aho-Corasick アルゴリズムとは、もともとは出版物の中から特定のキーワードを効率的に検索するために考案された文字列検索アルゴリズムであり、「共通接頭辞検索(Common Prefix Search)」などと呼ばれる「特定のテキストの任意の位置から部分一致するキーワードをすべて取り出す処理」を行う方法の 1 つです。

参考:Aho–Corasick algorithm - Wikipedia

参考:Aho Corasick 法 - naoyaのはてなダイアリー

参考:高速な文字列探索:Daachorseの技術解説 - LegalOn Technologies Engineering Blog

Aho-Corasick アルゴリズムでは、事前に検索したいキーワードのパターンからオートマトン(コンピュータの状態、遷移をモデル化したもの)を作成し、与えられたバッファに対してパターンマッチングを行います。

ClamAV のような AntiVirus のパターンマッチングの場合には、オートマトンは事前にロードされているシグネチャデータベースから予め作成しておくことができるため、パターンマッチングに必要な計算量は入力されたバッファ(とマッチしたエントリ)に対して線形になります。

このように Aho-Corasick アルゴリズムは、シグネチャデータベースで定義された多数のシグネチャが特定のデータストリーム内に含まれるか否かのチェックを効率的に行うことができるため、ClamAV のような AntiVirus のパターンマッチング処理の実装に適しているようです。

image-20251214132434653

参考:The Aho-Corasick Paradigm in Modern Antivirus Engines: A Cornerstone of Signature-Based Malware Detection

Aho-Corasick アルゴリズムの実装

Trie 木の作成

Aho-Corasick アルゴリズムの実装のため、まずは固定されたキーワードの集合から定義した Trie 木をベースにオートマトンを作成します。

Trie 木とは文字列の集合を表す木構造です。

Trie 木は以下の図ように空の文字列を Root とし、複数の文字列を先頭(prefix)が同じもの同士でまとめて管理するデータ構造です。

image-20251214151725777

上記の例では、「“A”, “to”, “tea”, “ted”, “ten”, “i”, “in”, “inn”」というキーワードの集合を Trie 木で表現しており、最初のノードにはユニークな prefix を抜き出した「“t”, “A”, “i”」が入っています。

その後は、キーワードの集合内の共通する prefix を持つ項について 1 文字ずつノードを割り当てしていくことで Trie 木が完成します。

参考:すごいTrie #アルゴリズム - Qiita

参考:トライ (データ構造) - Wikipedia

Trie 木を実装する

続いて、Trie 木を Python で実装していきます。

まずは Trie 木のノードとして、TrieNode クラスを定義します。

ノードには、以下の 2 つの情報を持たせるようにします。

  1. 次のノード(TrieNode)を示すリスト(今回は辞書として定義)
  2. このノードが末端か否かを示す値
@dataclass
class TrieNode:
    children: Dict[str, "TrieNode"] = field(default_factory=dict)
    is_end: bool = False

なお、field(default_factory=dict) は、@dataclass を使用して定義したクラスのメンバとして、インスタンスを生成する度に dict で新規に生成したオブジェクトを使用するように指示しています。(詳しくは下記参照)

参考:dataclass で万物に型を付けよう|しゅんけー

参考:【Python】dataclassで初心者が必ずハマる default_factory の罠を徹底解説 #Python - Qiita

これで Trie 木のノードを定義できたので、次はノードの追加や検索を行う Trie クラスを実装します。

前述の通り、Trie 木の Root は空とするので、クラスのインスタンス化時に空の TrieNode を self.root に割り当てします。

class Trie:
    def __init__(self) -> None:
        self.root = TrieNode()

Trie 木にキーワードを登録するための insert メソッドでは、文字列 keyword を 1 文字ずつ取り出していき、Root から順に Trie 木を探索させます。

この時、もしまだ対象の文字が子ノードに存在していない場合は、新しく生成した TrieNode インスタンスを子ノードとして辞書に登録します。

次に、特定のキーワードが Trie 木に登録されているかを探索する is_contains メソッドを以下の通り実装します。

def is_contains(self, keyword: str) -> bool:
    node = self._find(keyword)
    return node.is_end if node else False

def _find(self, keyword: str) -> Optional[TrieNode]: # TrieNode | None
    node = self.root
    for ch in keyword:
        node = node.children.get(ch)
        if node is None:
            return None
    return node

以下が、最終的に作成したコードです。

from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class TrieNode:
    children: Dict[str, "TrieNode"] = field(default_factory=dict)
    is_end: bool = False


class Trie:
    def __init__(self) -> None:
        self.root = TrieNode()

    def insert(self, keyword: str) -> None:
        node = self.root
        for ch in keyword:
            if ch not in node.children:
                node.children[ch] = TrieNode()
            node = node.children[ch]
        node.is_end = True

    def is_contains(self, keyword: str) -> bool:
        node = self._find(keyword)
        return node.is_end if node else False

    def _find(self, keyword: str) -> Optional[TrieNode]: # TrieNode | None
        node = self.root
        for ch in keyword:
            node = node.children.get(ch)
            if node is None:
                return None
        return node


if __name__ == "__main__":
    trie = Trie()
    for w in ["he", "she", "his", "hers", "I", "my", "ME", "mine"]:
        trie.insert(w)

    assert(trie.is_contains("her") == False)
    assert(trie.is_contains("hero") == False)
    assert(trie.is_contains("she") == True)
    assert(trie.is_contains("I") == True)
    assert(trie.is_contains("i") == False)
    assert(trie.is_contains("ME") == True)
    assert(trie.is_contains("Me") == False)

Aho-Corasick のオートマトンについて

上記の Trie 木の実装により、「特定のキーワードが事前定義された集合の中に存在するか」を特定できるようになりましたが、単純な Trie 木では AntiVirus のパターンマッチングに適した「バイトストリーム内の部分一致」の検出には利用できません。

そこで、Trie 木の構造をベースに Aho-Corasick 法による検索を行うための有限オートマトンを作成します。

具体的には、木構造として実装されている Trie に対して Failure Links と Output を追加します。

Failure Links とは、キーワード文字の検索が途中で失敗した場合に、検索を再実行するのではなく次にマッチングする可能性のある状態に遷移するためのリンクです。

これにより、例えば「…ABCD」とマッチングしてきたパターンが次の文字で不一致した場合に、再建策を行うのではなく「…BCD」という接尾辞に対してマッチングがあることを利用して、Trie 構造の B -> C -> D のノードに遷移することで検索を効率的に継続できます。

参考:AHO CORASICK ALGORITHM. The Aho-Corasick algorithm is a… | by Viraj Tammana | Medium

参考:Aho-Corasick Algorithm: Key Concepts, Code & Real Examples

この仕組みにより、入力として受け取ったバイトストリームに対して検索位置の巻き戻しが発生しなくなるため、入力に対して線形の計算量でパターンマッチングを行うことができます。

また、Output とは、その状態に到達したときにマッチするキーワードを含むものです。

image-20251214133135610

参考:Aho–Corasick algorithm - Wikipedia

Aho-Corasick を実装する

先ほどの Trie 木の実装をベースにオートマトンを作成し、Aho-Corasick による文字列検索アルゴリズムを実装してみることにします。

まずは、先ほど同様 Failure Links を加えた Trie のノードを ACNode クラスとして定義します。

文字列配列である outputs は、そのノードまでにマッチしたキーワードを保持するために使用します。

@dataclass
class ACNode:
    children: Dict[str, "ACNode"] = field(default_factory=dict)
    outputs: List[str] = field(default_factory=list) # mached keywords list
    failure_link: Optional["ACNode"] = None # ACNode | None

次に、オートマトンの作成や Aho-Corasick 法による検索を行うための AhoCorasick クラスを定義します。

初期化時には先ほど同様 Root に空のノードを設定します。(Root の failure_link は Root 自身に設定しておきます)

class AhoCorasick:

    def __init__(self) -> None:
        self.root = ACNode()
        self.root.failure_link = self.root

次に、事前定義されたキーワードから Trie 木を作成する insert メソッドを以下の通り定義します。

実装としては先ほどの insert メソッドと全く同じです。

def insert(self, keyword: str) -> None:
        node = self.root
        for ch in keyword:
            if ch not in node.children:
                node.children[ch] = ACNode()
            node = node.children[ch]
        node.outputs.append(keyword)

Trie 木を作成したら、ここから Aho-Corasick の有限オートマトンを作成するメソッド build を実装します。

Aho-Corasick の有限オートマトンを作成するために、Trie 木を BFS で探索しつつ、各ノードに対して必要な情報を追加していきます。

まずは、collections からインポートした ACNode クラスのインスタンスを登録する両端キュー q を定義し、Root 直下のノードを q に追加しつつ failure_link をすべて Root に紐づけます。

q: deque[ACNode] = deque()

for child in self.root.children.values():
    child.failure_link = self.root
    q.append(child)

続けて、キューから popleft でノードを取り出していき、子ノードを指す辞書のキーと ACNode インスタンスを分析します。

この中では、子ノードの中に prefix と一致するものが見つかるまで親のノードの failure_link を順に辿っていき、最終的に見つかったノード(または Root)をその子ノードの failure_link に追加しています。

while q:
    cursor = q.popleft()

    for ch, next_item in cursor.children.items():
        q.append(next_item)

        f = cursor.failure_link
        while f is not None and f is not self.root and ch not in f.children:
            f = f.failure_link

        if f is not None and ch in f.children:
            next_item.failure_link = f.children[ch]
        else:
            next_item.failure_link = self.root

        if next_item.failure_link and next_item.failure_link.outputs:
            next_item.outputs.extend(next_item.failure_link.outputs)

そして、最後に List[str] である outputs に failure_link の outputs を連結するために、extend メソッドを使用しています。

ノードの outputs に failure_link の outputs を連結しているのは、そのノードを終端とした場合に failure 先の outputs とマッチしている場合もあるためです。

参考:【Python】appendとextendでよく間違えるところ #Python3 - Qiita

これで有限オートマトンが完成したので、最後に受け取ったテキストから事前定義されたキーワードを O(N) で検索する search メソッドを実装します。

def search(self, text: str) -> None:
    res: List[Tuple[int, int, str]] = []
    node = self.root

    for i, ch in enumerate(text):
        while node is not self.root and ch not in node.children:
            node = node.failure_link if node.failure_link is not None else self.root

        node = node.children.get(ch, self.root)

        if len(node.outputs) > 0:
            print(i, node.outputs)

上記を実行すると、マッチした終端文字の位置でどのキーワードとマッチしたのかを表示してくれます。

(“she” と “he” など、同じ位置で 2 つのキーワードとマッチする場合もあります。)

最終的に実装したコードは以下のようになりました。

from __future__ import annotations

from dataclasses import dataclass, field
from collections import deque
from typing import Dict, List, Optional, Tuple


@dataclass
class ACNode:
    children: Dict[str, "ACNode"] = field(default_factory=dict)
    outputs: List[str] = field(default_factory=list) # mached keywords list
    failure_link: Optional["ACNode"] = None # ACNode | None


class AhoCorasick:

    def __init__(self) -> None:
        self.root = ACNode()
        self.root.failure_link = self.root

    def insert(self, keyword: str) -> None:
        node = self.root
        for ch in keyword:
            if ch not in node.children:
                node.children[ch] = ACNode()
            node = node.children[ch]
        node.outputs.append(keyword)


    def build(self) -> None:
        q: deque[ACNode] = deque()

        for child in self.root.children.values():
            child.failure_link = self.root
            q.append(child)

        while q:
            cursor = q.popleft()

            for ch, next_item in cursor.children.items():
                q.append(next_item)

                f = cursor.failure_link
                while f is not None and f is not self.root and ch not in f.children:
                    f = f.failure_link

                if f is not None and ch in f.children:
                    next_item.failure_link = f.children[ch]
                else:
                    next_item.failure_link = self.root

                if next_item.failure_link and next_item.failure_link.outputs:
                    next_item.outputs.extend(next_item.failure_link.outputs)


    def search(self, text: str) -> None:
        res: List[Tuple[int, int, str]] = []
        node = self.root

        for i, ch in enumerate(text):
            while node is not self.root and ch not in node.children:
                node = node.failure_link if node.failure_link is not None else self.root

            node = node.children.get(ch, self.root)

            if len(node.outputs) > 0:
                print(i, node.outputs)


if __name__ == "__main__":
    ac = AhoCorasick()
    for w in ["he", "she", "his", "hers", "I", "my", "ME", "mine"]:
        ac.insert(w)
    ac.build()

    text = "asdfahishersIadfsamaMEandOhers-mefsadfasmines"
    ac.search(text)

これを実行すると以下のような結果を得ることができます。

image-20251216221634682

ClamAV における Aho-Corasick の実装

Aho–Corasick アルゴリズムの概要について把握したところで、ClamAV のスキャンにおいて Aho–Corasick アルゴリズムを使用したスキャンがどのように行われているのかを簡単に見ていくことにします。

まず、ClamAV における AC を使用したスキャンは cli_ac_scanbuff 関数で行われています。

cl_error_t cli_ac_scanbuff(
    const unsigned char *buffer,
    uint32_t length,
    const char **virname,
    void **customdata,
    struct cli_ac_result **res,
    const struct cli_matcher *root,
    struct cli_ac_data *mdata,
    uint32_t offset,
    cli_file_t ftype,
    struct cli_matched_type **ftoffset,
    unsigned int mode,
    cli_ctx *ctx)

この関数の呼び出し時に与えられる buffer には、Eicar テキストのようにスキャン対象のデータが割り当てられています。

image-20251217204907446

また、AC 法におけるノードは cli_ac_node 構造体として定義されています。

struct cli_ac_patt {
    uint16_t *pattern, *prefix, length[3], prefix_length[3];
    uint32_t mindist, maxdist;
    uint32_t sigid;
    uint32_t lsigid[3];
    uint16_t ch[2];
    char *virname;
    void *customdata;
    uint16_t ch_mindist[2];
    uint16_t ch_maxdist[2];
    uint16_t parts, partno, special, special_pattern;
    struct cli_ac_special **special_table;
    uint16_t rtype, type;
    uint32_t offdata[4], offset_min, offset_max;
    uint32_t boundary;
    uint8_t depth;
    uint8_t sigopts;
};

struct cli_ac_list {
    struct cli_ac_patt *me;
    union {
        struct cli_ac_node *node;
        struct cli_ac_list *next;
    };
    struct cli_ac_list *next_same;
};

struct cli_ac_node {
    struct cli_ac_list *list;
    struct cli_ac_node **trans, *fail;
};

この中では、root->ac_root で取り出した Root ノードの trans(次のノード) から、buffer[i] と対応する遷移先を順に取り出していき、そのノードが終端である場合にはパターンマッチング処理を開始するような実装になっています。

current = root->ac_root;

for (i = 0; i < length; i++) {
    current = current->trans[buffer[i]];

    if (UNLIKELY(IS_FINAL(current))) {
        struct cli_ac_list *faillist = current->fail->list;
        pattN                        = current->list;
        /* 中略 */
    }
}

この中では様々なチェックが行われていますが、特に着目する点は ac_findmatch 関数からさらに呼び出される ac_forward_match_branch 関数です。

/* state should reset on call, recursion depth = number of alternate specials */
/* each loop iteration starts on the NEXT sequence to validate */
static int ac_forward_match_branch(const unsigned char *buffer, uint32_t bp, uint32_t offset, uint32_t fileoffset, uint32_t length,
                                   const struct cli_ac_patt *pattern, uint32_t pp, uint16_t specialcnt, uint32_t *start, uint32_t *end)
{
    int match;
    uint16_t wc, i;

    match = 1;

    /* forward (pattern) validation; determines end */
    for (i = pp; i < pattern->length[0] && bp < length; i++) {
        AC_MATCH_CHAR(pattern->pattern[i], buffer[bp], 0);
        if (!match)
            return 0;

        bp++;
    }
    *end = bp;

上記のコードでは、ようやくスキャン対象の buffer と、対応するシグネチャパターンの照合を行います。

実際に Eicar スキャン時のデバッグをしてみると、このタイミングで pattern->pattern のシグネチャパターンとスキャンデータである buffer が 1 バイトずつ比較されていることを確認できます。

image-20251217210820193

まとめ

ClamAV のスキャンロジックを追っていたら情報検索アルゴリズムの実装にたどりついてしまいました。

AV がどうやって大量のデータを高速でスキャンしているのか理解が深まったのでよかったです。