parent
ae2e5a61db
commit
b84d0bc7da
@ -1,68 +1,377 @@ |
||||
""" Handle search requests. """ |
||||
|
||||
import os |
||||
import sqlite3 |
||||
import tempfile |
||||
import re |
||||
import logging |
||||
|
||||
from flask import request, jsonify |
||||
|
||||
from asl_articles import app |
||||
from asl_articles.models import Publisher, Publication, Article |
||||
from asl_articles import app, db |
||||
from asl_articles.models import Publisher, Publication, Article, Author, Scenario, get_model_from_table_name |
||||
from asl_articles.publishers import get_publisher_vals |
||||
from asl_articles.publications import get_publication_vals |
||||
from asl_articles.articles import get_article_vals |
||||
from asl_articles.utils import clean_html, decode_tags, to_bool |
||||
|
||||
_search_index_path = None |
||||
_logger = logging.getLogger( "search" ) |
||||
|
||||
_SQLITE_FTS_SPECIAL_CHARS = "+-#':/." |
||||
_PASSTHROUGH_REGEXES = set( [ |
||||
re.compile( r"\bAND\b" ), |
||||
re.compile( r"\bOR\b" ), |
||||
re.compile( r"\bNOT\b" ), |
||||
re.compile( r"\((?![Rr]\))" ), |
||||
] ) |
||||
|
||||
# NOTE: The following are special search terms used by the test suite. |
||||
SEARCH_ALL = "<!all!>" |
||||
SEARCH_ALL_PUBLISHERS = "<!publishers!>" |
||||
SEARCH_ALL_PUBLICATIONS = "<!publications!>" |
||||
SEARCH_ALL_ARTICLES = "<!articles!>" |
||||
|
||||
BEGIN_HILITE = '<span class="hilite">' |
||||
END_HILITE = "</span>" |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
class SearchDbConn: |
||||
"""Context manager to handle SQLite transactions.""" |
||||
def __init__( self ): |
||||
self.conn = sqlite3.connect( _search_index_path ) |
||||
def __enter__( self ): |
||||
return self |
||||
def __exit__( self, exc_type, exc_value, traceback ): |
||||
if exc_type is None: |
||||
self.conn.commit() |
||||
else: |
||||
self.conn.rollback() |
||||
self.conn.close() |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def _get_authors( article ): |
||||
"""Return the searchable authors for an article.""" |
||||
author_ids = [ a.author_id for a in article.article_authors ] |
||||
query = db.session.query( Author ).filter( Author.author_id.in_( author_ids ) ) |
||||
return "\n".join( a.author_name for a in query ) |
||||
|
||||
def _get_scenarios( article ): |
||||
"""Return the searchable scenarios for an article.""" |
||||
scenario_ids = [ s.scenario_id for s in article.article_scenarios ] |
||||
query = db.session.query( Scenario ).filter( Scenario.scenario_id.in_( scenario_ids ) ) |
||||
return "\n".join( |
||||
"{}\t{}".format( s.scenario_display_id, s.scenario_name ) if s.scenario_display_id else s.scenario_name |
||||
for s in query |
||||
) |
||||
|
||||
def _get_tags( tags ): |
||||
"""Return the searchable tags for an article or publication.""" |
||||
if not tags: |
||||
return None |
||||
tags = decode_tags( tags ) |
||||
return "\n".join( tags ) |
||||
|
||||
# map search index columns to ORM fields |
||||
_FIELD_MAPPINGS = { |
||||
"publisher": { "name": "publ_name", "description": "publ_description" }, |
||||
"publication": { "name": "pub_name", "description": "pub_description", |
||||
"tags": lambda pub: _get_tags( pub.pub_tags ) |
||||
}, |
||||
"article": { "name": "article_title", "name2": "article_subtitle", "description": "article_snippet", |
||||
"authors": _get_authors, "scenarios": _get_scenarios, |
||||
"tags": lambda article: _get_tags( article.article_tags ) |
||||
} |
||||
} |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
@app.route( "/search", methods=["POST"] ) |
||||
def search(): |
||||
"""Run a search query.""" |
||||
"""Run a search.""" |
||||
try: |
||||
return _do_search() |
||||
except Exception as exc: #pylint: disable=broad-except |
||||
msg = str( exc ) |
||||
if isinstance( exc, sqlite3.OperationalError ): |
||||
if msg.startswith( "fts5: " ): |
||||
msg = msg[5:] |
||||
if not msg: |
||||
msg = str( type(exc) ) |
||||
return jsonify( { "error": msg } ) |
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - |
||||
|
||||
def _do_search(): #pylint: disable=too-many-locals,too-many-statements,too-many-branches |
||||
"""Run a search.""" |
||||
|
||||
# initialize |
||||
# parse the request parameters |
||||
query_string = request.json.get( "query" ).strip() |
||||
_logger.debug( "SEARCH: [%s]", query_string ) |
||||
if not query_string: |
||||
raise RuntimeError( "Missing query string." ) |
||||
no_hilite = to_bool( request.json.get( "no_hilite" ) ) |
||||
_logger.info( "SEARCH REQUEST: %s", query_string ) |
||||
|
||||
# check for special query terms (for testing porpoises) |
||||
results = [] |
||||
def find_special_term( term ): |
||||
nonlocal query_string |
||||
pos = query_string.find( term ) |
||||
if pos >= 0: |
||||
query_string = query_string[:pos] + query_string[pos+len(term):] |
||||
return True |
||||
return False |
||||
special_terms = { |
||||
SEARCH_ALL_PUBLISHERS: |
||||
lambda: [ get_publisher_vals(p,True) for p in Publisher.query ], #pylint: disable=not-an-iterable |
||||
SEARCH_ALL_PUBLICATIONS: |
||||
lambda: [ get_publication_vals(p,True) for p in Publication.query ], #pylint: disable=not-an-iterable |
||||
SEARCH_ALL_ARTICLES: |
||||
lambda: [ get_article_vals(a,True) for a in Article.query ] #pylint: disable=not-an-iterable |
||||
} |
||||
if find_special_term( SEARCH_ALL ): |
||||
for term,func in special_terms.items(): |
||||
results.extend( func() ) |
||||
else: |
||||
for term,func in special_terms.items(): |
||||
if find_special_term( term ): |
||||
results.extend( func() ) |
||||
query_string = query_string.strip() |
||||
if not query_string: |
||||
return jsonify( results ) |
||||
|
||||
# return all publishers |
||||
query = Publisher.query |
||||
if query_string: |
||||
query = query.filter( |
||||
Publisher.publ_name.ilike( "%{}%".format( query_string ) ) |
||||
) |
||||
query = query.order_by( Publisher.publ_name.asc() ) |
||||
publishers = query.all() |
||||
_logger.debug( "- Found: %s", " ; ".join( str(p) for p in publishers ) ) |
||||
for publ in publishers: |
||||
publ = get_publisher_vals( publ ) |
||||
publ["type"] = "publisher" |
||||
results.append( publ ) |
||||
|
||||
# return all publications |
||||
query = Publication.query |
||||
if query_string: |
||||
query = query.filter( |
||||
Publication.pub_name.ilike( "%{}%".format( query_string ) ) |
||||
) |
||||
query = query.order_by( Publication.pub_name.asc() ) |
||||
publications = query.all() |
||||
_logger.debug( "- Found: %s", " ; ".join( str(p) for p in publications ) ) |
||||
for pub in publications: |
||||
pub = get_publication_vals( pub ) |
||||
pub[ "type" ] = "publication" |
||||
results.append( pub ) |
||||
|
||||
# return all articles |
||||
query = Article.query |
||||
if query_string: |
||||
query = query.filter( |
||||
Article.article_title.ilike( "%{}%".format( query_string ) ) |
||||
# prepare the query |
||||
fts_query_string = _make_fts_query_string( query_string ) |
||||
_logger.debug( "FTS query string: %s", fts_query_string ) |
||||
|
||||
# NOTE: We would like to cache the connection, but SQLite connections can only be used |
||||
# in the same thread they were created in. |
||||
with SearchDbConn() as dbconn: |
||||
|
||||
# run the search |
||||
hilites = [ "", "" ] if no_hilite else [ BEGIN_HILITE, END_HILITE ] |
||||
def highlight( n ): |
||||
return "highlight( searchable, {}, '{}', '{}' )".format( |
||||
n, hilites[0], hilites[1] |
||||
) |
||||
sql = "SELECT owner,rank,{}, {}, {}, {}, {}, {} FROM searchable" \ |
||||
" WHERE searchable MATCH ?" \ |
||||
" ORDER BY rank".format( |
||||
highlight(1), highlight(2), highlight(3), highlight(4), highlight(5), highlight(6) |
||||
) |
||||
curs = dbconn.conn.execute( sql, |
||||
( "{name name2 description authors scenarios tags}: " + fts_query_string, ) |
||||
) |
||||
query = query.order_by( Article.article_title.asc() ) |
||||
articles = query.all() |
||||
_logger.debug( "- Found: %s", " ; ".join( str(a) for a in articles ) ) |
||||
for article in articles: |
||||
article = get_article_vals( article ) |
||||
article[ "type" ] = "article" |
||||
results.append( article ) |
||||
|
||||
# get the results |
||||
for row in curs: |
||||
|
||||
# get the next result |
||||
owner_type, owner_id = row[0].split( ":" ) |
||||
model = get_model_from_table_name( owner_type ) |
||||
obj = model.query.get( owner_id ) |
||||
_logger.debug( "- {} ({:.3f})".format( obj, row[1] ) ) |
||||
|
||||
# prepare the result for the front-end |
||||
result = globals()[ "get_{}_vals".format( owner_type ) ]( obj ) |
||||
result[ "type" ] = owner_type |
||||
|
||||
# return highlighted versions of the content to the caller |
||||
fields = _FIELD_MAPPINGS[ owner_type ] |
||||
for col_no,col_name in enumerate(["name","name2","description"]): |
||||
field = fields.get( col_name ) |
||||
if not field: |
||||
continue |
||||
if row[2+col_no] and BEGIN_HILITE in row[2+col_no]: |
||||
# NOTE: We have to return both the highlighted and non-highlighted versions, since the front-end |
||||
# will show the highlighted version in the search results, but the non-highlighted version elsewhere |
||||
# e.g. an article's title in the titlebar of its edit dialog. |
||||
result[ field+"!" ] = row[ 2+col_no ] |
||||
if row[5] and BEGIN_HILITE in row[5]: |
||||
result[ "authors!" ] = row[5].split( "\n" ) |
||||
if row[6] and BEGIN_HILITE in row[6]: |
||||
result[ "scenarios!" ] = [ s.split("\t") for s in row[6].split("\n") ] |
||||
if row[7] and BEGIN_HILITE in row[7]: |
||||
result[ "tags!" ] = row[7].split( "\n" ) |
||||
|
||||
# add the result to the list |
||||
results.append( result ) |
||||
|
||||
return jsonify( results ) |
||||
|
||||
def _make_fts_query_string( query_string ): |
||||
"""Generate the SQLite query string.""" |
||||
|
||||
# check if this looks like a raw FTS query |
||||
if any( regex.search(query_string) for regex in _PASSTHROUGH_REGEXES ): |
||||
return query_string |
||||
|
||||
# split the query string (taking into account quoted phrases) |
||||
words = query_string.split() |
||||
i = 0 |
||||
while True: |
||||
if i >= len(words): |
||||
break |
||||
if i > 0 and words[i-1].startswith('"'): |
||||
words[i-1] += " {}".format( words[i] ) |
||||
del words[i] |
||||
if words[i-1].startswith('"') and words[i-1].endswith('"'): |
||||
words[i-1] = words[i-1][1:-1] |
||||
continue |
||||
i += 1 |
||||
|
||||
# clean up quoted phrases |
||||
words = [ w[1:] if w.startswith('"') else w for w in words ] |
||||
words = [ w[:-1] if w.endswith('"') else w for w in words ] |
||||
words = [ w.strip() for w in words ] |
||||
words = [ w for w in words if w ] |
||||
|
||||
# quote any phrases that need it |
||||
def has_special_char( word ): |
||||
return any( ch in word for ch in _SQLITE_FTS_SPECIAL_CHARS+" " ) |
||||
def quote_word( word ): |
||||
return '"{}"'.format(word) if has_special_char(word) else word |
||||
words = [ quote_word(w) for w in words ] |
||||
|
||||
# escape any special characters |
||||
words = [ w.replace("'","''") for w in words ] |
||||
|
||||
return " AND ".join( words ) |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def init_search( session, logger ): |
||||
"""Initialize the search engine.""" |
||||
|
||||
# initialize the database |
||||
global _search_index_path |
||||
_search_index_path = app.config.get( "SEARCH_INDEX_PATH" ) |
||||
if not _search_index_path: |
||||
# FUDGE! We should be able to create a shared, in-memory database using this: |
||||
# file::memory:?mode=memory&cache=shared |
||||
# but it doesn't seem to work (on Linux) and ends up creating a file with this name :-/ |
||||
# We manually create a temp file, which has to have the same name each time, so that we don't |
||||
# keep creating a new database each time we start up. Sigh... |
||||
_search_index_path = os.path.join( tempfile.gettempdir(), "asl-articles.searchdb" ) |
||||
if os.path.isfile( _search_index_path ): |
||||
os.unlink( _search_index_path ) |
||||
|
||||
logger.info( "Creating search index: %s", _search_index_path ) |
||||
with SearchDbConn() as dbconn: |
||||
|
||||
# NOTE: We would like to make "owner" the primary key, but FTS doesn't support primary keys |
||||
# (nor UNIQUE constraints), so we have to manage this manually :-( |
||||
dbconn.conn.execute( |
||||
"CREATE VIRTUAL TABLE searchable USING fts5" |
||||
" ( owner, name, name2, description, authors, scenarios, tags, tokenize='porter unicode61' )" |
||||
) |
||||
|
||||
# load the searchable content |
||||
logger.debug( "Loading the search index..." ) |
||||
logger.debug( "- Loading publishers." ) |
||||
for publ in session.query( Publisher ): |
||||
add_or_update_publisher( dbconn, publ ) |
||||
logger.debug( "- Loading publications." ) |
||||
for pub in session.query( Publication ): |
||||
add_or_update_publication( dbconn, pub ) |
||||
logger.debug( "- Loading articles." ) |
||||
for article in session.query( Article ): |
||||
add_or_update_article( dbconn, article ) |
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - |
||||
|
||||
def add_or_update_publisher( dbconn, publ ): |
||||
"""Add/update a publisher in the search index.""" |
||||
_do_add_or_update_searchable( dbconn, "publisher", |
||||
_make_publisher_key(publ), publ |
||||
) |
||||
|
||||
def add_or_update_publication( dbconn, pub ): |
||||
"""Add/update a publication in the search index.""" |
||||
_do_add_or_update_searchable( dbconn, "publication", |
||||
_make_publication_key(pub.pub_id), pub |
||||
) |
||||
|
||||
def add_or_update_article( dbconn, article ): |
||||
"""Add/update an article in the search index.""" |
||||
_do_add_or_update_searchable( dbconn, "article", |
||||
_make_article_key(article.article_id), article |
||||
) |
||||
|
||||
def _do_add_or_update_searchable( dbconn, owner_type, owner, obj ): |
||||
"""Add or update a record in the search index.""" |
||||
|
||||
# prepare the fields |
||||
fields = _FIELD_MAPPINGS[ owner_type ] |
||||
vals = { |
||||
f: getattr( obj,fields[f] ) if isinstance( fields[f], str ) else fields[f]( obj ) |
||||
for f in fields |
||||
} |
||||
vals = { |
||||
k: clean_html( v, allow_tags=[], safe_attrs=[] ) |
||||
for k,v in vals.items() |
||||
} |
||||
|
||||
def do_add_or_update( dbconn ): |
||||
dbconn.conn.execute( "INSERT INTO searchable" |
||||
" ( owner, name, name2, description, authors, scenarios, tags )" |
||||
" VALUES (?,?,?,?,?,?,?)", ( |
||||
owner, |
||||
vals.get("name"), vals.get("name2"), vals.get("description"), |
||||
vals.get("authors"), vals.get("scenarios"), vals.get("tags") |
||||
) ) |
||||
|
||||
# update the database |
||||
if dbconn: |
||||
# NOTE: If we are passed a connection to use, we assume we are starting up and are doing |
||||
# the initial build of the search index, and therefore don't need to check for an existing row. |
||||
# The caller is responsible for committing the transaction. |
||||
do_add_or_update( dbconn ) |
||||
else: |
||||
with SearchDbConn() as dbconn2: |
||||
# NOTE: Because we can't have a UNIQUE constraint on "owner", we can't use UPSERT nor INSERT OR UPDATE, |
||||
# so we have to delete any existing row manually, then insert :-/ |
||||
_logger.debug( "Updating searchable: %s", owner ) |
||||
_logger.debug( "- %s", " ; ".join( "{}=\"{}\"".format( k, repr(v) ) for k,v in vals.items() if v ) ) |
||||
dbconn2.conn.execute( "DELETE FROM searchable WHERE owner = ?", (owner,) ) |
||||
do_add_or_update( dbconn2 ) |
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - |
||||
|
||||
def delete_publishers( publs ): |
||||
"""Remove publishers from the search index.""" |
||||
with SearchDbConn() as dbconn: |
||||
for publ in publs: |
||||
_do_delete_searchable( dbconn, _make_publisher_key( publ ) ) |
||||
|
||||
def delete_publications( pubs ): |
||||
"""Remove publications from the search index.""" |
||||
with SearchDbConn() as dbconn: |
||||
for pub in pubs: |
||||
_do_delete_searchable( dbconn, _make_publication_key( pub ) ) |
||||
|
||||
def delete_articles( articles ): |
||||
"""Remove articles from the search index.""" |
||||
with SearchDbConn() as dbconn: |
||||
for article in articles: |
||||
_do_delete_searchable( dbconn, _make_article_key( article ) ) |
||||
|
||||
def _do_delete_searchable( dbconn, owner ): |
||||
"""Remove an entry from the search index.""" |
||||
dbconn.conn.execute( "DELETE FROM searchable WHERE owner = ?", (owner,) ) |
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - |
||||
|
||||
def _make_publisher_key( publ ): |
||||
"""Generate the owner key for a Publisher.""" |
||||
return "publisher:{}".format( publ.publ_id if isinstance(publ,Publisher) else publ ) |
||||
|
||||
def _make_publication_key( pub ): |
||||
"""Generate the owner key for a Publication.""" |
||||
return "publication:{}".format( pub.pub_id if isinstance(pub,Publication) else pub ) |
||||
|
||||
def _make_article_key( article ): |
||||
"""Generate the owner key for an Article.""" |
||||
return "article:{}".format( article.article_id if isinstance(article,Article) else article ) |
||||
|
@ -0,0 +1,96 @@ |
||||
{ |
||||
|
||||
"publisher": [ |
||||
{ "publ_id": 1, |
||||
"publ_name": "Multi-Man Publishing", |
||||
"publ_description": "Designers and producers of Advanced Squad Leader and other fine wargames." |
||||
}, |
||||
{ "publ_id": 2, |
||||
"publ_name": "View From The Trenches", |
||||
"publ_description": "Britain's Premier ASL Journal", |
||||
"publ_url": "http://vftt.co.uk" |
||||
} |
||||
], |
||||
|
||||
"publication": [ |
||||
{ "pub_id": 10, |
||||
"pub_name": "ASL Journal", |
||||
"pub_edition": 4, |
||||
"pub_tags": "aslj", |
||||
"publ_id": 1 |
||||
}, |
||||
{ "pub_id": 11, |
||||
"pub_name": "ASL Journal", |
||||
"pub_edition": 5, |
||||
"pub_tags": "aslj", |
||||
"publ_id": 1 |
||||
}, |
||||
{ "pub_id": 12, |
||||
"pub_name": "View From The Trenches", |
||||
"pub_edition": 100, |
||||
"pub_description": "Fantastic 100th issue!", |
||||
"pub_tags": "vftt", |
||||
"publ_id": 2 |
||||
} |
||||
], |
||||
|
||||
"article": [ |
||||
{ "article_id": 500, |
||||
"article_title": "Hit 'Em High, Or Hit 'Em Low", |
||||
"article_subtitle": "Some things about light mortars you might like to know", |
||||
"article_snippet": "Light mortars in ASL can be game winners depending on what they can shoot at, how low you roll and how often you get rate.", |
||||
"article_tags": "aslj\nmortars", |
||||
"pub_id": 10 |
||||
}, |
||||
{ "article_id": 501, |
||||
"article_title": "'Bolts From Above", |
||||
"article_snippet": "Infantry often found itself battling the elements as well as the enemy. ASL has made provisions for the inclusion of inclement weather conditions, such as rain and snow.", |
||||
"article_tags": "aslj\nweather", |
||||
"pub_id": 10 |
||||
}, |
||||
{ "article_id": 510, |
||||
"article_title": "The Jungle Isn't Neutral", |
||||
"article_subtitle": "Up close and personal in the PTO", |
||||
"article_snippet": "British Lieutenant Colonel F. Spencer Chapman wrote a memoir of jungle fighting in Malaysia titled \"The Jungle Is Neutral.\"", |
||||
"article_tags": "aslj\nPTO", |
||||
"pub_id": 11 |
||||
}, |
||||
{ "article_id": 511, |
||||
"article_title": "Hunting DUKWs and Buffalos", |
||||
"article_subtitle": "Scenario Analysis: HS17 \"Water Foul\"", |
||||
"article_snippet": "This scenario features a late-war Canadian assault on a German-occupied flooded town - an unusual tactical challenge in ASL.", |
||||
"article_tags": "aslj", |
||||
"pub_id": 11 |
||||
}, |
||||
{ "article_id": 520, |
||||
"article_title": "Jagdpanzer 38(t) Hetzer", |
||||
"article_snippet": "In the 1930s the Germans conducted a number of military exercises which showed that close support from light field guns was helpful for infantry operations.", |
||||
"pub_id": 12 |
||||
} |
||||
], |
||||
|
||||
"article_author": [ |
||||
{ "seq_no": 1, "article_id": 500, "author_id": 1000 }, |
||||
{ "seq_no": 1, "article_id": 510, "author_id": 1001 }, |
||||
{ "seq_no": 1, "article_id": 511, "author_id": 1002 }, |
||||
{ "seq_no": 1, "article_id": 520, "author_id": 1003 } |
||||
], |
||||
|
||||
"author": [ |
||||
{ "author_id": 1000, "author_name": "Simon Spinetti" }, |
||||
{ "author_id": 1001, "author_name": "Mark Pitcavage" }, |
||||
{ "author_id": 1002, "author_name": "Oliver Giancola" }, |
||||
{ "author_id": 1003, "author_name": "Michael Davies" } |
||||
], |
||||
|
||||
"article_scenario": [ |
||||
{ "seq_no": 1, "article_id": 511, "scenario_id": 2000 }, |
||||
{ "seq_no": 1, "article_id": 511, "scenario_id": 2001 } |
||||
], |
||||
|
||||
"scenario": [ |
||||
{ "scenario_id": 2000, "scenario_display_id": "HS17", "scenario_name": "Water Foul" }, |
||||
{ "scenario_id": 2001, "scenario_name": "No Scenario ID" } |
||||
] |
||||
|
||||
} |
@ -1,39 +0,0 @@ |
||||
""" Basic tests. """ |
||||
|
||||
from asl_articles.tests.utils import init_tests, do_search, find_child |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_basic( webdriver, flask_app, dbconn ): |
||||
"""Basic tests.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn, fixtures="basic.json" ) |
||||
|
||||
# make sure the home page loaded correctly |
||||
elem = find_child( "#search-form .caption" ) |
||||
assert elem.text == "Search for:" |
||||
|
||||
# run some test searches |
||||
def do_test( query, expected ): |
||||
results = do_search( query ) |
||||
def get_href( r ): |
||||
elem = find_child( ".name a", r ) |
||||
return elem.get_attribute( "href" ) if elem else "" |
||||
results = [ ( |
||||
find_child( ".name", r ).text, |
||||
find_child( ".description", r ).text, |
||||
get_href( r ) |
||||
) for r in results ] |
||||
assert results == expected |
||||
do_test( "publish", [ ("Multiman Publishing","","http://mmp.com/") ] ) |
||||
do_test( "foo", [] ) |
||||
do_test( " ", [ |
||||
( "Avalon Hill", "AH description" , "http://ah.com/" ), |
||||
( "Le Franc Tireur", "The French guys.", "" ), |
||||
( "Multiman Publishing", "", "http://mmp.com/" ) |
||||
] ) |
||||
do_test( " H ", [ |
||||
( "Avalon Hill", "AH description" , "http://ah.com/" ), |
||||
( "Multiman Publishing", "", "http://mmp.com/" ) |
||||
] ) |
@ -0,0 +1,376 @@ |
||||
""" Test search operations. """ |
||||
|
||||
from asl_articles.search import SearchDbConn, _make_fts_query_string |
||||
|
||||
from asl_articles.tests.test_publishers import create_publisher, edit_publisher |
||||
from asl_articles.tests.test_publications import create_publication, edit_publication |
||||
from asl_articles.tests.test_articles import create_article, edit_article |
||||
from asl_articles.tests.utils import init_tests, wait_for_elem, find_child, find_children, check_ask_dialog, \ |
||||
do_search, get_result_names, find_search_result |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_search_publishers( webdriver, flask_app, dbconn ): |
||||
"""Test searching publishers.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn ) |
||||
|
||||
# test searching publisher names/descriptions |
||||
_do_test_searches( ["hill","original"], [] ) |
||||
create_publisher( { |
||||
"name": "Avalon Hill", "description": "The original ASL vendor." |
||||
} ) |
||||
_do_test_searches( ["hill","original"], ["Avalon Hill"] ) |
||||
|
||||
# edit the publisher |
||||
sr = find_search_result( "Avalon Hill" ) |
||||
edit_publisher( sr, { |
||||
"name": "Avalon Mountain", "description": "The first ASL vendor." |
||||
} ) |
||||
_do_test_searches( ["hill","original"], [] ) |
||||
_do_test_searches( ["mountain","first"], ["Avalon Mountain"] ) |
||||
|
||||
# delete the publisher |
||||
sr = find_search_result( "Avalon Mountain" ) |
||||
find_child( ".delete", sr ).click() |
||||
check_ask_dialog( "Delete this publisher?", "ok" ) |
||||
_do_test_searches( ["hill","original","mountain","first"], [] ) |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_search_publications( webdriver, flask_app, dbconn ): |
||||
"""Test searching publications.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn ) |
||||
|
||||
# test searching publication names/descriptions |
||||
_do_test_searches( ["journal","good"], [] ) |
||||
create_publication( { |
||||
"name": "ASL Journal", "description": "A pretty good magazine." |
||||
} ) |
||||
_do_test_searches( ["journal","good"], ["ASL Journal"] ) |
||||
|
||||
# edit the publication |
||||
sr = find_search_result( "ASL Journal" ) |
||||
edit_publication( sr, { |
||||
"name": "ASL Magazine", "description": "Not a bad magazine." |
||||
} ) |
||||
_do_test_searches( ["journal","good"], [] ) |
||||
_do_test_searches( ["magazine","bad"], ["ASL Magazine"] ) |
||||
|
||||
# delete the publication |
||||
sr = find_search_result( "ASL Magazine" ) |
||||
find_child( ".delete", sr ).click() |
||||
check_ask_dialog( "Delete this publication?", "ok" ) |
||||
_do_test_searches( ["journal","good","magazine","bad"], [] ) |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_search_articles( webdriver, flask_app, dbconn ): |
||||
"""Test searching articles.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn ) |
||||
|
||||
# test searching article titles/subtitles/snippets |
||||
_do_test_searches( ["low","some","game"], [] ) |
||||
create_article( { |
||||
"title": "Hit 'Em High, Or Hit 'Em Low", |
||||
"subtitle": "Some things about light mortars you might like to know", |
||||
"snippet": "Light mortars in ASL can be game winners." |
||||
} ) |
||||
_do_test_searches( ["low","some","game"], ["Hit 'Em High, Or Hit 'Em Low"] ) |
||||
|
||||
# edit the article |
||||
sr = find_search_result( "Hit 'Em High, Or Hit 'Em Low" ) |
||||
edit_article( sr, { |
||||
"title": "Hit 'Em Hard", |
||||
"subtitle": "Where it hurts!", |
||||
"snippet": "Always the best way to do things." |
||||
} ) |
||||
_do_test_searches( ["low","some","game"], [] ) |
||||
_do_test_searches( ["hard","hurt","best"], ["Hit 'Em Hard"] ) |
||||
|
||||
# delete the article |
||||
sr = find_search_result( "Hit 'Em Hard" ) |
||||
find_child( ".delete", sr ).click() |
||||
check_ask_dialog( "Delete this article?", "ok" ) |
||||
_do_test_searches( ["hard","hurt","best"], [] ) |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_search_authors( webdriver, flask_app, dbconn ): |
||||
"""Test searching for authors.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn, fixtures="search.json" ) |
||||
|
||||
# search for some authors |
||||
_do_test_search( "pitcavage", ["The Jungle Isn't Neutral"] ) |
||||
_do_test_search( "davie", ["Jagdpanzer 38(t) Hetzer"] ) |
||||
_do_test_search( "pit* dav*", [] ) # nb: implied AND |
||||
_do_test_search( "pit* OR dav*", ["The Jungle Isn't Neutral","Jagdpanzer 38(t) Hetzer"] ) |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_search_scenarios( webdriver, flask_app, dbconn ): |
||||
"""Test searching for scenarios.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn, fixtures="search.json" ) |
||||
|
||||
# search for some scenarios |
||||
_do_test_search( "foul", ["Hunting DUKWs and Buffalos"] ) |
||||
_do_test_search( "hs17", ["Hunting DUKWs and Buffalos"] ) |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_search_tags( webdriver, flask_app, dbconn ): |
||||
"""Test searching for tags.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn, fixtures="search.json" ) |
||||
|
||||
# search for some publication tags |
||||
_do_test_search( "vftt", ["View From The Trenches (100)"] ) |
||||
|
||||
# search for some article tags |
||||
_do_test_search( "pto", ["The Jungle Isn't Neutral"] ) |
||||
_do_test_search( "aslj", [ |
||||
"ASL Journal (4)", "ASL Journal (5)", |
||||
"'Bolts From Above", "The Jungle Isn't Neutral", "Hunting DUKWs and Buffalos", "Hit 'Em High, Or Hit 'Em Low" |
||||
] ) |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_empty_search( webdriver, flask_app, dbconn ): |
||||
"""Test handling of an empty search string.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn, fixtures="search.json" ) |
||||
|
||||
# search for an empty string |
||||
form = find_child( "#search-form" ) |
||||
find_child( ".query", form ).send_keys( " " ) |
||||
find_child( "button[type='submit']", form ).click() |
||||
dlg = wait_for_elem( 2, "#ask" ) |
||||
assert find_child( ".MuiDialogContent-root", dlg ).text == "Please enter something to search for." |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_multiple_search_results( webdriver, flask_app, dbconn ): |
||||
"""Test more complicated search queries.""" |
||||
|
||||
# initialize |
||||
init_tests( webdriver, flask_app, dbconn, fixtures="search.json" ) |
||||
|
||||
# do a search |
||||
_do_test_search( "asl", [ |
||||
"View From The Trenches", |
||||
"ASL Journal (4)", "ASL Journal (5)", |
||||
"Hunting DUKWs and Buffalos", "'Bolts From Above", "Hit 'Em High, Or Hit 'Em Low" |
||||
] ) |
||||
|
||||
# do some searches |
||||
_do_test_search( "infantry", [ |
||||
"'Bolts From Above", "Jagdpanzer 38(t) Hetzer" |
||||
] ) |
||||
_do_test_search( "infantry OR mortar", [ |
||||
"'Bolts From Above", "Jagdpanzer 38(t) Hetzer", |
||||
"Hit 'Em High, Or Hit 'Em Low" |
||||
] ) |
||||
_do_test_search( "infantry AND mortar", [] ) |
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_highlighting( webdriver, flask_app, dbconn ):
    """Test highlighting of matching content in search results."""

    # initialize (with search result highlighting enabled)
    init_tests( webdriver, flask_app, dbconn, fixtures="search.json", no_sr_hilite=0 )

    def get_hilites( elems ):
        """Extract the highlighted text from the given element(s)."""
        if not isinstance( elems, list ):
            elems = [ elems ]
        hilites = []
        for elem in elems:
            hilites.extend( e.text for e in find_children( ".hilite", elem ) )
        return hilites

    # check highlighting in a publisher search result
    results = _do_test_search( "view britain", ["View From The Trenches"] )
    sr = results[0]
    assert get_hilites( find_child( ".name span", sr ) ) == [ "View" ]
    assert get_hilites( find_child( ".description", sr ) ) == [ "Britain" ]

    def check_publication_sr( query, expected, name, description, tags ):
        """Run a search that returns a single publication, and check its highlighting."""
        results = _do_test_search( query, [expected] )
        assert len(results) == 1
        sr = results[0]
        assert get_hilites( find_child( ".name span", sr ) ) == name
        assert get_hilites( find_child( ".description", sr ) ) == description
        assert get_hilites( find_children( ".tag", sr ) ) == tags

    # check highlighting in publication search results
    check_publication_sr( "view fantastic",
        "View From The Trenches (100)",
        ["View"], ["Fantastic"], []
    )
    check_publication_sr( "vftt",
        "View From The Trenches (100)",
        [], [], ["vftt"]
    )

    def check_article_sr( query, expected, title, subtitle, snippet, authors, scenarios, tags ):
        """Run a search that returns a single article, and check its highlighting."""
        results = _do_test_search( query, [expected] )
        assert len(results) == 1
        sr = results[0]
        assert get_hilites( find_child( ".title span", sr ) ) == title
        assert get_hilites( find_child( ".subtitle", sr ) ) == subtitle
        assert get_hilites( find_child( ".snippet", sr ) ) == snippet
        assert get_hilites( find_children( ".author", sr ) ) == authors
        assert get_hilites( find_children( ".scenario", sr ) ) == scenarios
        assert get_hilites( find_children( ".tag", sr ) ) == tags

    # check highlighting in article search results
    check_article_sr( "hit light mortar",
        "Hit 'Em High, Or Hit 'Em Low",
        ["Hit","Hit"], ["light","mortars"], ["Light","mortars"], [], [], ["mortars"]
    )

    # repeat the article search using a quoted phrase
    check_article_sr( '"light mortar"',
        "Hit 'Em High, Or Hit 'Em Low",
        [], ["light mortars"], ["Light mortars"], [], [], []
    )

    # check highlighting in article authors
    check_article_sr( "pitcav*",
        "The Jungle Isn't Neutral",
        [], [], [], ["Pitcavage"], [], []
    )

    # check highlighting in article scenario names
    check_article_sr( "foul",
        "Hunting DUKWs and Buffalos",
        [], ["Foul"], [], [], ["Foul"], []
    )

    # check highlighting in article scenario ID's
    check_article_sr( "hs17",
        "Hunting DUKWs and Buffalos",
        [], ["HS17"], [], [], ["HS17"], []
    )

    # check highlighting in article tags
    check_article_sr( "pto",
        "The Jungle Isn't Neutral",
        [], ["PTO"], [], [], [], ["PTO"]
    )
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_html_stripping( webdriver, flask_app, dbconn ):
    """Test that HTML is stripped from content before it is indexed for searching."""

    # initialize
    init_tests( webdriver, flask_app, dbconn )

    # create objects whose fields contain HTML markup
    create_publisher( {
        "name": "A <b>bold</b> publisher",
        "description": "This is some <b>bold text</b>, this is <i>italic</i>."
    } )
    create_publication( {
        "name": "A <b>bold</b> publication",
        "edition": "75<u>L</u>",
        "description": "This is some <b>bold text</b>, this is <i>italic</i>.",
        "tags": [ "+<b>bold</b>", "+<i>italic</i>" ]
    } )
    create_article( {
        "title": "An <i>italic</i> article",
        "subtitle": "A <b>bold</b> subtitle",
        "authors": [ "+Joe <u>Underlined</u>" ],
        "tags": [ "+<b>bold</b>", "+<i>italic</i>" ],
        "scenarios": [ "+<b>bold</b> [B1]", "+<i>italic</i> [I1]" ],
        "snippet": "This is some <b>bold text</b>, this is <i>italic</i>."
    } )

    def has_no_html( val ):
        """Check that the value contains no HTML markup (empty/null values are OK)."""
        if not val:
            return True
        return "<" not in val and ">" not in val

    # make sure no HTML ended up in the search index
    with SearchDbConn() as dbconn2:
        for row in dbconn2.conn.execute( "SELECT * FROM searchable" ):
            assert all( has_no_html(v) for v in row )
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def test_make_fts_query_string():
    """Test generating FTS query strings."""

    def check( query, expected ):
        """Check that the query string is translated correctly."""
        assert _make_fts_query_string( query ) == expected

    # test some query strings
    check( "", "" )
    check( "hello", "hello" )
    check( " hello, world! ", "hello, AND world!" )
    check(
        "foo 1+2 A-T K# bar",
        'foo AND "1+2" AND "A-T" AND "K#" AND bar'
    )
    check(
        "a'b a''b",
        "\"a''b\" AND \"a''''b\""
    )
    check(
        'foo "set dc" bar',
        'foo AND "set dc" AND bar'
    )

    # test some quoted phrases
    check( '""', '' )
    check( ' " " ', '' )
    check( '"hello world"', '"hello world"' )
    check( ' foo "hello world" bar ', 'foo AND "hello world" AND bar' )
    check( ' foo " xyz " bar ', 'foo AND xyz AND bar' )
    check( ' foo " xyz 123 " bar ', 'foo AND "xyz 123" AND bar' )

    # test some incorrectly quoted phrases
    check( '"', '' )
    check( ' " " " ', '' )
    check( ' a "b c d e', 'a AND "b c d e"' )
    check( ' a b" c d e ', 'a AND b AND c AND d AND e' )

    # test search terms that should be passed through unchanged
    check( "AND", "AND" )
    check( "OR", "OR" )
    check( "NOT", "NOT" )
    check( "foo OR bar", "foo OR bar" )
    check( "(a OR b)", "(a OR b)" )
||||
|
||||
# --------------------------------------------------------------------- |
||||
|
||||
def _do_test_search( query, expected ):
    """Run a search and check that the expected results (and only those) are returned."""
    results = do_search( query )
    names = get_result_names( results )
    # NOTE: we compare sets, since the order of search results is not checked
    assert set( names ) == set( expected )
    return results
||||
|
||||
def _do_test_searches( queries, expected ):
    """Run multiple searches that should all return the same results."""
    for q in queries:
        _do_test_search( q, expected )
Loading…
Reference in new issue