This page has been machine-translated from the original page.
In the previous article, I briefly followed the processing flow up to the point where the cli_scan_fmap function is called when scanning an Eicar file with clamscan.

    perf_start(ctx, PERFT_RAW);
    ret = cli_scan_fmap(ctx,
                        type == CL_TYPE_TEXT_ASCII ? CL_TYPE_ANY : type,
                        false,
                        &ftoffset,
                        acmode,
                        NULL);
    perf_stop(ctx, PERFT_RAW);

This time, starting from the implementation of cli_scan_fmap, the core function for file scanning in ClamAV, I will summarize the outline of the Aho–Corasick algorithm used for signature matching in AntiVirus software.
Table of Contents
- Background: Pattern Matching in ClamAV
- What Is the Aho-Corasick Algorithm?
- Aho-Corasick in ClamAV
- Summary
Background: Pattern Matching in ClamAV
The cli_scan_fmap function, the core function for file scanning in ClamAV, roughly speaking scans a memory map abstracted as the cl_fmap structure introduced in the previous article.
Inside it, processing such as hash-based checks and pattern matching against the loaded signature database is performed.
After performing initialization for scanning, cli_scan_fmap appears to read up to SCANBUFF (0x20000 bytes) from the file map as a single chunk and then scan it with the matcher_run function.
This matcher_run function is called with the following parameters.

    static inline cl_error_t matcher_run(
        const struct cli_matcher *root,
        const unsigned char *buffer, uint32_t length,
        const char **virname, struct cli_ac_data *mdata,
        uint32_t offset,
        const struct cli_target_info *tinfo,
        cli_file_t ftype,
        struct cli_matched_type **ftoffset,
        unsigned int acmode,
        unsigned int pcremode,
        struct cli_ac_result **acres,
        fmap_t *map,
        struct cli_bm_off *offdata,
        struct cli_pcre_off *poffdata,
        cli_ctx *ctx
    )

The matcher_run function scans the received buffer. It first performs a Boyer-Moore-based scan with cli_bm_scanbuff (its actual behavior is closer to the Wu-Manber method), and then performs pattern matching with the Aho-Corasick algorithm in cli_ac_scanbuff.
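As a rough picture of the flow described in this section (read one chunk, then run each matcher over it in order), here is a minimal Python sketch. Everything in it is an assumption made for illustration: iter_chunks, scan, literal_matcher, the overlap parameter, and the signature name are hypothetical and are not ClamAV APIs. The overlap is just one plausible way to avoid missing a pattern that straddles a chunk boundary.

```python
from typing import Callable, Iterator, Optional, Sequence, Tuple

SCANBUFF = 0x20000  # chunk size mentioned above (131072 bytes)

# Hypothetical matcher type: (chunk, offset of chunk in the file) -> name or None
Matcher = Callable[[bytes, int], Optional[str]]


def iter_chunks(data: bytes, chunk_size: int = SCANBUFF,
                overlap: int = 0) -> Iterator[Tuple[int, bytes]]:
    """Yield (offset, chunk) pairs; consecutive chunks share `overlap` bytes."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    offset = 0
    while offset < len(data):
        yield offset, data[offset:offset + chunk_size]
        if offset + chunk_size >= len(data):
            break  # this chunk already reached the end of the data
        offset += chunk_size - overlap


def scan(data: bytes, matchers: Sequence[Matcher],
         chunk_size: int = SCANBUFF, overlap: int = 0) -> Optional[str]:
    """Run every matcher, in order, on every chunk; return the first hit."""
    for offset, chunk in iter_chunks(data, chunk_size, overlap):
        for matcher in matchers:
            name = matcher(chunk, offset)
            if name is not None:
                return name
    return None


def literal_matcher(needle: bytes, name: str) -> Matcher:
    """Hypothetical stand-in for a real matcher: plain substring search."""
    def run(chunk: bytes, offset: int) -> Optional[str]:
        return name if needle in chunk else None
    return run


if __name__ == "__main__":
    data = b"a" * 6 + b"EICAR" + b"b" * 5
    matcher = literal_matcher(b"EICAR", "Test-Signature")
    print(scan(data, [matcher], chunk_size=8))             # pattern straddles chunks
    print(scan(data, [matcher], chunk_size=8, overlap=4))  # overlap catches it
```

With a chunk size of 8 the needle is split across two chunks, so the first call finds nothing, while the second call sees it inside the overlapping chunk.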
What Is the Aho-Corasick Algorithm?
The Aho-Corasick algorithm is a string-search algorithm originally devised to efficiently search documents for a set of keywords. It reports every keyword that occurs as a substring starting at any position in a given text, which can be viewed as performing what is sometimes called a “Common Prefix Search” from every position of the text at once.
Reference: Aho–Corasick algorithm - Wikipedia
Reference: Aho Corasick 法 - naoyaのはてなダイアリー
Reference: 高速な文字列探索:Daachorseの技術解説 - LegalOn Technologies Engineering Blog
In the Aho-Corasick algorithm, an automaton (a model made of states and the transitions between them) is built in advance from the keyword patterns you want to search for, and pattern matching is then performed against a given buffer.
For AntiVirus pattern matching such as in ClamAV, the automaton can be created beforehand from the preloaded signature database, so the computational cost of pattern matching is linear in the length of the input buffer (plus the number of matches reported).
In this way, the Aho-Corasick algorithm can efficiently check whether many signatures defined in a signature database are contained in a specific data stream, making it well suited to implementing pattern-matching processing in AntiVirus software such as ClamAV.
Implementing the Aho-Corasick Algorithm
Building a Trie
To implement the Aho-Corasick algorithm, we first create an automaton based on a Trie built from a fixed set of keywords.
A Trie is a tree structure that represents a set of strings.
A Trie uses the empty string as the Root and manages multiple strings together by letting those that share the same prefix share the same nodes.
For example, when the set of keywords "A", "to", "tea", "ted", "ten", "i", "in", "inn" is represented as a Trie, the nodes directly under the Root hold the distinct first characters "t", "A", "i".
After that, the Trie is completed by adding nodes one character at a time, so that keywords with a common prefix follow the same path from the Root.
Reference: すごいTrie #アルゴリズム - Qiita
Reference: トライ (データ構造) - Wikipedia
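As a rough illustration of the structure described above, the example keyword set can be folded into nested dictionaries. This is only a sketch: build_trie and the "$" end marker are hypothetical choices made for this example (it assumes no keyword contains "$"), and the class-based implementation follows in the next section.

```python
from typing import Any, Dict, Iterable

END = "$"  # hypothetical marker meaning "a keyword ends at this node"


def build_trie(keywords: Iterable[str]) -> Dict[str, Any]:
    """Build a Trie as nested dicts; keywords sharing a prefix share a path."""
    root: Dict[str, Any] = {}
    for keyword in keywords:
        node = root
        for ch in keyword:
            # Reuse the child for ch if it exists, otherwise create it.
            node = node.setdefault(ch, {})
        node[END] = True
    return root


if __name__ == "__main__":
    trie = build_trie(["A", "to", "tea", "ted", "ten", "i", "in", "inn"])
    print(sorted(trie))            # characters directly under the Root
    print(sorted(trie["t"]["e"]))  # children below the shared prefix "te"
```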
Implementing a Trie
Next, we will implement a Trie in Python.
First, we define the TrieNode class as a node in the Trie.
Each node will hold the following two pieces of information.
- A list indicating the next nodes (TrieNode) (defined as a dictionary in this example)
- A value indicating whether this node is terminal

    @dataclass
    class TrieNode:
        children: Dict[str, "TrieNode"] = field(default_factory=dict)
        is_end: bool = False

Here, field(default_factory=dict) instructs the class defined with @dataclass to use a newly created object produced by dict each time an instance is generated. (See the references below for details.)
Reference: dataclass で万物に型を付けよう|しゅんけー
Reference: 【Python】dataclassで初心者が必ずハマる default_factory の罠を徹底解説 #Python - Qiita
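The effect of default_factory can be checked with a small experiment. In the sketch below, Node is a hypothetical stand-in for TrieNode and mutable_default_is_rejected is a helper written only for this example. It shows that each instance gets its own dictionary, and that @dataclass refuses a bare `{}` default with a ValueError.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Node:  # hypothetical stand-in for TrieNode
    children: Dict[str, "Node"] = field(default_factory=dict)
    is_end: bool = False


def mutable_default_is_rejected() -> bool:
    """Return True if @dataclass refuses a bare `{}` as a field default."""
    try:
        @dataclass
        class Bad:
            children: dict = {}  # mutable default: rejected at class creation
    except ValueError:
        return True
    return False


if __name__ == "__main__":
    a, b = Node(), Node()
    a.children["x"] = Node()
    print(b.children)                     # b still has its own empty dict
    print(a.children is b.children)       # the two dicts are distinct objects
    print(mutable_default_is_rejected())
```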
Now that we have defined Trie nodes, next we implement the Trie class, which adds nodes and searches them.
As described above, the Root of the Trie is empty, so when instantiating the class we assign an empty TrieNode to self.root.

    class Trie:
        def __init__(self) -> None:
            self.root = TrieNode()

In the insert method for registering keywords in the Trie, the string keyword is taken one character at a time and the Trie is traversed from the Root in order.
At this time, if the target character does not yet exist in the child nodes, a newly created TrieNode instance is registered in the dictionary as a child node.
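The insert step just described can be sketched as a standalone function. It mirrors the insert method that appears in the final listing further below; the free-function form and the name insert_keyword are used here only so that the snippet runs on its own.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class TrieNode:
    children: Dict[str, "TrieNode"] = field(default_factory=dict)
    is_end: bool = False


def insert_keyword(root: TrieNode, keyword: str) -> None:
    """Walk from root one character at a time, creating missing children."""
    node = root
    for ch in keyword:
        if ch not in node.children:
            node.children[ch] = TrieNode()
        node = node.children[ch]
    node.is_end = True  # mark the node reached by the last character


if __name__ == "__main__":
    root = TrieNode()
    for w in ["he", "hers"]:
        insert_keyword(root, w)
    he = root.children["h"].children["e"]
    print(sorted(root.children))      # both keywords share the "h" branch
    print(he.is_end)                  # "he" is a registered keyword
    print(he.children["r"].is_end)    # "her" is only a prefix of "hers"
```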
Next, we implement the is_contains method, which searches whether a specific keyword is registered in the Trie, as follows.

    def is_contains(self, keyword: str) -> bool:
        node = self._find(keyword)
        return node.is_end if node else False

    def _find(self, keyword: str) -> Optional[TrieNode]:  # TrieNode | None
        node = self.root
        for ch in keyword:
            node = node.children.get(ch)
            if node is None:
                return None
        return node

The final code is shown below.

    from dataclasses import dataclass, field
    from typing import Dict, Optional


    @dataclass
    class TrieNode:
        children: Dict[str, "TrieNode"] = field(default_factory=dict)
        is_end: bool = False


    class Trie:
        def __init__(self) -> None:
            self.root = TrieNode()

        def insert(self, keyword: str) -> None:
            node = self.root
            for ch in keyword:
                if ch not in node.children:
                    node.children[ch] = TrieNode()
                node = node.children[ch]
            node.is_end = True

        def is_contains(self, keyword: str) -> bool:
            node = self._find(keyword)
            return node.is_end if node else False

        def _find(self, keyword: str) -> Optional[TrieNode]:  # TrieNode | None
            node = self.root
            for ch in keyword:
                node = node.children.get(ch)
                if node is None:
                    return None
            return node


    if __name__ == "__main__":
        trie = Trie()
        for w in ["he", "she", "his", "hers", "I", "my", "ME", "mine"]:
            trie.insert(w)
        assert not trie.is_contains("her")
        assert not trie.is_contains("hero")
        assert trie.is_contains("she")
        assert trie.is_contains("I")
        assert not trie.is_contains("i")
        assert trie.is_contains("ME")
        assert not trie.is_contains("Me")

About the Aho-Corasick Automaton
With the Trie implementation above, we can now determine whether “a specific keyword exists in a predefined set,” but a plain Trie lookup alone cannot efficiently detect the “partial matches within a byte stream” that AntiVirus pattern matching requires.
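To make the limitation concrete, here is a sketch of the naive workaround: restart a Trie walk at every position of the text. The helper names (build, naive_scan) and the nested-dict Trie are assumptions for this example only. It does find the matches, but each restart re-reads characters that later restarts will visit again, so the worst-case cost grows with the text length multiplied by the longest keyword length.

```python
from typing import Any, Dict, Iterable, List, Tuple

END = ""  # hypothetical end marker; cannot collide with a one-character key


def build(keywords: Iterable[str]) -> Dict[str, Any]:
    """Nested-dict Trie; the END key stores the keyword that finishes there."""
    root: Dict[str, Any] = {}
    for kw in keywords:
        node = root
        for ch in kw:
            node = node.setdefault(ch, {})
        node[END] = kw
    return root


def naive_scan(root: Dict[str, Any], text: str) -> List[Tuple[int, str]]:
    """Return (start, keyword) pairs by restarting the walk at every offset."""
    hits: List[Tuple[int, str]] = []
    for start in range(len(text)):
        node = root
        for i in range(start, len(text)):
            node = node.get(text[i])
            if node is None:
                break  # mismatch: give up and restart from start + 1
            if END in node:
                hits.append((start, node[END]))
    return hits


if __name__ == "__main__":
    trie = build(["he", "she", "his", "hers"])
    print(naive_scan(trie, "ushers"))
```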
So, based on the Trie structure, we create a finite automaton for searching with the Aho-Corasick method.
Specifically, we add Failure Links and Output to the Trie implemented as a tree structure.
Failure Links are links that let the search transition to the next state that may still match, instead of restarting the search, when matching a keyword character fails partway through. The Failure Link of a node points to the node for the longest proper suffix of the string matched so far that is also a prefix of some keyword.
As a result, for example, when a search that has matched ...ABCD mismatches on the next character, instead of starting over it can use the fact that the suffix BCD may still be the beginning of another keyword, move to the node reached by B -> C -> D in the Trie, and continue searching efficiently from there.
Reference: AHO CORASICK ALGORITHM. The Aho-Corasick algorithm is a… | by Viraj Tammana | Medium
Reference: Aho-Corasick Algorithm: Key Concepts, Code & Real Examples
This mechanism prevents the search position from ever rewinding within the input byte stream, so pattern matching can be performed in linear time with respect to the input.
The Output values, meanwhile, contain the keywords that match when that state is reached.
Reference: Aho–Corasick algorithm - Wikipedia
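The two additions can be checked by brute force on a small keyword set. The sketch below does not build an automaton. It treats each Trie node as the prefix string that leads to it and computes, directly from the definitions, the failure target (the longest proper suffix that is also a Trie node) and the output (the keywords that end when the state is reached). All function names are hypothetical.

```python
from typing import Iterable, List, Set


def trie_states(keywords: Iterable[str]) -> Set[str]:
    """Every prefix of every keyword: one string per Trie node ('' is the Root)."""
    return {kw[:i] for kw in keywords for i in range(len(kw) + 1)}


def failure_target(state: str, states: Set[str]) -> str:
    """Longest proper suffix of state that is itself a Trie node."""
    for i in range(1, len(state) + 1):
        if state[i:] in states:
            return state[i:]
    return ""  # only reached for the Root, which fails to itself


def outputs(state: str, keywords: Iterable[str]) -> List[str]:
    """Keywords that are suffixes of state, i.e. that end at this state."""
    return [kw for kw in keywords if kw and state.endswith(kw)]


if __name__ == "__main__":
    keywords = ["he", "she", "his", "hers"]
    states = trie_states(keywords)
    for s in sorted(states, key=lambda x: (len(x), x)):
        print(repr(s), "->", repr(failure_target(s, states)), outputs(s, keywords))
```

For example, the state "she" fails to "he", and its output contains both "she" and "he", because "he" also ends wherever "she" ends.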
Implementing Aho-Corasick
Based on the Trie implementation above, I will build an automaton and try implementing the Aho-Corasick string-search algorithm.
First, as before, I define the Trie node with Failure Links added as the ACNode class.
The string array outputs is used to store the keywords matched up to that node.

    @dataclass
    class ACNode:
        children: Dict[str, "ACNode"] = field(default_factory=dict)
        outputs: List[str] = field(default_factory=list)  # matched keywords list
        failure_link: Optional["ACNode"] = None  # ACNode | None

Next, I define the AhoCorasick class, which creates the automaton and performs searches with the Aho-Corasick method.
At initialization time, it sets an empty node at the Root just as before. (The Root’s failure_link is set to the Root itself.)

    class AhoCorasick:
        def __init__(self) -> None:
            self.root = ACNode()
            self.root.failure_link = self.root

Next, we define the insert method that creates a Trie from predefined keywords as shown below.
The implementation is almost the same as the earlier insert method. The only difference is that, instead of setting is_end, it appends the keyword to the outputs of the node reached by the last character.

    def insert(self, keyword: str) -> None:
        node = self.root
        for ch in keyword:
            if ch not in node.children:
                node.children[ch] = ACNode()
            node = node.children[ch]
        node.outputs.append(keyword)

Once the Trie has been created, we next implement the build method, which constructs the Aho-Corasick finite automaton from it.
To create the Aho-Corasick finite automaton, we traverse the Trie with BFS and add the required information to each node.
First, define q, a deque (imported from collections) that stores ACNode instances, and while adding the nodes directly under the Root to q, set each of their failure_link values to the Root.

    q: deque[ACNode] = deque()
    for child in self.root.children.values():
        child.failure_link = self.root
        q.append(child)

Next, we take nodes out of the queue with popleft and examine each dictionary key (the character) that points to a child node together with the child ACNode instance.
Here, starting from the parent node’s failure_link, the code keeps following failure_link until it reaches a node that has a child for the same character ch (or reaches the Root), and then assigns that child (or the Root, if no such child exists) to the child node’s failure_link.

    while q:
        cursor = q.popleft()
        for ch, next_item in cursor.children.items():
            q.append(next_item)
            f = cursor.failure_link
            while f is not None and f is not self.root and ch not in f.children:
                f = f.failure_link
            if f is not None and ch in f.children:
                next_item.failure_link = f.children[ch]
            else:
                next_item.failure_link = self.root
            if next_item.failure_link and next_item.failure_link.outputs:
                next_item.outputs.extend(next_item.failure_link.outputs)

Finally, the extend method is used to concatenate the failure_link outputs into outputs, which is a List[str].
The reason the node’s outputs is extended with the failure_link outputs is that the string at the failure destination is a suffix of the string at this node, so whenever this node is reached, every keyword that ends at the failure destination also ends at the current position.
Reference: 【Python】appendとextendでよく間違えるところ #Python3 - Qiita
Now that the finite automaton is complete, we finally implement the search method, which searches the received text for predefined keywords in O(N).

    def search(self, text: str) -> None:
        node = self.root
        for i, ch in enumerate(text):
            # On a mismatch, follow failure links instead of rewinding the text.
            while node is not self.root and ch not in node.children:
                node = node.failure_link if node.failure_link is not None else self.root
            node = node.children.get(ch, self.root)
            if node.outputs:
                print(i, node.outputs)

When you run the above, it prints, for each match, the index of the last character of the match together with the keywords that end there.
(There are also cases where two keywords match at the same position, such as "she" and "he".)
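As a variation on the search method above, the sketch below returns matches as (start, end, keyword) tuples instead of printing them, which makes overlapping hits such as "she" and "he" easy to inspect. It is a compact re-implementation written for this example (integer states in lists, hypothetical names build_automaton and find_all), not the class developed in this article.

```python
from collections import deque
from typing import Dict, Iterable, List, Tuple

Automaton = Tuple[List[Dict[str, int]], List[int], List[List[str]]]


def build_automaton(keywords: Iterable[str]) -> Automaton:
    """Return (goto, fail, out) tables indexed by state; state 0 is the Root."""
    goto: List[Dict[str, int]] = [{}]
    fail: List[int] = [0]
    out: List[List[str]] = [[]]
    for kw in keywords:  # phase 1: build the Trie
        state = 0
        for ch in kw:
            if ch not in goto[state]:
                goto.append({})
                fail.append(0)
                out.append([])
                goto[state][ch] = len(goto) - 1
            state = goto[state][ch]
        out[state].append(kw)
    queue = deque(goto[0].values())  # depth-1 states already fail to the Root
    while queue:  # phase 2: BFS to fill failure links and merged outputs
        cur = queue.popleft()
        for ch, nxt in goto[cur].items():
            queue.append(nxt)
            f = fail[cur]
            while f != 0 and ch not in goto[f]:
                f = fail[f]
            fail[nxt] = goto[f].get(ch, 0)
            out[nxt].extend(out[fail[nxt]])
    return goto, fail, out


def find_all(text: str, automaton: Automaton) -> List[Tuple[int, int, str]]:
    """Return (start, end, keyword) for every occurrence, end inclusive."""
    goto, fail, out = automaton
    hits: List[Tuple[int, int, str]] = []
    state = 0
    for i, ch in enumerate(text):
        while state != 0 and ch not in goto[state]:
            state = fail[state]
        state = goto[state].get(ch, 0)
        for kw in out[state]:
            hits.append((i - len(kw) + 1, i, kw))
    return hits


if __name__ == "__main__":
    ac = build_automaton(["he", "she", "his", "hers"])
    print(find_all("ushers", ac))
```

In "ushers", both "she" and "he" are reported with the same end index 3, and "hers" is found afterwards without the text position ever moving backwards.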
The final implementation looks like this.

    from __future__ import annotations

    from collections import deque
    from dataclasses import dataclass, field
    from typing import Dict, List, Optional


    @dataclass
    class ACNode:
        children: Dict[str, "ACNode"] = field(default_factory=dict)
        outputs: List[str] = field(default_factory=list)  # matched keywords list
        failure_link: Optional["ACNode"] = None  # ACNode | None


    class AhoCorasick:
        def __init__(self) -> None:
            self.root = ACNode()
            self.root.failure_link = self.root

        def insert(self, keyword: str) -> None:
            node = self.root
            for ch in keyword:
                if ch not in node.children:
                    node.children[ch] = ACNode()
                node = node.children[ch]
            node.outputs.append(keyword)

        def build(self) -> None:
            q: deque[ACNode] = deque()
            # Nodes directly under the Root always fail back to the Root.
            for child in self.root.children.values():
                child.failure_link = self.root
                q.append(child)
            # BFS: a node's failure link is derived from its parent's.
            while q:
                cursor = q.popleft()
                for ch, next_item in cursor.children.items():
                    q.append(next_item)
                    f = cursor.failure_link
                    while f is not None and f is not self.root and ch not in f.children:
                        f = f.failure_link
                    if f is not None and ch in f.children:
                        next_item.failure_link = f.children[ch]
                    else:
                        next_item.failure_link = self.root
                    if next_item.failure_link and next_item.failure_link.outputs:
                        next_item.outputs.extend(next_item.failure_link.outputs)

        def search(self, text: str) -> None:
            node = self.root
            for i, ch in enumerate(text):
                # On a mismatch, follow failure links instead of rewinding the text.
                while node is not self.root and ch not in node.children:
                    node = node.failure_link if node.failure_link is not None else self.root
                node = node.children.get(ch, self.root)
                if node.outputs:
                    print(i, node.outputs)


    if __name__ == "__main__":
        ac = AhoCorasick()
        for w in ["he", "she", "his", "hers", "I", "my", "ME", "mine"]:
            ac.insert(w)
        ac.build()
        text = "asdfahishersIadfsamaMEandOhers-mefsadfasmines"
        ac.search(text)

When you run this, each line of output shows the index of the last character of a match, followed by the list of keywords that end at that position.
Aho-Corasick in ClamAV
Now that we have an overview of the Aho–Corasick algorithm, let’s briefly look at how scanning using the Aho–Corasick algorithm is performed in ClamAV scans.
First, scanning using AC in ClamAV is performed by the cli_ac_scanbuff function.

    cl_error_t cli_ac_scanbuff(
        const unsigned char *buffer,
        uint32_t length,
        const char **virname,
        void **customdata,
        struct cli_ac_result **res,
        const struct cli_matcher *root,
        struct cli_ac_data *mdata,
        uint32_t offset,
        cli_file_t ftype,
        struct cli_matched_type **ftoffset,
        unsigned int mode,
        cli_ctx *ctx)

The buffer argument passed to this function holds the data to be scanned, such as the Eicar text.
Also, nodes in the AC method are defined as the cli_ac_node structure.

    struct cli_ac_patt {
        uint16_t *pattern, *prefix, length[3], prefix_length[3];
        uint32_t mindist, maxdist;
        uint32_t sigid;
        uint32_t lsigid[3];
        uint16_t ch[2];
        char *virname;
        void *customdata;
        uint16_t ch_mindist[2];
        uint16_t ch_maxdist[2];
        uint16_t parts, partno, special, special_pattern;
        struct cli_ac_special **special_table;
        uint16_t rtype, type;
        uint32_t offdata[4], offset_min, offset_max;
        uint32_t boundary;
        uint8_t depth;
        uint8_t sigopts;
    };

    struct cli_ac_list {
        struct cli_ac_patt *me;
        union {
            struct cli_ac_node *node;
            struct cli_ac_list *next;
        };
        struct cli_ac_list *next_same;
    };

    struct cli_ac_node {
        struct cli_ac_list *list;
        struct cli_ac_node **trans, *fail;
    };

Within this implementation, it starts from the Root node obtained by root->ac_root, then follows the transition destinations corresponding to buffer[i] one by one from trans (the next node), and when the node is terminal it begins the pattern-matching process.

    current = root->ac_root;
    for (i = 0; i < length; i++) {
        current = current->trans[buffer[i]];
        if (UNLIKELY(IS_FINAL(current))) {
            struct cli_ac_list *faillist = current->fail->list;
            pattN = current->list;
            /* omitted */
        }
    }

Various checks are performed here, but the point I particularly want to focus on is the ac_forward_match_branch function, which is called further from the ac_findmatch function.

    /* state should reset on call, recursion depth = number of alternate specials */
    /* each loop iteration starts on the NEXT sequence to validate */
    static int ac_forward_match_branch(const unsigned char *buffer, uint32_t bp, uint32_t offset, uint32_t fileoffset, uint32_t length,
                                       const struct cli_ac_patt *pattern, uint32_t pp, uint16_t specialcnt, uint32_t *start, uint32_t *end)
    {
        int match;
        uint16_t wc, i;

        match = 1;

        /* forward (pattern) validation; determines end */
        for (i = pp; i < pattern->length[0] && bp < length; i++) {
            AC_MATCH_CHAR(pattern->pattern[i], buffer[bp], 0);
            if (!match)
                return 0;
            bp++;
        }
        *end = bp;

In the code above, this is where the scanned buffer and the corresponding signature pattern are finally compared.
If you actually debug an Eicar scan, you can confirm that at this point the signature pattern in pattern->pattern and the scan data in buffer are being compared one byte at a time.
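The shape of that byte-by-byte comparison can be sketched in Python as below. This is a deliberately simplified model under stated assumptions: forward_match and the use of None as an "any byte" wildcard are hypothetical choices for this example. The pattern elements in the excerpt are uint16_t rather than bytes, which suggests that AC_MATCH_CHAR handles more than plain byte equality, and none of that is reproduced here. Unlike the excerpt, whose remainder is omitted, this sketch also reports a failure when the buffer ends before the pattern does.

```python
from typing import Optional, Sequence, Tuple


def forward_match(buffer: bytes, bp: int,
                  pattern: Sequence[Optional[int]], pp: int = 0
                  ) -> Tuple[bool, int]:
    """Compare pattern[pp:] with buffer starting at bp, one element at a time.

    Returns (matched, end), where end is the buffer position after the last
    byte that was accepted. None in the pattern is a hypothetical wildcard.
    """
    i = pp
    while i < len(pattern) and bp < len(buffer):
        expected = pattern[i]
        if expected is not None and expected != buffer[bp]:
            return False, bp  # mismatch at buffer position bp
        bp += 1
        i += 1
    # Matched only if the whole pattern was consumed before the buffer ended.
    return i == len(pattern), bp


if __name__ == "__main__":
    data = b"X5O!P%"
    print(forward_match(data, 0, [0x58, None, 0x4F]))  # wildcard in the middle
    print(forward_match(data, 0, [0x58, 0x36]))        # second byte differs
```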
Summary
While tracing ClamAV’s scan logic, I ended up reaching the implementation of an information retrieval algorithm.
It was worthwhile because it deepened my understanding of how AV can scan large amounts of data at high speed.