본문 바로가기
Study/Python

Python에서 AhoCorasick(아호코라식) 알고리즘 구현하기

by 개발새-발 2021. 4. 17.
반응형

지금 캐글에 논문을 입력으로 받아 논문에서 사용된 데이터셋 이름을 찾는 대회가 진행 중이다. 만약 알고 있는 데이터셋 이름이 있다면, 논문 속에서 그 이름을 단순 문자열 매칭을 이용해서 찾아내어서 출력에 반영할 수 있을 것이다. 처음에는 Test 셋이 몇 개 없는 줄 착각하고 단순 2중 for문으로 이를 구현했었다. 그러나 숨겨진 test 셋에 문서가 8000개가 넘어서 문자열 매칭만 하였음에도 50분이 넘게 걸렸었다. 그래서 이 부분을 다중 패턴 매칭 알고리즘인 아호코라식을 사용하여서 구현하였다.

 

아호코라식 알고리즘은 찾아야 하는 패턴이 많을 때 단순 2중 for문에 비해 큰 시간 차이를 보인다. 또, 검색해야 하는 범위가 방대할 때, 아호코라식으로는 커피 두 잔 정도 타는 시간에 끝낼 수 있을 것들이 단순 2중 for문으로는 3박 4일 제주도 여행을 다녀와도 다 끝나지 않을 수도 있다. 알고리즘이 이래서 중요한가 보다.

 

아호코라식에서 원하는 패턴들을 찾는 방법을 한번 보자.

def search(self,sentence):
        crr = self.head
        ret = []
        for c in sentence :
            while crr is not self.head and c not in crr:
                crr = crr.fail
            if c in crr:
                crr = crr[c]
            
            if crr.final:
                ret.extend(list(crr.out))
        return ret

중요한 건 지금 노드가 root가 아니고 지금 노드에서 주어진 문자에 의해 넘어갈 수 있는 child 노드가 없을 때 fail을 따라가는 것이다. fail이 아호코라식의 핵심이라고 볼 수 있다.

 

그렇다면 fail은 무엇인가? fail을 따라가면 trie 내부에서 지금 노드가 대표하는 패턴의 가장 긴 접미사를 대표하는 노드로 갈 수 있다. 예를 들어 세 개의 패턴 'abcdefg'와 'bcdefg', 'efg'를 찾아내기 위해 아호코라식을 사용한다고 하자. 이 3개의 패턴으로 트라이를 만들었고, "abcdefghhh"에서 이를 찾아낸다고 해보자. "abcdefghhh"로 trie를 따라 'g'까지 가면 우린 트라이상에서 a-b-c-d-e-f-g로 올것이다. 이 'g'에 는 'abcdefg'를 찾았다 라는 정보가 있다. 그런데 패턴을 보면 'abcdefg'에는 'bcdefg' 와 'efg'가 둘다 포함되어있는데 이 결과는 fail함수를 만들어 놓지 않았을 때 'abcdefg'를 따라가 만난 노드에는 포함되어있지 않다. 앞에서 fail로 가장 긴 접미사를 찾는다고 하였다. 'abcdefg'의 접미사중에 트라이 내부에서 찾을 수 있는 가장 긴 접미사는 'bcdefg'이고 한번 더 따라가면 'efg'를 쉽게 찾을 수 있다. 이제 우린 'abcdefg'를 따라가 만난 'g'에서 'abcdefg'와 fail을 따라가 찾아낸 'bcdefg','efg'를 출력으로 지정하면 'abcdefg'를 따라올때 마다 이 세개의 패턴과 일치한다는 것을 보여줄 수 있는 것이다.

 

fail함수는 트라이에서 bfs를 이용하여 가장 짧은 것들부터 처리하면 간단히 구현할 수 있다. 또, 이 fail에 짧을 코드 몇 줄만 추가하면 fail을 따라가 output을 갱신하는 과정도 간단하게 처리할 수 있다. 아래는 fail을 구축하는 과정이고, output의 갱신도 동시에 이루어진다.

def constructfail(self):
        queue = Queue()
        self.head.fail = self.head
        queue.put(self.head)
        while not queue.empty():
            crr = queue.get()
            for nextc in crr:
                child = crr[nextc]
                
                if crr is self.head:
                    child.fail = self.head
                else :
                    f = crr.fail
                    while f is not self.head and nextc not in f:
                        f = f.fail
                    if nextc in f:
                        f = f[nextc]
                    child.fail = f
                
                child.addout(child.fail.out)
                child.final |= child.fail.final
                
                queue.put(child)

 

아래는 내가 만들어본 AhoCorasick 클래스이다.

from queue import Queue

class Node(dict):
        def __init__(self):
            super().__init__()
            self.final = False;
            
            self.out = set();
            self.fail = None;
            
        def addout(self,out):
            if type(out) is set:
                self.out = self.out.union(out)
            else :
                self.out.add(out)
        
        def addchild(self,alphabet,node = None):
            self[alphabet] = Node() if node is None else node

class AC():
       
    def __init__(self,patterns):
        self.patterns = patterns
        self.head = Node()
        
        self.maketrie()
        self.constructfail()
        
    def search(self,sentence):
        crr = self.head
        ret = []
        for c in sentence :
            while crr is not self.head and c not in crr:
                crr = crr.fail
            if c in crr:
                crr = crr[c]
            
            if crr.final:
                ret.extend(list(crr.out))
        return ret
    
    def maketrie(self):
        for pattern in self.patterns:
            crr = self.head
            for c in pattern :
                if c not in crr:
                    crr.addchild(c)
                crr = crr[c]
            crr.final = True
            crr.addout(pattern)
            
    def constructfail(self):
        queue = Queue()
        self.head.fail = self.head
        queue.put(self.head)
        while not queue.empty():
            crr = queue.get()
            for nextc in crr:
                child = crr[nextc]
                
                if crr is self.head:
                    child.fail = self.head
                else :
                    f = crr.fail
                    while f is not self.head and nextc not in f:
                        f = f.fail
                    if nextc in f:
                        f = f[nextc]
                    child.fail = f
                
                child.addout(child.fail.out)
                child.final |= child.fail.final
                
                queue.put(child)

kaggle에서 위 클래스로 문자열 탐색만 돌려본 결과이다. 제출 시간이 단순 2중 for문보다 빨라졌다.

www.kaggle.com/kazakan/simple-string-matching-with-ahocorasick

 

Simple string matching with AhoCorasick

Explore and run machine learning code with Kaggle Notebooks | Using data from Coleridge Initiative - Show US the Data

www.kaggle.com

 

알고리즘은 항상 c++로 공부하다 보니 이 구현은 미숙한 부분이 있을 수 있습니다. 부족한 점이 있다면 지적은 언제나 받고 있으니 환영입니다!

반응형

댓글