Knuth Morris Pratt for Lazy Bytestrings implementation

I've implemented KMP string searching for lazy bytestrings, and I'd like some help improving the performance of the code. I'd also like to know if it doesn't look correct - I've tested it pretty extensively but you never know ... I've been testing on a 7 MB file, where the search sequence is not found. Using strict byestrings, lazy bytestrings, and regular strings, I've found my algorithm is about twice as slow as the strict version. Surprisingly, the strict version is a little bit *slower* than the regular strings version. Thanks for any comments or help! Justin

Now I wonder what that 7MB file might be? :-) We (team TNT) implemented KMP over lazy bytestrings as part of our icfp 2007 contest entry. As I remember, for the DNA evaluator it gave modest speed improvements over more naïve searching. Our implementation was based upon this blog post: http://twan.home.fmf.nl/blog/ Tim
I've implemented KMP string searching for lazy bytestrings, and I'd like some help improving the performance of the code. I'd also like to know if it doesn't look correct - I've tested it pretty extensively but you never know ...
I've been testing on a 7 MB file, where the search sequence is not found. Using strict byestrings, lazy bytestrings, and regular strings, I've found my algorithm is about twice as slow as the strict version. Surprisingly, the strict version is a little bit *slower* than the regular strings version.
Thanks for any comments or help!
Justin _______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe

On Wed, 2007-08-01 at 01:51 +0100, Tim Docker wrote:
Now I wonder what that 7MB file might be? :-)
We (team TNT) implemented KMP over lazy bytestrings as part of our icfp 2007 contest entry. As I remember, for the DNA evaluator it gave modest speed improvements over more naïve searching. Our implementation was based upon this blog post:
If anyone can come up with a fast search implementation for strict and/or lazy ByteStrings I'll include it in the bytestring package. The current Data.ByteString search uses a rather under-optimised KMP implementation. I say under-optimised as I think it typically gets beaten by a naive search. Duncan

If anyone can come up with a fast search implementation for strict and/or lazy ByteStrings I'll include it in the bytestring package.
Out of curiosity, is it intentional or an oversight that findSubstrings is only implemented on strict ByteStrings? (at least with the libs supplied with ghc-6.6.1). Tim

twd_gg:
If anyone can come up with a fast search implementation for strict and/or lazy ByteStrings I'll include it in the bytestring package.
Out of curiosity, is it intentional or an oversight that findSubstrings is only implemented on strict ByteStrings? (at least with the libs supplied with ghc-6.6.1).
Lazy functional programmers. -- Don

On Wed, 2007-08-01 at 02:30 +0100, Tim Docker wrote:
If anyone can come up with a fast search implementation for strict and/or lazy ByteStrings I'll include it in the bytestring package.
Out of curiosity, is it intentional or an oversight that findSubstrings is only implemented on strict ByteStrings? (at least with the libs supplied with ghc-6.6.1).
We never got round to it, which is something I'd like to correct. Duncan

On 7/31/07, Tim Docker
Now I wonder what that 7MB file might be? :-)
We (team TNT) implemented KMP over lazy bytestrings as part of our icfp 2007 contest entry. As I remember, for the DNA evaluator it gave modest speed improvements over more naïve searching. Our implementation was based upon this blog post:
Tim
What prompted this was we adapted Oleg's KMP module for PackedStrings to lazy bytestrings, and it was too slow. I am actually interested in implementing KMP for Data.Sequence, but wanted to get bytestrings out of the way first. Justin

jgbailey:
I've implemented KMP string searching for lazy bytestrings, and I'd like some help improving the performance of the code. I'd also like to know if it doesn't look correct - I've tested it pretty extensively but you never know ...
I've been testing on a 7 MB file, where the search sequence is not found. Using strict byestrings, lazy bytestrings, and regular strings, I've found my algorithm is about twice as slow as the strict version. Surprisingly, the strict version is a little bit *slower* than the regular strings version.
Thanks for any comments or help!
Justin
Also, be sure to compare against a naive search, optimised for strict and lazy bytestrings, http://hpaste.org/1803 If its not faster than those 2, then you're doing something wrong :) -- Don

