Add file patching tests to the storages app
Signed-off-by: Roberto Rosario <roberto.rosario@mayan-edms.com>
This commit is contained in:
@@ -5,5 +5,6 @@ from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
|
||||
class StorageApp(apps.AppConfig):
|
||||
has_tests = True
|
||||
name = 'mayan.apps.storage'
|
||||
verbose_name = _('Storage')
|
||||
|
||||
0
mayan/apps/storage/tests/__init__.py
Normal file
0
mayan/apps/storage/tests/__init__.py
Normal file
80
mayan/apps/storage/tests/test_utils.py
Normal file
80
mayan/apps/storage/tests/test_utils.py
Normal file
@@ -0,0 +1,80 @@
|
||||
from __future__ import print_function, unicode_literals
|
||||
|
||||
from pathlib2 import Path
|
||||
import shutil
|
||||
|
||||
from mayan.apps.common.tests.base import BaseTestCase
|
||||
from mayan.apps.storage.utils import mkdtemp
|
||||
|
||||
from ..utils import patch_files
|
||||
|
||||
|
||||
class PatchFilesTestCase(BaseTestCase):
|
||||
test_replace_text = 'replaced_text'
|
||||
|
||||
def setUp(self):
|
||||
super(PatchFilesTestCase, self).setUp()
|
||||
|
||||
self.temporary_directory = mkdtemp()
|
||||
self.path_temporary_directory = Path(self.temporary_directory)
|
||||
self.path_test_file = self.path_temporary_directory / 'test_file.txt'
|
||||
|
||||
with self.path_test_file.open(mode='w') as file_object:
|
||||
file_object.writelines(
|
||||
[
|
||||
'line 1\n',
|
||||
' line 2\n',
|
||||
'line 3\n',
|
||||
]
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
super(PatchFilesTestCase, self).tearDown()
|
||||
shutil.rmtree(self.temporary_directory, ignore_errors=True)
|
||||
|
||||
def _patch_test_file(self):
|
||||
replace_list = [
|
||||
{
|
||||
'filename_pattern': '*',
|
||||
'content_patterns': [
|
||||
{
|
||||
'search': self.test_search_text,
|
||||
'replace': self.test_replace_text,
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
patch_files(
|
||||
path=self.path_temporary_directory, replace_list=replace_list
|
||||
)
|
||||
|
||||
with self.path_test_file.open(mode='r') as file_object:
|
||||
self.final_text = file_object.read()
|
||||
|
||||
def test_file_patching_single_line(self):
|
||||
self.test_search_text = 'line 1'
|
||||
|
||||
self._patch_test_file()
|
||||
|
||||
self.assertEqual(self.final_text, 'replaced_text\n line 2\nline 3\n')
|
||||
|
||||
def test_file_patching_multi_line(self):
|
||||
self.test_search_text = 'line 2\nline 3\n'
|
||||
|
||||
self._patch_test_file()
|
||||
|
||||
self.assertEqual(self.final_text, 'line 1\n replaced_text')
|
||||
|
||||
def test_file_patching_spaces(self):
|
||||
self.test_search_text = ' line 2'
|
||||
|
||||
self._patch_test_file()
|
||||
|
||||
self.assertEqual(self.final_text, 'line 1\nreplaced_text\nline 3\n')
|
||||
|
||||
def test_file_patching_no_matches(self):
|
||||
self.test_search_text = 'line 4'
|
||||
|
||||
self._patch_test_file()
|
||||
|
||||
self.assertEqual(self.final_text, 'line 1\n line 2\nline 3\n')
|
||||
@@ -1,6 +1,5 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import fileinput
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
@@ -8,7 +7,6 @@ import tempfile
|
||||
|
||||
from pathlib2 import Path
|
||||
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.module_loading import import_string
|
||||
|
||||
from .settings import setting_temporary_directory
|
||||
@@ -98,15 +96,42 @@ def patch_files(path=None, replace_list=None):
|
||||
for replace_entry in replace_list or []:
|
||||
for path_entry in path_object.glob('**/{}'.format(replace_entry['filename_pattern'])):
|
||||
if path_entry.is_file():
|
||||
# PY3
|
||||
# Don't use context processor to allow working on Python 2.7
|
||||
# Update on Mayan EDMS version >= 4.0
|
||||
file_object = fileinput.FileInput(force_text(path_entry), inplace=True)
|
||||
for line in file_object:
|
||||
for pattern in replace_entry['content_patterns']:
|
||||
line = line.replace(pattern['search'], pattern['replace'])
|
||||
print(line, end='')
|
||||
file_object.close()
|
||||
for pattern in replace_entry['content_patterns']:
|
||||
with path_entry.open(mode='r+') as source_file_object:
|
||||
with tempfile.TemporaryFile(mode='r+') as temporary_file_object:
|
||||
source_position = 0
|
||||
destination_position = 0
|
||||
|
||||
while(True):
|
||||
source_file_object.seek(source_position)
|
||||
letter = source_file_object.read(1)
|
||||
|
||||
if len(letter) == 0:
|
||||
break
|
||||
else:
|
||||
if letter == pattern['search'][0]:
|
||||
text = '{}{}'.format(letter, source_file_object.read(len(pattern['search']) - 1))
|
||||
|
||||
temporary_file_object.seek(destination_position)
|
||||
if text == pattern['search']:
|
||||
text = pattern['replace']
|
||||
source_position = source_position + len(pattern['search'])
|
||||
destination_position = destination_position + len(pattern['replace'])
|
||||
temporary_file_object.write(text)
|
||||
|
||||
else:
|
||||
source_position = source_position + 1
|
||||
destination_position = destination_position + 1
|
||||
temporary_file_object.write(letter)
|
||||
else:
|
||||
source_position = source_position + 1
|
||||
destination_position = destination_position + 1
|
||||
temporary_file_object.write(letter)
|
||||
|
||||
source_file_object.seek(0)
|
||||
source_file_object.truncate()
|
||||
temporary_file_object.seek(0)
|
||||
shutil.copyfileobj(fsrc=temporary_file_object, fdst=source_file_object)
|
||||
|
||||
|
||||
def validate_path(path):
|
||||
|
||||
Reference in New Issue
Block a user