All Articles

Information Retrieval Algorithms Behind AntiVirus, Part 1 - The Aho–Corasick Algorithm

This page has been machine-translated from the original page.

In the previous article, I briefly followed the processing flow up to the point where the cli_scan_fmap function is called when scanning an Eicar file with clamscan.

perf_start(ctx, PERFT_RAW);
ret = cli_scan_fmap(ctx,
                    type == CL_TYPE_TEXT_ASCII ? CL_TYPE_ANY : type,
                    false,
                    &ftoffset,
                    acmode,
                    NULL);
perf_stop(ctx, PERFT_RAW);

This time, starting from the implementation of cli_scan_fmap, the core function for file scanning in ClamAV, I will summarize the outline of the Aho–Corasick algorithm used for signature matching in AntiVirus software.

Table of Contents

Background: Pattern Matching in ClamAV

The cli_scan_fmap function, the core function for file scanning in ClamAV, roughly speaking scans a memory map abstracted as the cl_fmap structure introduced in the previous article.

Inside it, processing such as hash-based checks and pattern matching against the loaded signature database is performed.

After performing initialization for scanning, cli_scan_fmap appears to read up to SCANBUFF (0x20000 bytes) from the file map as a single chunk and then scan it with the matcher_run function.

image-20251214111115528

This matcher_run function is called with the following parameters.

static inline cl_error_t matcher_run(
    const struct cli_matcher *root,
    const unsigned char *buffer, uint32_t length,
    const char **virname, struct cli_ac_data *mdata,
    uint32_t offset,
    const struct cli_target_info *tinfo,
    cli_file_t ftype,
    struct cli_matched_type **ftoffset,
    unsigned int acmode,
    unsigned int pcremode,
    struct cli_ac_result **acres,
    fmap_t *map,
    struct cli_bm_off *offdata,
    struct cli_pcre_off *poffdata,
    cli_ctx *ctx
)

The matcher_run function scans the received buffer. It first performs a Boyer-Moore-based scan with cli_bm_scanbuff (its actual behavior is closer to the Wu method), and then performs pattern matching with the Aho-Corasick algorithm in cli_ac_scanbuff.

What Is the Aho-Corasick Algorithm?

The Aho-Corasick algorithm is a string-search algorithm originally devised to efficiently search for specific keywords in documents. It is one way to perform what is sometimes called a “Common Prefix Search”: extracting every keyword that partially matches from any position in a given text.

Reference: Aho–Corasick algorithm - Wikipedia

Reference: Aho Corasick 法 - naoyaのはてなダイアリー

Reference: 高速な文字列探索:Daachorseの技術解説 - LegalOn Technologies Engineering Blog

In the Aho-Corasick algorithm, an automaton (a model of computer states and transitions) is built in advance from the keyword patterns you want to search for, and pattern matching is then performed against a given buffer.

For AntiVirus pattern matching such as in ClamAV, the automaton can be created beforehand from the preloaded signature database, so the computational cost required for pattern matching is linear with respect to the input buffer (and the matched entries).

In this way, the Aho-Corasick algorithm can efficiently check whether many signatures defined in a signature database are contained in a specific data stream, making it well suited to implementing pattern-matching processing in AntiVirus software such as ClamAV.

image-20251214132434653

Reference: The Aho-Corasick Paradigm in Modern Antivirus Engines: A Cornerstone of Signature-Based Malware Detection

Implementing the Aho-Corasick Algorithm

Building a Trie

To implement the Aho-Corasick algorithm, we first create an automaton based on a Trie built from a fixed set of keywords.

A Trie is a tree structure that represents a set of strings.

As shown below, a Trie uses the empty string as the Root and manages multiple strings together by grouping those that share the same prefix.

image-20251214151725777

In the example above, the set of keywords "A", "to", "tea", "ted", "ten", "i", "in", "inn" is represented as a Trie, and the first node contains the unique prefixes "t", "A", "i".

After that, the Trie is completed by assigning nodes one character at a time for items in the keyword set that share common prefixes.

Reference: すごいTrie #アルゴリズム - Qiita

Reference: トライ (データ構造) - Wikipedia

Implementing a Trie

Next, we will implement a Trie in Python.

First, we define the TrieNode class as a node in the Trie.

