Documents now have their own dedicated DocumentPage submodel. The old DocumentPage is now called DocumentVersionPage. This allows mappings between document pages and document version pages, allowing renumbering, appending pages. DocumentPages have a content_object to map them to any other object. For now they only map to DocumentVersionPages. New option added to the version upload form to append the pages of the new version. A new view was added to just append new pages with wraps the new document version upload form and hides the append pages checkbox set to True. Add a new action, reset_pages to reset the pages of the document to those of the latest version. Missing: appending tests, checks for proper content_object in OCR and document parsing. Author: Roberto Rosario <roberto.rosario@mayan-edms.com> Date: Thu Oct 11 12:00:25 2019 -0400
356 lines
11 KiB
Python
356 lines
11 KiB
Python
from __future__ import absolute_import, unicode_literals
|
|
|
|
import logging
|
|
|
|
from django.apps import apps
|
|
from django.db.models import Q
|
|
from django.utils.encoding import force_text, python_2_unicode_compatible
|
|
from django.utils.module_loading import import_string
|
|
from django.utils.translation import ugettext as _
|
|
|
|
from mayan.apps.common.literals import LIST_MODE_CHOICE_LIST
|
|
|
|
from .literals import (
|
|
QUERY_OPERATION_AND, QUERY_OPERATION_OR, TERM_NEGATION_CHARACTER,
|
|
TERM_OPERATION_OR, TERM_OPERATIONS, TERM_QUOTES, TERM_SPACE_CHARACTER
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@python_2_unicode_compatible
|
|
class FieldQuery(object):
|
|
def __init__(self, search_field, search_term_collection):
|
|
query_operation = QUERY_OPERATION_AND
|
|
self.query = None
|
|
self.parts = []
|
|
|
|
for term in search_term_collection.terms:
|
|
if term.is_meta:
|
|
# It is a meta term, modifies the query operation
|
|
# and is not searched
|
|
if term.string == TERM_OPERATION_OR:
|
|
query_operation = QUERY_OPERATION_OR
|
|
else:
|
|
if search_field.transformation_function:
|
|
term_string = search_field.transformation_function(
|
|
term_string=term.string
|
|
)
|
|
else:
|
|
term_string = term.string
|
|
|
|
q_object = Q(
|
|
**{'%s__%s' % (search_field.field, 'icontains'): term_string}
|
|
)
|
|
if term.negated:
|
|
q_object = ~q_object
|
|
|
|
if self.query is None:
|
|
self.query = q_object
|
|
else:
|
|
if query_operation == QUERY_OPERATION_AND:
|
|
self.query = self.query & q_object
|
|
else:
|
|
self.query = self.query | q_object
|
|
|
|
if not term.is_meta:
|
|
self.parts.append(force_text(search_field.label))
|
|
self.parts.append(force_text(term))
|
|
else:
|
|
self.parts.append(term.string)
|
|
|
|
def __str__(self):
|
|
return ' '.join(self.parts)
|
|
|
|
|
|
class SearchField(object):
|
|
"""
|
|
Search for terms in fields that directly belong to the parent SearchModel
|
|
"""
|
|
def __init__(self, search_model, field, label, transformation_function=None):
|
|
self.search_model = search_model
|
|
self.field = field
|
|
self.label = label
|
|
self.transformation_function = transformation_function
|
|
|
|
def get_full_name(self):
|
|
return self.field
|
|
|
|
def get_model(self):
|
|
return self.search_model.model
|
|
|
|
|
|
@python_2_unicode_compatible
|
|
class SearchModel(object):
|
|
_registry = {}
|
|
|
|
@classmethod
|
|
def all(cls):
|
|
return sorted(list(cls._registry.values()), key=lambda x: x.label)
|
|
|
|
@classmethod
|
|
def as_choices(cls):
|
|
return cls._registry
|
|
|
|
@classmethod
|
|
def get(cls, name):
|
|
try:
|
|
result = cls._registry[name]
|
|
except KeyError:
|
|
raise KeyError(_('No search model matching the query'))
|
|
if not hasattr(result, 'serializer'):
|
|
result.serializer = import_string(result.serializer_path)
|
|
|
|
return result
|
|
|
|
def __init__(
|
|
self, app_label, model_name, serializer_path, label=None,
|
|
list_mode=None, permission=None, queryset=None
|
|
):
|
|
self.app_label = app_label
|
|
self.list_mode = list_mode or LIST_MODE_CHOICE_LIST
|
|
self.model_name = model_name
|
|
self.search_fields = []
|
|
self._model = None # Lazy
|
|
self._label = label
|
|
self.serializer_path = serializer_path
|
|
self.permission = permission
|
|
self.queryset = queryset
|
|
self.__class__._registry[self.get_full_name()] = self
|
|
|
|
def __str__(self):
|
|
return force_text(self.label)
|
|
|
|
def add_model_field(self, *args, **kwargs):
|
|
"""
|
|
Add a search field that directly belongs to the parent SearchModel
|
|
"""
|
|
search_field = SearchField(self, *args, **kwargs)
|
|
self.search_fields.append(search_field)
|
|
|
|
def get_fields_simple_list(self):
|
|
"""
|
|
Returns a list of the fields for the SearchModel
|
|
"""
|
|
result = []
|
|
for search_field in self.search_fields:
|
|
result.append((search_field.get_full_name(), search_field.label))
|
|
|
|
return result
|
|
|
|
def get_full_name(self):
|
|
return '%s.%s' % (self.app_label, self.model_name)
|
|
|
|
def get_queryset(self):
|
|
if self.queryset:
|
|
return self.queryset()
|
|
else:
|
|
return self.model.objects.all()
|
|
|
|
def get_search_field(self, full_name):
|
|
try:
|
|
return self.search_fields[full_name]
|
|
except KeyError:
|
|
raise KeyError('No search field named: %s' % full_name)
|
|
|
|
def get_search_query(self, query_string, global_and_search=False):
|
|
return SearchQuery(
|
|
query_string=query_string, search_model=self,
|
|
global_and_search=global_and_search
|
|
)
|
|
|
|
@property
|
|
def label(self):
|
|
if not self._label:
|
|
self._label = self.model._meta.verbose_name
|
|
return self._label
|
|
|
|
@property
|
|
def model(self):
|
|
if not self._model:
|
|
self._model = apps.get_model(self.app_label, self.model_name)
|
|
return self._model
|
|
|
|
@property
|
|
def pk(self):
|
|
return self.get_full_name()
|
|
|
|
def search(self, query_string, user, global_and_search=False):
|
|
AccessControlList = apps.get_model(
|
|
app_label='acls', model_name='AccessControlList'
|
|
)
|
|
|
|
search_query = self.get_search_query(
|
|
query_string=query_string, global_and_search=global_and_search
|
|
)
|
|
|
|
try:
|
|
queryset = self.get_queryset().filter(search_query.query).distinct()
|
|
except Exception:
|
|
logger.error(
|
|
'Error filtering model %s with queryset: %s', self.model,
|
|
search_query.query
|
|
)
|
|
raise
|
|
|
|
if self.permission:
|
|
queryset = AccessControlList.objects.restrict_queryset(
|
|
permission=self.permission, queryset=queryset, user=user
|
|
)
|
|
|
|
return queryset
|
|
|
|
|
|
@python_2_unicode_compatible
|
|
class SearchQuery(object):
|
|
def __init__(self, query_string, search_model, global_and_search=False):
|
|
self.query = None
|
|
self.text = []
|
|
|
|
for search_field in search_model.search_fields:
|
|
search_term_collection = SearchTermCollection(
|
|
text=query_string.get(
|
|
search_field.field, query_string.get('q', '')
|
|
).strip()
|
|
)
|
|
|
|
field_query = FieldQuery(
|
|
search_field=search_field,
|
|
search_term_collection=search_term_collection
|
|
)
|
|
|
|
if field_query.query:
|
|
self.text.append('({})'.format(force_text(field_query)))
|
|
|
|
if global_and_search:
|
|
self.text.append('AND')
|
|
else:
|
|
self.text.append('OR')
|
|
|
|
if self.query is None:
|
|
self.query = field_query.query
|
|
else:
|
|
if global_and_search:
|
|
self.query = self.query & field_query.query
|
|
else:
|
|
self.query = self.query | field_query.query
|
|
|
|
self.query = self.query or Q()
|
|
|
|
def __str__(self):
|
|
return ' '.join(self.text[:-1])
|
|
|
|
|
|
@python_2_unicode_compatible
|
|
class SearchTerm(object):
|
|
def __init__(self, negated, string, is_meta):
|
|
self.negated = negated
|
|
self.string = string
|
|
self.is_meta = is_meta
|
|
|
|
def __str__(self):
|
|
if self.is_meta:
|
|
return ''
|
|
else:
|
|
return '{}contains "{}"'.format(
|
|
'does not ' if self.negated else '', self.string
|
|
)
|
|
|
|
|
|
@python_2_unicode_compatible
|
|
class SearchTermCollection(object):
|
|
def __init__(self, text):
|
|
"""
|
|
Takes a text string and returns a list of dictionaries.
|
|
Each dictionary has two key "negated" and "string"
|
|
|
|
String 'a "b c" d "e" \'f g\' h -i -"j k" l -\'m n\' o OR p'
|
|
|
|
Results in:
|
|
[
|
|
{'negated': False, 'string': 'a'}, {'negated': False, 'string': 'b c'},
|
|
{'negated': False, 'string': 'd'}, {'negated': False, 'string': 'e'},
|
|
{'negated': False, 'string': 'f g'}, {'negated': False, 'string': 'h'},
|
|
{'negated': True, 'string': 'i'}, {'negated': True, 'string': 'j k'},
|
|
{'negated': False, 'string': 'l'}, {'negated': True, 'string': 'm n'},
|
|
{'negated': False, 'string': 'o'}, {'negated': False, 'string': 'OR'},
|
|
{'negated': False, 'string': 'p'}
|
|
]
|
|
"""
|
|
inside_quotes = False
|
|
negated = False
|
|
term_letters = []
|
|
self.terms = []
|
|
|
|
for letter in text:
|
|
if letter in TERM_QUOTES:
|
|
if inside_quotes:
|
|
if term_letters:
|
|
term_string = ''.join(term_letters)
|
|
negated = False
|
|
if term_string.startswith(TERM_NEGATION_CHARACTER):
|
|
term_string = term_string[1:]
|
|
negated = True
|
|
|
|
self.terms.append(
|
|
SearchTerm(
|
|
is_meta=False, negated=negated,
|
|
string=term_string
|
|
)
|
|
)
|
|
negated = False
|
|
term_letters = []
|
|
|
|
inside_quotes = not inside_quotes
|
|
else:
|
|
if not inside_quotes and letter == TERM_SPACE_CHARACTER:
|
|
if term_letters:
|
|
term_string = ''.join(term_letters)
|
|
if term_string in TERM_OPERATIONS:
|
|
is_meta = True
|
|
else:
|
|
is_meta = False
|
|
|
|
if is_meta:
|
|
negated = False
|
|
else:
|
|
negated = False
|
|
if term_string.startswith(TERM_NEGATION_CHARACTER):
|
|
term_string = term_string[1:]
|
|
negated = True
|
|
|
|
self.terms.append(
|
|
SearchTerm(
|
|
is_meta=is_meta, negated=negated,
|
|
string=term_string
|
|
)
|
|
)
|
|
negated = False
|
|
term_letters = []
|
|
else:
|
|
term_letters.append(letter)
|
|
|
|
if term_letters:
|
|
term_string = ''.join(term_letters)
|
|
negated = False
|
|
if term_string.startswith(TERM_NEGATION_CHARACTER):
|
|
term_string = term_string[1:]
|
|
negated = True
|
|
|
|
self.terms.append(
|
|
SearchTerm(
|
|
is_meta=False, negated=negated,
|
|
string=term_string
|
|
)
|
|
)
|
|
|
|
def __str__(self):
|
|
result = []
|
|
for term in self.terms:
|
|
if term.is_meta:
|
|
result.append(term.string)
|
|
else:
|
|
result.append(force_text(term))
|
|
|
|
return ' '.join(result)
|