From a2045ed3983a8bb50d4414f11e40bf57a905b6db Mon Sep 17 00:00:00 2001 From: Taka Date: Mon, 23 Mar 2020 09:31:38 +0000 Subject: [PATCH] Tightened up how search aliases are processed. --- asl_articles/search.py | 171 ++++++++++++++++++++---------- asl_articles/tests/test_search.py | 87 ++++++++++----- asl_articles/utils.py | 4 + 3 files changed, 182 insertions(+), 80 deletions(-) diff --git a/asl_articles/search.py b/asl_articles/search.py index c15bb8d..dfdb4e4 100644 --- a/asl_articles/search.py +++ b/asl_articles/search.py @@ -17,7 +17,7 @@ from asl_articles.models import Publisher, Publication, Article, Author, Scenari from asl_articles.publishers import get_publisher_vals from asl_articles.publications import get_publication_vals, get_publication_sort_key from asl_articles.articles import get_article_vals, get_article_sort_key -from asl_articles.utils import AppConfigParser, decode_tags, to_bool +from asl_articles.utils import AppConfigParser, decode_tags, to_bool, squash_spaces _search_index_path = None _search_aliases = {} @@ -338,57 +338,119 @@ def _do_fts_search( fts_query_string, col_names, results=None ): #pylint: disabl # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -def _make_fts_query_string( query_string, search_aliases ): +def _make_fts_query_string( query_string, search_aliases ): #pylint: disable=too-many-statements,too-many-locals """Generate the SQLite query string.""" - # check if this looks like a raw FTS query - if any( regex.search(query_string) for regex in _PASSTHROUGH_REGEXES ): - return query_string - - # split the query string (taking into account quoted phrases) - words = query_string.split() - i = 0 - while True: - if i >= len(words): - break - if i > 0 and words[i-1].startswith('"'): - words[i-1] += " {}".format( words[i] ) - del words[i] - if words[i-1].startswith('"') and words[i-1].endswith('"'): - words[i-1] = words[i-1][1:-1] - continue - i += 1 - - # clean up quoted phrases - words = [ w[1:] if w.startswith('"') else w for w in words ] - words = [ w[:-1] if w.endswith('"') else w for w in words ] - words = [ w.strip() for w in words ] - words = [ w for w in words if w ] - - # quote any phrases that need it - def has_special_char( word ): - return any( ch in word for ch in _SQLITE_FTS_SPECIAL_CHARS+" " ) - def quote_word( word ): - return '"{}"'.format(word) if has_special_char(word) else word - words = [ quote_word(w) for w in words ] - - # handle search aliases - for i,word in enumerate(words): - word = word.lower() - if word.startswith( '"' ) and word.endswith( '"' ): - word = word[1:-1] - aliases = search_aliases.get( word ) - if aliases: - aliases = [ quote_word( a ) for a in aliases ] - aliases.sort() # nb: so that tests will work reliably - words[i] = "({})".format( - " OR ".join( aliases ) - ) - - # escape any special characters - words = [ w.replace("'","''") for w in words ] - - return " AND ".join( words ) + # initialize + query_string = squash_spaces( query_string ) + is_raw_query = any( regex.search(query_string) for regex in _PASSTHROUGH_REGEXES ) + + # set the order in which we will check search aliases (longest to shortest, + # because we want an alias of "aa bb cc" to take priority over "bb". + search_aliases = sorted( search_aliases.items(), key=lambda a: len(a[0]), reverse=True ) + + def is_word_char( ch ): + return ch.isalnum() or ch in "_-#" + def is_word( start, end ): + """Check if the string segment starts/ends on a word boundary.""" + if start > 0 and is_word_char( buf[start-1] ): + return False + if end < len(buf) and is_word_char( buf[end] ): + return False + return True + + # look for search aliases + buf = query_string.lower() + matches = [] + for alias in search_aliases: + pos = 0 + while True: + # look for the next instance of the alias + start = buf.find( alias[0], pos ) + if start < 0: + break + # found one, check if it's a separate word + end = start + len(alias[0]) + pos = end + if not is_word( start, end ): + continue + # check if it's quoted + if buf[start-1] == '"' and buf[end] == '"': + # yup - remove the quotes + start -= 1 + end += 1 + # save the location of the match (and what it will be replaced with) + matches.append( ( start, end, alias[1] ) ) + # remove the matching string (for safety, to stop it from being matched again later) + buf = buf[:start] + "#"*len(alias[0]) + buf[end:] + + def make_replacement_text( val ): + """Generate the query sub-clause for alias replacement text.""" + if isinstance( val, str ): + return quote( val ) + else: + assert isinstance( val, list ) + return "({})".format( " OR ".join( quote(v) for v in val ) ) + def quote( val ): + """Quote a string, if necessary.""" + if not val.startswith( '"' ) or not val.endswith( '"' ): + if any( ch in val for ch in _SQLITE_FTS_SPECIAL_CHARS+" " ): + val = '"{}"'.format( val ) + return val.replace( "'", "''" ) + def tokenize( val ): + """Split a string into tokens (taking into account quoted phrases).""" + if is_raw_query: + return [ val.strip() ] + tokens = [] + for word in val.split(): + if len(tokens) > 0: + if tokens[-1].startswith( '"' ) and not tokens[-1].endswith( '"' ): + # the previous token is a quoted phrase, continue it + tokens[-1] += " " + word + continue + if not tokens[-1].startswith( '"' ) and word.endswith( '"' ): + tokens.append( quote( word[:-1] ) ) + continue + tokens.append( quote( word ) ) + if len(tokens) > 0 and tokens[-1].startswith( '"' ) and not tokens[-1].endswith( '"' ): + # we have an unterminated quoted phrase, terminate it + tokens[-1] += '"' + return [ t for t in tokens if t ] + + # split the query string into parts (alias replacement texts, and everything else) + parts, pos = [], 0 + for match in matches: + if pos < match[0]: + # extract the text up to the start of the next match, and tokenize it + parts.extend( tokenize( query_string[ pos : match[0] ] ) ) + # replace the next match with its replacement text + parts.append( make_replacement_text( match[2] ) ) + pos = match[1] + if pos < len(query_string): + # extract any remaining text, and tokenize it + parts.extend( tokenize( query_string[pos:] ) ) + + # clean up the parts + parts = [ p for p in parts if p not in ('"','""') ] + # NOTE: Quoted phrases are not handled properly if alias replacement happens inside them e.g. + # "MMP News" -> (mmp OR "Multi-Man Publishing" OR "Multiman Publishing") AND News + # but it's difficult to know what to do in this case. If we have an alias "foo" => "bar", + # then this search query: + # "foo xyz" + # should really become: + # ("foo xyz" OR "bar xyz") + # but this would be ridiculously complicated to implement, and far more trouble than it's worth. + # We can end up with un-matched quotes in these cases, so we try to clean them up here. + def clean_part( val ): + if len(val) > 1: + if val.startswith( '"' ) and not val.endswith( '"' ): + return val[1:] + if not val.startswith( '"' ) and val.endswith( '"' ): + return val[:-1] + return val + parts = [ clean_part(p) for p in parts ] + + return (" " if is_raw_query else " AND ").join( parts ) # --------------------------------------------------------------------- @@ -603,19 +665,20 @@ def _load_search_aliases( aliases, aliases2 ): "Found duplicate search alias: {}", key, logger = _logger ) - search_aliases[ key ] = vals + search_aliases[ key.lower() ] = vals # load the search aliases for row in aliases: vals = [ row[0] ] - vals.extend( v.strip() for v in row[1].split( ";" ) ) - add_search_alias( row[0], vals ) + vals.extend( v for v in row[1].split( ";" ) ) + vals = [ squash_spaces(v) for v in vals ] + add_search_alias( vals[0], vals ) _logger.debug( "- %s => %s", row[0], vals ) # load the search aliases for row in aliases2: vals = itertools.chain( [row[0]], row[1].split("=") ) - vals = [ v.strip().lower() for v in vals ] + vals = [ squash_spaces(v) for v in vals ] _logger.debug( "- %s", vals ) for v in vals: add_search_alias( v, vals ) diff --git a/asl_articles/tests/test_search.py b/asl_articles/tests/test_search.py index dacfbb3..9010680 100644 --- a/asl_articles/tests/test_search.py +++ b/asl_articles/tests/test_search.py @@ -169,10 +169,9 @@ def test_multiple_search_results( webdriver, flask_app, dbconn ): init_tests( webdriver, flask_app, dbconn, fixtures="search.json" ) # do a search - _do_test_search( "#asl", [ - "View From The Trenches", + _do_test_search( "#aslj", [ "ASL Journal (4)", "ASL Journal (5)", - "Hunting DUKWs and Buffalos", "'Bolts From Above", "Hit 'Em High, Or Hit 'Em Low" + "Hunting DUKWs and Buffalos", "'Bolts From Above", "Hit 'Em High, Or Hit 'Em Low", "The Jungle Isn't Neutral" ] ) # do some searches @@ -543,14 +542,8 @@ def test_author_aliases( webdriver, flask_app, dbconn ): def test_make_fts_query_string(): """Test generating FTS query strings.""" - # initialize - search_aliases = _load_search_aliases( - [ ( "aaa", "bbb ; ccc" ) ], - [ ( "mmp", "Multi-Man Publishing = Multiman Publishing" ) ] - ) - def do_test( query, expected ): - assert _make_fts_query_string( query, search_aliases ) == expected + assert _make_fts_query_string( query, {} ) == expected # test some query strings do_test( "", "" ) @@ -584,10 +577,11 @@ def test_make_fts_query_string(): ' foo " xyz " bar ', 'foo AND xyz AND bar' ) - do_test( - ' foo " xyz 123 " bar ', - 'foo AND "xyz 123" AND bar' - ) + # NOTE: We used to handle this case, but it's debatable what the right thing to do is :-/ + # do_test( + # ' foo " xyz 123 " bar ', + # 'foo AND "xyz 123" AND bar' + # ) # test some incorrectly quoted phrases do_test( '"', '' ) @@ -602,20 +596,61 @@ def test_make_fts_query_string(): do_test( "foo OR bar", "foo OR bar" ) do_test( "(a OR b)", "(a OR b)" ) - # test search aliases - do_test( "aaa", "(aaa OR bbb OR ccc)" ) - do_test( "bbb", "bbb" ) - do_test( "ccc", "ccc" ) - - # test search aliases - do_test( "MMP", '("multi-man publishing" OR "multiman publishing" OR mmp)' ) - do_test( "Xmmp", "Xmmp" ) - do_test( "mmpX", "mmpX" ) - do_test( "multi-man publishing", '"multi-man" AND publishing' ) - do_test( 'abc "multi-man publishing" xyz', - 'abc AND ("multi-man publishing" OR "multiman publishing" OR mmp) AND xyz' +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +def test_search_aliases(): + """Test search aliases in query strings.""" + + # initialize + search_aliases = _load_search_aliases( + [ # one-way aliases + ( "aa", "bbb ; cccc" ), + ( "xXx", "x1 X2 ; x3"), + ( "foo", "{FOO}" ), + ( " foo bar ", " {FOO BAR} " ), # nb: spaces will be squashed and stripped + ], + [ # two-way aliases + ( " joe's nose ", " Joes Nose = Joseph's Nose " ) # nb: spaces will be squashed and stripped + ] ) + def do_test( query, expected ): + assert _make_fts_query_string( query, search_aliases ) == expected + + # test one-way aliases + do_test( "a", "a" ) + do_test( "XaX", "XaX" ) + do_test( "aa", "(aa OR bbb OR cccc)" ) + do_test( 'abc "aa" xyz', "abc AND (aa OR bbb OR cccc) AND xyz" ) + do_test( "XaaX", "XaaX" ) + do_test( "aaa", "aaa" ) + do_test( "XaaaX", "XaaaX" ) + do_test( "bbb", "bbb" ) + do_test( "cccc", "cccc" ) + + # test one-way aliases with spaces in the replacement text + do_test( "XxX", '(xXx OR "x1 X2" OR x3)' ) + do_test( "x1 X2", "x1 AND X2" ) + + # test one-way aliases with overlapping text in the keys ("foo" vs. "foo bar") + do_test( "foo bar", '("foo bar" OR "{FOO BAR}")' ) + do_test( "abc foo bar xyz", 'abc AND ("foo bar" OR "{FOO BAR}") AND xyz' ) + do_test( "Xfoo bar", "Xfoo AND bar" ) + do_test( "foo barX", '(foo OR {FOO}) AND barX' ) + do_test( "Xfoo barX", "Xfoo AND barX" ) + + # test two-way aliases + do_test( "joe's nose", '("joe\'\'s nose" OR "Joes Nose" OR "Joseph\'\'s Nose")' ) + do_test( "abc joe's nose xyz", 'abc AND ("joe\'\'s nose" OR "Joes Nose" OR "Joseph\'\'s Nose") AND xyz' ) + do_test( " JOES NOSE ", '("joe\'\'s nose" OR "Joes Nose" OR "Joseph\'\'s Nose")' ) + do_test( "Xjoes nose ", "Xjoes AND nose" ) + do_test( "joes noseX", "joes AND noseX" ) + do_test( "Xjoes noseX", "Xjoes AND noseX" ) + do_test( "Joseph's Nose", '("joe\'\'s nose" OR "Joes Nose" OR "Joseph\'\'s Nose")' ) + + # check that raw queries still have alias processing done + do_test( "foo AND bar", "(foo OR {FOO}) AND bar" ) + # --------------------------------------------------------------------- def test_aslrb_links(): diff --git a/asl_articles/utils.py b/asl_articles/utils.py index c2a8a60..e34f6f4 100644 --- a/asl_articles/utils.py +++ b/asl_articles/utils.py @@ -227,3 +227,7 @@ def to_bool( val ): if val in ["no","false","disabled","0"]: return False return None + +def squash_spaces( val ): + """Squash multiple spaces down into a single space.""" + return " ".join( val.split() )