Each node will hold the following two pieces of information.

  1. A list indicating the next nodes (TrieNode) (defined as a dictionary in this example)
  2. A value indicating whether this node is terminal
@dataclass
class TrieNode:
    children: Dict[str, "TrieNode"] = field(default_factory=dict)
    is_end: bool = False

Here, field(default_factory=dict) instructs the class defined with @dataclass to use a newly created object produced by dict each time an instance is generated. (See the references below for details.)

Reference: dataclass で万物に型を付けよう|しゅんけー

Reference: 【Python】dataclassで初心者が必ずハマる default_factory の罠を徹底解説 #Python - Qiita

Now that we have defined Trie nodes, next we implement the Trie class, which adds nodes and searches them.

As described above, the Root of the Trie is empty, so when instantiating the class we assign an empty TrieNode to self.root.

class Trie:
    def __init__(self) -> None:
        self.root = TrieNode()

In the insert method for registering keywords in the Trie, the string keyword is taken one character at a time and the Trie is traversed from the Root in order.

At this time, if the target character does not yet exist in the child nodes, a newly created TrieNode instance is registered in the dictionary as a child node.

Next, we implement the is_contains method, which searches whether a specific keyword is registered in the Trie, as follows.

def is_contains(self, keyword: str) -> bool:
    node = self._find(keyword)
    return node.is_end if node else False

def _find(self, keyword: str) -> Optional[TrieNode]: # TrieNode | None
    node = self.root
    for ch in keyword:
        node = node.children.get(ch)
        if node is None:
            return None
    return node

The final code is shown below.

from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class TrieNode:
    children: Dict[str, "TrieNode"] = field(default_factory=dict)
    is_end: bool = False


class Trie:
    def __init__(self) -> None:
        self.root = TrieNode()

    def insert(self, keyword: str) -> None:
        node = self.root
        for ch in keyword:
            if ch not in node.children:
                node.children[ch] = TrieNode()
            node = node.children[ch]
        node.is_end = True

    def is_contains(self, keyword: str) -> bool:
        node = self._find(keyword)
        return node.is_end if node else False

    def _find(self, keyword: str) -> Optional[TrieNode]: # TrieNode | None
        node = self.root
        for ch in keyword:
            node = node.children.get(ch)
            if node is None:
                return None
        return node


if __name__ == "__main__":
    trie = Trie()
    for w in ["he", "she", "his", "hers", "I", "my", "ME", "mine"]:
        trie.insert(w)

    assert(trie.is_contains("her") == False)
    assert(trie.is_contains("hero") == False)
    assert(trie.is_contains("she") == True)
    assert(trie.is_contains("I") == True)
    assert(trie.is_contains("i") == False)
    assert(trie.is_contains("ME") == True)
    assert(trie.is_contains("Me") == False)

About the Aho-Corasick Automaton

With the Trie implementation above, we can now determine whether “a specific keyword exists in a predefined set,” but a simple Trie cannot be used to detect the “partial matches within a byte stream” that AntiVirus pattern matching requires.

So, based on the Trie structure, we create a finite automaton for searching with the Aho-Corasick method.

Specifically, we add Failure Links and Output to the Trie implemented as a tree structure.

Failure Links are links that let the search transition to the next state that may match, instead of restarting the search, when matching a keyword character fails partway through.

As a result, for example, when a pattern that has matched ...ABCD mismatches on the next character, instead of starting over it can use the fact that the suffix ...BCD may still match, transition to the B -> C -> D nodes in the Trie structure, and continue searching efficiently.

Reference: AHO CORASICK ALGORITHM. The Aho-Corasick algorithm is a… | by Viraj Tammana | Medium

Reference: Aho-Corasick Algorithm: Key Concepts, Code & Real Examples

This mechanism prevents the search position from ever rewinding within the input byte stream, so pattern matching can be performed in linear time with respect to the input.

The Output values, meanwhile, contain the keywords that match when that state is reached.

image-20251214133135610

Reference: Aho–Corasick algorithm - Wikipedia

Implementing Aho-Corasick

Based on the Trie implementation above, I will build an automaton and try implementing the Aho-Corasick string-search algorithm.

First, as before, I define the Trie node with Failure Links added as the ACNode class.

The string array outputs is used to store the keywords matched up to that node.

@dataclass
class ACNode:
    children: Dict[str, "ACNode"] = field(default_factory=dict)
    outputs: List[str] = field(default_factory=list) # mached keywords list
    failure_link: Optional["ACNode"] = None # ACNode | None