On 7/31/07, Donald Bruce Stewart
jgbailey: Also, be sure to compare against a naive search, optimised for strict and lazy bytestrings,
If its not faster than those 2, then you're doing something wrong :)
-- Don
Yes, I was really hoping someone (dons :)) would look at my code and give any suggestions. Did the attachment come through? Justin

Justin Bailey wrote:
On 7/31/07, Donald Bruce Stewart
wrote: jgbailey: Also, be sure to compare against a naive search, optimised for strict and lazy bytestrings,
If its not faster than those 2, then you're doing something wrong :)
-- Don
Yes, I was really hoping someone (dons :)) would look at my code and give any suggestions. Did the attachment come through?
Justin
I will make measurements. I already see that
{-# OPTIONS_GHC -fbang-patterns #-} module KMPSeq (kmpMatch)
kmpMatch' :: B.ByteString -> Int64 -> Int -> Int64 kmpMatch' !str !currIdx !patShift
let matchLength' !pat !str !cnt | B.null pat = {-# SCC "kmpMatch_nullPatStr" #-} cnt matchLength' !pat !str !cnt | B.null str = {-# SCC "kmpMatch_nullStr" #-} 0 matchLength' !pat !str !cnt | (B.head pat == B.head str) = {-# SCC "kmpMatch_eqHead" #-}
Is a huge win. It now uses 3 MB instead of 243 MB and runs faster. -- Chris

My optimized (and fixed) version of the code is attached. I benchmarked it with:
module Main(main) where
import KMPSeq import qualified Data.ByteString.Lazy as B import qualified Data.ByteString.Lazy.Char8 as BC
infile = "endo.dna"
Modified by one character from the original copied from the endo.dna The leading "IIIII" is often overlapping with the suffix of a prefix of substring so KMP should help here
substring1 = BC.pack "IIIIIPIIIPIIIIIPIIIPFFICCPIIIPFFFFFPIIIPFFFFFPIIIPIIFIIPIIIPIIIIIPIIIPIIIIIPIIIPIIIIIPIIIPCCCCCPI"
original "IIIIIPIIIPIIIIIPIIIPFFICCPIIIPFFFFFPIIIPFFFFFPIIIPIIIIIPIIIPIIIIIPIIIPIIIIIPIIIPIIIIIPIIIPCCCCCPI"
getSearch = do B.readFile infile
main :: IO () main = do print . kmpMatch substring1 =<< getSearch
The bug fix is actually a big speed win for this benchmark. The long commentary I wrote has been sent to Justin Bailey with each step I took and each measurement I made. (800 lines with all the pasted profiles). The speedup was from 18.5 seconds to 2.6 seconds on the above benchmark. Cheers, Chris Kuklewicz
From Martin Amis' short story "The Janitor on Mars":
"...The War with the Scythers of the Orion Spur was hotly prosecuted for just over a billion years. Who won? We did. They're still there, the Scythers. Their planet is still there. The nature of war changed, during that trillennium. It was no longer nuclear or quantum-gravitational. It was neurological. Informational. Life goes on for the Scythers, but its quality has been subtly reduced. We fixed it so that they think they're simulations in a deterministic computer universe. It is believed that this is the maximum suffering you can visit on a type-v world." http://www.metafilter.com/mefi/18176 -- Code by Justin Bailey with optimizations by Chris Kuklewicz {-# OPTIONS_GHC -fbang-patterns #-} module KMPSeq (kmpMatch) where import qualified Data.Array.Base as Base (unsafeAt) import qualified Data.Array.Unboxed as Unboxed (UArray) import qualified Data.Array.IArray as IArray ((!), Array, array) import Data.List as List (map, filter, length, null, take, maximum, foldr, all, drop) import qualified Data.ByteString.Lazy as B import qualified Data.ByteString as S import qualified Data.ByteString.Base as B (unsafeHead,unsafeTail,unsafeDrop,unsafeIndex) import GHC.Int (Int64) import Test.QuickCheck import Data.ByteString as S (isSubstringOf, pack, ByteString) -- Only used during testing import Debug.Trace (trace) import System.IO.Unsafe import System.IO {- Returns the first index for which the pattern given is found in the string given, or -1 if the pattern does not appear. -} kmpMatch :: B.ByteString -- ^ The pattern -> B.ByteString -- ^ The string to search -> Int64 -- ^ Result kmpMatch patLazy search | B.null patLazy = 0 -- Empty pattern matches right away. | otherwise = let pat :: S.ByteString pat = S.concat (B.toChunks patLazy) lookupTable = computeLookupS pat patLen = fromIntegral $ S.length pat -- currPatIdx is our position in the pattern - sometimes we don't -- have to match the whole pattern again. -- -- currIdx is our position in the string. We B.drop elements from str -- on each iteration so have to track our position separately. kmpMatch' :: B.ByteString -> Int64 -> Int -> Int64 kmpMatch' !str !currIdx !patShift | B.null str = -1 | otherwise = -- Find length of prefix in our pattern which matches string (possibly zero) let matchLength :: Int matchLength = let matchLength' :: B.ByteString -> Int -> Int matchLength' !str !cnt | cnt == patLen = patLen | B.null str = 0 | B.unsafeIndex pat cnt == B.head str = matchLength' (B.tail str) (succ cnt) | otherwise = cnt in matchLength' str patShift shiftAmt = Base.unsafeAt lookupTable matchLength {-# INLINE matchLen64 #-} matchLen64 = fromIntegral (matchLength-patShift) in case matchLength of _ | matchLength == patLen -> currIdx - fromIntegral patShift -- found complete match | matchLength == patShift -> kmpMatch' (B.tail str) (currIdx + 1) 0 | shiftAmt > 0 -> kmpMatch' (B.drop matchLen64 str) (currIdx + matchLen64) shiftAmt | otherwise -> kmpMatch' (B.drop matchLen64 str) (currIdx + matchLen64) patShift in kmpMatch' search 0 0 -- These tests have failed in the past -- kmpMatch (toLazyBS "ccb") (toLazyBS "abcdcccb") == 5 -- kmpMatch (toLazyBS "bbaa") (toLazyBS "bdcdbbbbaa") == 6 -- kmpMatch (toLazyBS "cccadadc") (toLazyBS "adaccccadadc") == 4 -- This next one has caused an infinite loop previously. -- kmpMatch (toLazyBS "bbbb") (toLazyBS "dddcaaaddaaabdacbcccabbada") {-| Given our pattern, get all the prefixes of the pattern. For each of those prefixes, find the longest prefix from the original pattern that is also a suffix of the prefix segment being considered, and is not equal to it. The argument given to overlap is the length of the prefix matched so far, and the length of the longest prefix, which is a suffix and is not equal to it, is the value overlap returns. If a given prefix has no possible overlap, it is mapped to -1. -} overlap :: S.ByteString -> [(Int, Int)] overlap pat = let patternLength = S.length pat -- Necessary to reverse prefixes since 'isSuffixOf' not implemented for lazy bytestrings. prefixes = List.map S.reverse $ List.take (fromIntegral patternLength) $ S.inits pat longestPreSuffix prefix = -- Find prefixes that are a prefix of this suffix, but -- are not equal to it. let suffixes = List.filter (\p -> p `S.isPrefixOf` prefix && p /= prefix) prefixes in if S.null prefix || List.null suffixes then 0 else List.maximum (List.map S.length suffixes) in List.map (\prefix -> (fromIntegral $ S.length prefix, fromIntegral $longestPreSuffix prefix)) prefixes {- | Given a string representing a search pattern, this function returns a function which represents, for each prefix of that pattern, the maximally long prefix of the pattern which is a suffix of the indicated pattern segment. If there is no such prefix, 0 is returned. -} computeLookupS :: S.ByteString -> Unboxed.UArray Int Int computeLookupS pat = let patLen = fromIntegral $ S.length pat table :: Unboxed.UArray Int Int table = {-# SCC "computeLookup_table" #-} IArray.array (0, patLen - 1) (overlap pat) in table computeLookup :: B.ByteString -> (Int -> Int) computeLookup pat = let patLen = fromIntegral $ B.length pat table :: Unboxed.UArray Int Int table = {-# SCC "computeLookup_table" #-} IArray.array (0, patLen - 1) (overlap . S.concat . B.toChunks $ pat) in Base.unsafeAt table -- Types, instances and utility functions for testing purposes. newtype PatternChar = PatternChar Char deriving Show instance Arbitrary PatternChar where arbitrary = oneof (List.map (return . PatternChar) ['a', 'b', 'c', 'd']) patternsToString :: [PatternChar] -> B.ByteString patternsToString chars = B.pack $ List.foldr (\(PatternChar char) str -> (toEnum $ fromEnum char) : str) [] chars patternsToStrictString :: [PatternChar] -> S.ByteString patternsToStrictString chars = S.pack $ List.foldr (\(PatternChar char) str -> (toEnum $ fromEnum char) : str) [] chars toLazyBS :: String -> B.ByteString toLazyBS = B.pack . List.map (toEnum . fromEnum) toStrictBS :: String -> S.ByteString toStrictBS = S.pack . List.map (toEnum . fromEnum) -- Test that 0 and 1 element always return 0, if present. prop_testZero :: [PatternChar] -> Property prop_testZero pat = let lookup = computeLookup (patternsToString pat) in not (List.null pat) ==> if List.length pat > 1 then lookup 0 == 0 && lookup 1 == 0 else lookup 0 == 0 -- Test that all overlaps found are actually prefixes of the -- pattern string prop_testSubset :: [PatternChar] -> Property prop_testSubset pat = let patStr = patternsToString pat lookup = computeLookup patStr prefix len = B.take (fromIntegral len) testPrefix len = if lookup len == 0 then True else (prefix (lookup len) patStr) `B.isPrefixOf` (prefix len patStr) in not (List.null pat) ==> trivial (B.null patStr) $ List.all testPrefix [0 .. List.length pat - 1] -- Test that the prefix given is the maximal prefix. That is, -- add one more character makes it either equal to the string -- or not a prefix. prop_testCorrectPrefix :: [PatternChar] -> Property prop_testCorrectPrefix pat = let patStr = patternsToString pat lookup = computeLookup patStr isNeverSuffix len = let origPrefix = prefix len patStr -- Drop 1 to remove empty list allPrefixes = List.drop 1 $ B.inits origPrefix in List.all (\p -> B.null p || p == origPrefix || not ((B.reverse p) `B.isPrefixOf` (B.reverse origPrefix))) allPrefixes prefix len = B.take (fromIntegral len) -- True if the prefix returned from lookup for the length given produces -- a string which is a suffix of the original prefix. isRealSuffix len = (B.reverse (prefix (lookup len) patStr)) `B.isPrefixOf` (B.reverse $ prefix len patStr) isLongestSuffix len = let prefixPlus = prefix ((lookup len) + 1) patStr inputPrefix = prefix len patStr in prefixPlus == inputPrefix || not ((B.reverse prefixPlus) `B.isPrefixOf` (B.reverse inputPrefix)) testTable len = if lookup len == 0 then isNeverSuffix len else isRealSuffix len && isLongestSuffix len in not (List.null pat) ==> List.all testTable [0 .. List.length pat - 1] -- Verify that, if a match is found, it is where it's supposed to be in -- the string and it can be independently found by other means. prop_testMatch :: [PatternChar] -> [PatternChar] -> Property prop_testMatch pat search = let patStr = patternsToString pat searchStr = patternsToString search strictStr = patternsToStrictString search strictPat = patternsToStrictString pat patLen = B.length patStr searchLen = B.length searchStr matchIdx = kmpMatch patStr searchStr verify = if matchIdx > -1 then (B.take patLen $ B.drop matchIdx $ searchStr) == patStr && strictPat `S.isSubstringOf` strictStr else (B.null patStr && B.null searchStr) || not (strictPat `S.isSubstringOf` strictStr) in not (List.null pat) ==> trivial (B.null searchStr) $ classify (patLen > searchLen) "Bigger pattern than search" $ classify (patLen < searchLen) "Smaller pattern than search" $ classify (patLen == searchLen) "Equal pattern and search" $ classify (matchIdx > -1) "Match Exists" $ classify (matchIdx == -1) "Match Doesn't Exist" $ verify -- Test a pattern that was known to fail. prop_testBadPat :: [PatternChar] -> Property prop_testBadPat search = let patStr = toLazyBS "bbc" patLen = B.length patStr searchStr = patternsToString search matchIdx = kmpMatch patStr searchStr strictStr = patternsToStrictString search strictPat = toStrictBS "bbc" verify = if matchIdx > -1 then (B.take patLen $ B.drop matchIdx $ searchStr) == patStr && strictPat `S.isSubstringOf` strictStr else (B.null patStr && B.null searchStr) || not (strictPat `S.isSubstringOf` strictStr) in trivial (List.null search) $ verify -- Test that a pattern on the end of the string is found OK. prop_testEndPattern :: [PatternChar] -> [PatternChar] -> Property prop_testEndPattern pat search = let patStr = patternsToString pat searchStr = patternsToString (search ++ pat) matchIdx = kmpMatch patStr searchStr strictStr = patternsToStrictString (search ++ pat) strictPat = patternsToStrictString pat patLen = B.length patStr verify = if matchIdx > -1 then (B.take patLen $ B.drop matchIdx $ searchStr) == patStr && strictPat `S.isSubstringOf` strictStr else (B.null patStr && B.null searchStr) || not (strictPat `S.isSubstringOf` strictStr) in not (List.null pat) ==> trivial (B.null searchStr) $ classify (matchIdx > -1) "Match Exists" $ classify (matchIdx == -1) "Match Doesn't Exists" $ verify props1 = [ prop_testZero , prop_testSubset , prop_testCorrectPrefix , prop_testBadPat ] props2 = [ prop_testMatch , prop_testEndPattern ] allTests = do mapM_ quickCheck props1 mapM_ quickCheck props2 {- *KMPSeq> allTests OK, passed 100 tests. OK, passed 100 tests. OK, passed 100 tests. OK, passed 100 tests (15% trivial). OK, passed 100 tests. 41% Bigger pattern than search, Match Doesn't Exist. 24% Smaller pattern than search, Match Doesn't Exist. 12% trivial, Bigger pattern than search, Match Doesn't Exist. 11% Smaller pattern than search, Match Exists. 9% Equal pattern and search, Match Doesn't Exist. 3% Equal pattern and search, Match Exists. OK, passed 100 tests (100% Match Exists). -}

G'day all.
Quoting ChrisK
My optimized (and fixed) version of the code is attached.
By the way. I don't know if this is relevant, but I'm curious if this approach is any faster: http://72.14.253.104/search?q=cache:kG4zvvkZPLYJ:www.haskell.org/hawiki/RunT... I claim nothing except that it's an ugly hack. Cheers, Andrew Bromage

Am Mittwoch, 1. August 2007 22:54 schrieb ChrisK:
My optimized (and fixed) version of the code is attached.
I adapted my KMP implementation from one and a half years ago to the problem at hand (no longer search and replace, only find index of first match, and change from Strings to ByteStrings), on my computer, for the few tests I performed, it's about 30-40% faster than Chris' (depending on the input). Chris, could you check whether this holds for your benchmark? If so, any polishing and further optimisations are welcome; if that should be the basis of an addition to the ByteString lib, I'd feel very honoured (in other words, if you consider it useful code, you're welcome to use it). Cheers, Daniel -- KMP algorithm for lazy ByteStrings {-# OPTIONS_GHC -fbang-patterns #-} module KMP (kmpMatch) where import qualified Data.Array.Base as Base (unsafeAt) import Data.Array.Unboxed (UArray, listArray) import qualified Data.Array as A (listArray, (!), elems) import qualified Data.ByteString.Lazy as B import qualified Data.ByteString as S import qualified Data.ByteString.Base as B (unsafeHead, unsafeTail, unsafeDrop, unsafeIndex) import Data.Int (Int64) import Data.Word (Word8) kmpMatch :: B.ByteString -> B.ByteString -> Int64 kmpMatch patLazy search | B.null patLazy = 0 | otherwise = kmp 0 0 search where pat :: S.ByteString pat = S.concat (B.toChunks patLazy) patLen = S.length pat sym :: Int -> Word8 sym = B.unsafeIndex pat bord = A.listArray (0,patLen) $ (-1):0:[getS (sym i) i + 1 | i <- [1 .. patLen - 1]] where getS s n | m < 0 || s == sym m = m | otherwise = getS s m where m = bord A.! n borders :: UArray Int Int borders = listArray (0,patLen) $ A.elems bord (?) :: UArray Int Int -> Int -> Int (?) = Base.unsafeAt getShift :: Word8 -> Int -> Int getShift s n = help n where help k | m < 0 || sym m == s = m | otherwise = help m where m = borders ? k kmp :: Int64 -> Int -> B.ByteString -> Int64 kmp !idx !match !srch | patLen == match = idx - fromIntegral match | B.null srch = -1 | sym match == B.head srch = kmp (idx+1) (match+1) (B.tail srch) | match == 0 = kmp (idx+1) 0 (B.tail srch) | otherwise = case getShift (B.head srch) match of -1 -> kmp idx 0 srch 0 -> kmp (idx+1) 1 (B.tail srch) shft -> kmp (idx + fromIntegral shft) (shft+1) (B.tail srch)

Oooops, stupid error before, fixed below. Missed it due to too few and simple tests, should've quickchecked :-/
Cheers, Daniel
-- KMP algorithm for lazy ByteStrings {-# OPTIONS_GHC -fbang-patterns #-} module KMP (kmpMatch) where
import qualified Data.Array.Base as Base (unsafeAt) import Data.Array.Unboxed (UArray, listArray) import qualified Data.Array as A (listArray, (!), elems)
import qualified Data.ByteString.Lazy as B import qualified Data.ByteString as S import qualified Data.ByteString.Base as B (unsafeHead, unsafeTail, unsafeDrop, unsafeIndex) import Data.Int (Int64) import Data.Word (Word8)
kmpMatch :: B.ByteString -> B.ByteString -> Int64 kmpMatch patLazy search
| B.null patLazy = 0 | otherwise = kmp 0 0 search
where pat :: S.ByteString pat = S.concat (B.toChunks patLazy) patLen = S.length pat sym :: Int -> Word8 sym = B.unsafeIndex pat bord = A.listArray (0,patLen) $ (-1):0:[getS (sym i) i + 1 | i <- [1 .. patLen - 1]] where getS s n
| m < 0 || s == sym m = m | otherwise = getS s m
where m = bord A.! n borders :: UArray Int Int borders = listArray (0,patLen) $ A.elems bord (?) :: UArray Int Int -> Int -> Int (?) = Base.unsafeAt getShift :: Word8 -> Int -> Int getShift s n = help n where help k
| m < 0 || sym m == s = m | otherwise = help m
where m = borders ? k kmp :: Int64 -> Int -> B.ByteString -> Int64 kmp !idx !match !srch
| patLen == match = idx - fromIntegral match | B.null srch = -1 | sym match == B.head srch = kmp (idx+1) (match+1) (B.tail | srch) match == 0 = kmp (idx+1) 0 (B.tail srch) | otherwise = case getShift (B.head srch) match of -1 -> kmp idx 0 srch shft -> kmp (idx+1) (shft+1) (B.tail srch)

I am still working on improving your code. And I have a "Is this a bug?" question: The "lookup = computeLookup pat" defines lookup to take an Int which represents the index into pat, where this index is 0 based and the 0th item is the head of pat. Now look at "matchLength :: Int; matchLength = let ... in matchLength' (B.drop (fromIntegral patShift) pat) str 0" That starts the "pat" from a shorter version that starts at 'patShift' and sets the last argument to matchLength' to 0. matchLength' is in the ... as: let matchLength' pat str cnt | B.null pat = cnt matchLength' pat str cnt | B.null str = 0 matchLength' pat str cnt | (B.head pat == B.head str) = matchLength' (B.drop 1 pat) (B.drop 1 str) (cnt + 1) | otherwise = cnt I see that cnt is incremented until * all of pat is matched, return cnt * all of str is used, return 0 * a mismatch is found, return cnt So a mismatch means that the character at index (patShift+cnt) in the original 'pat' had the mismatch. Now I see this that uses the 'cnt' returned from matchLength: shiftAmt = {-# SCC "kmpMatch_shiftAmt" #-} lookup matchLength And this is the bug. lookup should be given (shiftAmt + matchLength). This was not clear or obvious since * Nearly everything is Int or Int64 * The name "pat" in matchLength' shadows "pat" in kmpMatch This was not caught by quickcheck since it is an internal value and only rarely will trigger an error where a legitimate match is missed. If there is no match then this bug does not cause an incorrect answer (I think). I will fix this and continue hacking on it. -- Chris
participants (7)
-
ajb@spamcop.net
-
ChrisK
-
Daniel Fischer
-
dons@cse.unsw.edu.au
-
Duncan Coutts
-
Justin Bailey
-
Tim Docker