|
|
|
@ -17,7 +17,7 @@ from asl_articles.models import Publisher, Publication, Article, Author, Scenari |
|
|
|
|
from asl_articles.publishers import get_publisher_vals |
|
|
|
|
from asl_articles.publications import get_publication_vals, get_publication_sort_key |
|
|
|
|
from asl_articles.articles import get_article_vals, get_article_sort_key |
|
|
|
|
from asl_articles.utils import AppConfigParser, decode_tags, to_bool |
|
|
|
|
from asl_articles.utils import AppConfigParser, decode_tags, to_bool, squash_spaces |
|
|
|
|
|
|
|
|
|
_search_index_path = None |
|
|
|
|
_search_aliases = {} |
|
|
|
@ -338,57 +338,119 @@ def _do_fts_search( fts_query_string, col_names, results=None ): #pylint: disabl |
|
|
|
|
|
|
|
|
|
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - |
|
|
|
|
|
|
|
|
|
def _make_fts_query_string( query_string, search_aliases ): |
|
|
|
|
def _make_fts_query_string( query_string, search_aliases ): #pylint: disable=too-many-statements,too-many-locals |
|
|
|
|
"""Generate the SQLite query string.""" |
|
|
|
|
|
|
|
|
|
# check if this looks like a raw FTS query |
|
|
|
|
if any( regex.search(query_string) for regex in _PASSTHROUGH_REGEXES ): |
|
|
|
|
return query_string |
|
|
|
|
|
|
|
|
|
# split the query string (taking into account quoted phrases) |
|
|
|
|
words = query_string.split() |
|
|
|
|
i = 0 |
|
|
|
|
while True: |
|
|
|
|
if i >= len(words): |
|
|
|
|
break |
|
|
|
|
if i > 0 and words[i-1].startswith('"'): |
|
|
|
|
words[i-1] += " {}".format( words[i] ) |
|
|
|
|
del words[i] |
|
|
|
|
if words[i-1].startswith('"') and words[i-1].endswith('"'): |
|
|
|
|
words[i-1] = words[i-1][1:-1] |
|
|
|
|
continue |
|
|
|
|
i += 1 |
|
|
|
|
|
|
|
|
|
# clean up quoted phrases |
|
|
|
|
words = [ w[1:] if w.startswith('"') else w for w in words ] |
|
|
|
|
words = [ w[:-1] if w.endswith('"') else w for w in words ] |
|
|
|
|
words = [ w.strip() for w in words ] |
|
|
|
|
words = [ w for w in words if w ] |
|
|
|
|
|
|
|
|
|
# quote any phrases that need it |
|
|
|
|
def has_special_char( word ): |
|
|
|
|
return any( ch in word for ch in _SQLITE_FTS_SPECIAL_CHARS+" " ) |
|
|
|
|
def quote_word( word ): |
|
|
|
|
return '"{}"'.format(word) if has_special_char(word) else word |
|
|
|
|
words = [ quote_word(w) for w in words ] |
|
|
|
|
|
|
|
|
|
# handle search aliases |
|
|
|
|
for i,word in enumerate(words): |
|
|
|
|
word = word.lower() |
|
|
|
|
if word.startswith( '"' ) and word.endswith( '"' ): |
|
|
|
|
word = word[1:-1] |
|
|
|
|
aliases = search_aliases.get( word ) |
|
|
|
|
if aliases: |
|
|
|
|
aliases = [ quote_word( a ) for a in aliases ] |
|
|
|
|
aliases.sort() # nb: so that tests will work reliably |
|
|
|
|
words[i] = "({})".format( |
|
|
|
|
" OR ".join( aliases ) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# escape any special characters |
|
|
|
|
words = [ w.replace("'","''") for w in words ] |
|
|
|
|
|
|
|
|
|
return " AND ".join( words ) |
|
|
|
|
# initialize |
|
|
|
|
query_string = squash_spaces( query_string ) |
|
|
|
|
is_raw_query = any( regex.search(query_string) for regex in _PASSTHROUGH_REGEXES ) |
|
|
|
|
|
|
|
|
|
# set the order in which we will check search aliases (longest to shortest, |
|
|
|
|
# because we want an alias of "aa bb cc" to take priority over "bb". |
|
|
|
|
search_aliases = sorted( search_aliases.items(), key=lambda a: len(a[0]), reverse=True ) |
|
|
|
|
|
|
|
|
|
def is_word_char( ch ): |
|
|
|
|
return ch.isalnum() or ch in "_-#" |
|
|
|
|
def is_word( start, end ): |
|
|
|
|
"""Check if the string segment starts/ends on a word boundary.""" |
|
|
|
|
if start > 0 and is_word_char( buf[start-1] ): |
|
|
|
|
return False |
|
|
|
|
if end < len(buf) and is_word_char( buf[end] ): |
|
|
|
|
return False |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
# look for search aliases |
|
|
|
|
buf = query_string.lower() |
|
|
|
|
matches = [] |
|
|
|
|
for alias in search_aliases: |
|
|
|
|
pos = 0 |
|
|
|
|
while True: |
|
|
|
|
# look for the next instance of the alias |
|
|
|
|
start = buf.find( alias[0], pos ) |
|
|
|
|
if start < 0: |
|
|
|
|
break |
|
|
|
|
# found one, check if it's a separate word |
|
|
|
|
end = start + len(alias[0]) |
|
|
|
|
pos = end |
|
|
|
|
if not is_word( start, end ): |
|
|
|
|
continue |
|
|
|
|
# check if it's quoted |
|
|
|
|
if buf[start-1] == '"' and buf[end] == '"': |
|
|
|
|
# yup - remove the quotes |
|
|
|
|
start -= 1 |
|
|
|
|
end += 1 |
|
|
|
|
# save the location of the match (and what it will be replaced with) |
|
|
|
|
matches.append( ( start, end, alias[1] ) ) |
|
|
|
|
# remove the matching string (for safety, to stop it from being matched again later) |
|
|
|
|
buf = buf[:start] + "#"*len(alias[0]) + buf[end:] |
|
|
|
|
|
|
|
|
|
def make_replacement_text( val ): |
|
|
|
|
"""Generate the query sub-clause for alias replacement text.""" |
|
|
|
|
if isinstance( val, str ): |
|
|
|
|
return quote( val ) |
|
|
|
|
else: |
|
|
|
|
assert isinstance( val, list ) |
|
|
|
|
return "({})".format( " OR ".join( quote(v) for v in val ) ) |
|
|
|
|
def quote( val ): |
|
|
|
|
"""Quote a string, if necessary.""" |
|
|
|
|
if not val.startswith( '"' ) or not val.endswith( '"' ): |
|
|
|
|
if any( ch in val for ch in _SQLITE_FTS_SPECIAL_CHARS+" " ): |
|
|
|
|
val = '"{}"'.format( val ) |
|
|
|
|
return val.replace( "'", "''" ) |
|
|
|
|
def tokenize( val ): |
|
|
|
|
"""Split a string into tokens (taking into account quoted phrases).""" |
|
|
|
|
if is_raw_query: |
|
|
|
|
return [ val.strip() ] |
|
|
|
|
tokens = [] |
|
|
|
|
for word in val.split(): |
|
|
|
|
if len(tokens) > 0: |
|
|
|
|
if tokens[-1].startswith( '"' ) and not tokens[-1].endswith( '"' ): |
|
|
|
|
# the previous token is a quoted phrase, continue it |
|
|
|
|
tokens[-1] += " " + word |
|
|
|
|
continue |
|
|
|
|
if not tokens[-1].startswith( '"' ) and word.endswith( '"' ): |
|
|
|
|
tokens.append( quote( word[:-1] ) ) |
|
|
|
|
continue |
|
|
|
|
tokens.append( quote( word ) ) |
|
|
|
|
if len(tokens) > 0 and tokens[-1].startswith( '"' ) and not tokens[-1].endswith( '"' ): |
|
|
|
|
# we have an unterminated quoted phrase, terminate it |
|
|
|
|
tokens[-1] += '"' |
|
|
|
|
return [ t for t in tokens if t ] |
|
|
|
|
|
|
|
|
|
# split the query string into parts (alias replacement texts, and everything else) |
|
|
|
|
parts, pos = [], 0 |
|
|
|
|
for match in matches: |
|
|
|
|
if pos < match[0]: |
|
|
|
|
# extract the text up to the start of the next match, and tokenize it |
|
|
|
|
parts.extend( tokenize( query_string[ pos : match[0] ] ) ) |
|
|
|
|
# replace the next match with its replacement text |
|
|
|
|
parts.append( make_replacement_text( match[2] ) ) |
|
|
|
|
pos = match[1] |
|
|
|
|
if pos < len(query_string): |
|
|
|
|
# extract any remaining text, and tokenize it |
|
|
|
|
parts.extend( tokenize( query_string[pos:] ) ) |
|
|
|
|
|
|
|
|
|
# clean up the parts |
|
|
|
|
parts = [ p for p in parts if p not in ('"','""') ] |
|
|
|
|
# NOTE: Quoted phrases are not handled properly if alias replacement happens inside them e.g. |
|
|
|
|
# "MMP News" -> (mmp OR "Multi-Man Publishing" OR "Multiman Publishing") AND News |
|
|
|
|
# but it's difficult to know what to do in this case. If we have an alias "foo" => "bar", |
|
|
|
|
# then this search query: |
|
|
|
|
# "foo xyz" |
|
|
|
|
# should really become: |
|
|
|
|
# ("foo xyz" OR "bar xyz") |
|
|
|
|
# but this would be ridiculously complicated to implement, and far more trouble than it's worth. |
|
|
|
|
# We can end up with un-matched quotes in these cases, so we try to clean them up here. |
|
|
|
|
def clean_part( val ): |
|
|
|
|
if len(val) > 1: |
|
|
|
|
if val.startswith( '"' ) and not val.endswith( '"' ): |
|
|
|
|
return val[1:] |
|
|
|
|
if not val.startswith( '"' ) and val.endswith( '"' ): |
|
|
|
|
return val[:-1] |
|
|
|
|
return val |
|
|
|
|
parts = [ clean_part(p) for p in parts ] |
|
|
|
|
|
|
|
|
|
return (" " if is_raw_query else " AND ").join( parts ) |
|
|
|
|
|
|
|
|
|
# --------------------------------------------------------------------- |
|
|
|
|
|
|
|
|
@ -603,19 +665,20 @@ def _load_search_aliases( aliases, aliases2 ): |
|
|
|
|
"Found duplicate search alias: {}", key, |
|
|
|
|
logger = _logger |
|
|
|
|
) |
|
|
|
|
search_aliases[ key ] = vals |
|
|
|
|
search_aliases[ key.lower() ] = vals |
|
|
|
|
|
|
|
|
|
# load the search aliases |
|
|
|
|
for row in aliases: |
|
|
|
|
vals = [ row[0] ] |
|
|
|
|
vals.extend( v.strip() for v in row[1].split( ";" ) ) |
|
|
|
|
add_search_alias( row[0], vals ) |
|
|
|
|
vals.extend( v for v in row[1].split( ";" ) ) |
|
|
|
|
vals = [ squash_spaces(v) for v in vals ] |
|
|
|
|
add_search_alias( vals[0], vals ) |
|
|
|
|
_logger.debug( "- %s => %s", row[0], vals ) |
|
|
|
|
|
|
|
|
|
# load the search aliases |
|
|
|
|
for row in aliases2: |
|
|
|
|
vals = itertools.chain( [row[0]], row[1].split("=") ) |
|
|
|
|
vals = [ v.strip().lower() for v in vals ] |
|
|
|
|
vals = [ squash_spaces(v) for v in vals ] |
|
|
|
|
_logger.debug( "- %s", vals ) |
|
|
|
|
for v in vals: |
|
|
|
|
add_search_alias( v, vals ) |
|
|
|
|