Next, I define the AhoCorasick class, which creates the automaton and performs searches with the Aho-Corasick method.

At initialization time, it sets an empty node at the Root just as before. (The Root’s failure_link is set to the Root itself.)

class AhoCorasick:

    def __init__(self) -> None:
        self.root = ACNode()
        self.root.failure_link = self.root

Next, we define the insert method that creates a Trie from predefined keywords as shown below.

The implementation is exactly the same as the insert method earlier.

def insert(self, keyword: str) -> None:
        node = self.root
        for ch in keyword:
            if ch not in node.children:
                node.children[ch] = ACNode()
            node = node.children[ch]
        node.outputs.append(keyword)

Once the Trie has been created, we next implement the build method, which constructs the Aho-Corasick finite automaton from it.

To create the Aho-Corasick finite automaton, we traverse the Trie with BFS and add the required information to each node.

First, define q, a deque that stores instances of the ACNode class imported from collections, and while adding the nodes directly under the Root to q, link all of their failure_link values to the Root.

q: deque[ACNode] = deque()

for child in self.root.children.values():
    child.failure_link = self.root
    q.append(child)

Next, we take nodes out of the queue with popleft and examine the dictionary keys pointing to child nodes together with the ACNode instances.

Here, the code follows the parent node’s failure_link in order until it finds a child node matching the prefix, and then assigns the ultimately found node (or the Root) to that child node’s failure_link.

while q:
    cursor = q.popleft()

    for ch, next_item in cursor.children.items():
        q.append(next_item)

        f = cursor.failure_link
        while f is not None and f is not self.root and ch not in f.children:
            f = f.failure_link

        if f is not None and ch in f.children:
            next_item.failure_link = f.children[ch]
        else:
            next_item.failure_link = self.root

        if next_item.failure_link and next_item.failure_link.outputs:
            next_item.outputs.extend(next_item.failure_link.outputs)

Finally, the extend method is used to concatenate the failure_link outputs into outputs, which is a List[str].

The reason the node’s outputs is extended with the failure_link outputs is that, when that node is treated as a terminal node, there can also be matches with the outputs at the failure destination.

Reference: 【Python】appendとextendでよく間違えるところ #Python3 - Qiita

Now that the finite automaton is complete, we finally implement the search method, which searches the received text for predefined keywords in O(N).

def search(self, text: str) -> None:
    res: List[Tuple[int, int, str]] = []
    node = self.root

    for i, ch in enumerate(text):
        while node is not self.root and ch not in node.children:
            node = node.failure_link if node.failure_link is not None else self.root

        node = node.children.get(ch, self.root)

        if len(node.outputs) > 0:
            print(i, node.outputs)

When you run the above, it displays which keywords matched at the position of the matched terminal character.

(There are also cases where two keywords match at the same position, such as "she" and "he".)

The final implementation looks like this.

from __future__ import annotations

from dataclasses import dataclass, field
from collections import deque
from typing import Dict, List, Optional, Tuple


@dataclass
class ACNode:
    children: Dict[str, "ACNode"] = field(default_factory=dict)
    outputs: List[str] = field(default_factory=list) # mached keywords list
    failure_link: Optional["ACNode"] = None # ACNode | None


class AhoCorasick:

    def __init__(self) -> None:
        self.root = ACNode()
        self.root.failure_link = self.root

    def insert(self, keyword: str) -> None:
        node = self.root
        for ch in keyword:
            if ch not in node.children:
                node.children[ch] = ACNode()
            node = node.children[ch]
        node.outputs.append(keyword)


    def build(self) -> None:
        q: deque[ACNode] = deque()

        for child in self.root.children.values():
            child.failure_link = self.root
            q.append(child)

        while q:
            cursor = q.popleft()

            for ch, next_item in cursor.children.items():
                q.append(next_item)

                f = cursor.failure_link
                while f is not None and f is not self.root and ch not in f.children:
                    f = f.failure_link

                if f is not None and ch in f.children:
                    next_item.failure_link = f.children[ch]
                else:
                    next_item.failure_link = self.root

                if next_item.failure_link and next_item.failure_link.outputs:
                    next_item.outputs.extend(next_item.failure_link.outputs)


    def search(self, text: str) -> None:
        res: List[Tuple[int, int, str]] = []
        node = self.root

        for i, ch in enumerate(text):
            while node is not self.root and ch not in node.children:
                node = node.failure_link if node.failure_link is not None else self.root

            node = node.children.get(ch, self.root)

            if len(node.outputs) > 0:
                print(i, node.outputs)


if __name__ == "__main__":
    ac = AhoCorasick()
    for w in ["he", "she", "his", "hers", "I", "my", "ME", "mine"]:
        ac.insert(w)
    ac.build()

    text = "asdfahishersIadfsamaMEandOhers-mefsadfasmines"
    ac.search(text)

When you run this, you can get the following result.

image-20251216221634682

Aho-Corasick in ClamAV

Now that we have an overview of the Aho–Corasick algorithm, let’s briefly look at how scanning using the Aho–Corasick algorithm is performed in ClamAV scans.

First, scanning using AC in ClamAV is performed by the cli_ac_scanbuff function.

cl_error_t cli_ac_scanbuff(
    const unsigned char *buffer,
    uint32_t length,
    const char **virname,
    void **customdata,
    struct cli_ac_result **res,
    const struct cli_matcher *root,
    struct cli_ac_data *mdata,
    uint32_t offset,
    cli_file_t ftype,
    struct cli_matched_type **ftoffset,
    unsigned int mode,
    cli_ctx *ctx)

The buffer provided when this function is called is assigned the data to be scanned, such as the Eicar text.

image-20251217204907446

Also, nodes in the AC method are defined as the cli_ac_node structure.

struct cli_ac_patt {
    uint16_t *pattern, *prefix, length[3], prefix_length[3];
    uint32_t mindist, maxdist;
    uint32_t sigid;
    uint32_t lsigid[3];
    uint16_t ch[2];
    char *virname;
    void *customdata;
    uint16_t ch_mindist[2];
    uint16_t ch_maxdist[2];
    uint16_t parts, partno, special, special_pattern;
    struct cli_ac_special **special_table;
    uint16_t rtype, type;
    uint32_t offdata[4], offset_min, offset_max;
    uint32_t boundary;
    uint8_t depth;
    uint8_t sigopts;
};

struct cli_ac_list {
    struct cli_ac_patt *me;
    union {
        struct cli_ac_node *node;
        struct cli_ac_list *next;
    };
    struct cli_ac_list *next_same;
};

struct cli_ac_node {
    struct cli_ac_list *list;
    struct cli_ac_node **trans, *fail;
};

Within this implementation, it starts from the Root node obtained by root->ac_root, then follows the transition destinations corresponding to buffer[i] one by one from trans (the next node), and when the node is terminal it begins the pattern-matching process.

current = root->ac_root;

for (i = 0; i < length; i++) {
    current = current->trans[buffer[i]];

    if (UNLIKELY(IS_FINAL(current))) {
        struct cli_ac_list *faillist = current->fail->list;
        pattN                        = current->list;
        /* omitted */
    }
}

Various checks are performed here, but the point I particularly want to focus on is the ac_forward_match_branch function, which is called further from the ac_findmatch function.

/* state should reset on call, recursion depth = number of alternate specials */
/* each loop iteration starts on the NEXT sequence to validate */
static int ac_forward_match_branch(const unsigned char *buffer, uint32_t bp, uint32_t offset, uint32_t fileoffset, uint32_t length,
                                   const struct cli_ac_patt *pattern, uint32_t pp, uint16_t specialcnt, uint32_t *start, uint32_t *end)
{
    int match;
    uint16_t wc, i;

    match = 1;

    /* forward (pattern) validation; determines end */
    for (i = pp; i < pattern->length[0] && bp < length; i++) {
        AC_MATCH_CHAR(pattern->pattern[i], buffer[bp], 0);
        if (!match)
            return 0;

        bp++;
    }
    *end = bp;

In the code above, this is where the scanned buffer and the corresponding signature pattern are finally compared.

If you actually debug an Eicar scan, you can confirm that at this point the signature pattern in pattern->pattern and the scan data in buffer are being compared one byte at a time.

image-20251217210820193

Summary

While tracing ClamAV’s scan logic, I ended up reaching the implementation of an information retrieval algorithm.

It was worthwhile because it deepened my understanding of how AV can scan large amounts of data at high speed.