-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathextract_text.py
More file actions
159 lines (123 loc) · 5.84 KB
/
extract_text.py
File metadata and controls
159 lines (123 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# %%
import sys, os, glob, math, csv, json, itertools, pdf2image, pytesseract, cv2
from tqdm import tqdm
from joblib import Parallel, delayed
from skimage.metrics import structural_similarity as ssim
import numpy as np
from nltk.tokenize import sent_tokenize as splitToSentences
# %%
def get_files(path, ext='pdf'):
    """Return the basenames of all files in *path* with extension *ext*.

    Parameters:
        path: directory to scan (trailing separator optional).
        ext: extension without the dot; matched case-insensitively via lower().

    Returns:
        list of filenames (no directory component).

    Bug fixed: the original picked a separator with ``sys.platform == 'posix'``,
    but ``sys.platform`` is never 'posix' (it is 'linux', 'darwin', 'win32', ...;
    'posix' is ``os.name``), so POSIX paths were split on '\\' and returned
    whole. ``os.path.basename`` handles separators portably.
    """
    return [os.path.basename(f) for f in glob.glob(path + '/*.' + ext.lower())]
def pdf_to_imagesNpArr(pdf_path):
    """Rasterize each page of the PDF at *pdf_path* and return them as numpy arrays.

    Pages are rendered at 250 dpi as PNGs via pdf2image (requires poppler).
    """
    rendered = pdf2image.convert_from_path(pdf_path, dpi=250, fmt='png')
    arrays = []
    for page in rendered:
        arrays.append(np.array(page))
    return arrays
def get_cropped_images(img, iterCount=2):
    '''
    Detect "chunks" of text on a page image and crop each one out.

    The grayscale page is Otsu-thresholded (inverted, so ink becomes white)
    and dilated with an 18x18 rectangular kernel so neighbouring glyphs merge
    into one connected blob per text chunk; each external contour's bounding
    box is then cropped from the untouched grayscale original.

    Parameters:
        img: BGR page image (numpy array, as produced by pdf_to_imagesNpArr).
        iterCount: kept for backward compatibility; the multi-pass
            "enhancement" loop it controlled is disabled, so it is unused.

    Returns:
        dict mapping 0..n-1 to cropped grayscale numpy arrays, ordered
        top-to-bottom then left-to-right by bounding-box origin (y, x).

    Bug fixed: the original's insert_coords called ``sorted(cList, ...)`` and
    discarded the result, so the coordinate list — and therefore the returned
    dict — stayed in arbitrary contour order; its own docstring flagged this
    ("not fixed. change to return a sorted dict of images"). The list is now
    sorted in place. Dead scaffolding from the disabled multi-pass loop
    (rectangle drawing, unused counter/crop locals) was removed; it did not
    affect the returned crops.
    '''
    def coord_to_str(coord):
        return ','.join(str(c) for c in coord)

    grayOriginal = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Invert + Otsu so text pixels are white, then dilate so adjacent
    # characters/words fuse into a single connected component per chunk.
    ret, thresh1 = cv2.threshold(grayOriginal, 0, 255, cv2.THRESH_OTSU | cv2.THRESH_BINARY_INV)
    rect_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (18, 18))
    dilation = cv2.dilate(thresh1, rect_kernel, iterations=1)
    contours, hierarchy = cv2.findContours(dilation, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

    croppedImgs = {}
    coords_list = []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        coords_list.append([y, x])
        # Crop from the clean grayscale original, not the working copy.
        croppedImgs[coord_to_str([y, x])] = grayOriginal[y:y + h, x:x + w]

    # Reading order: top-to-bottom, then left-to-right.
    coords_list.sort(key=lambda c: (c[0], c[1]))
    return {i: croppedImgs[coord_to_str(c)] for i, c in enumerate(coords_list)}
def run_ocr(cropped_images_dict, show_progress):
    """OCR every cropped image with pytesseract (English model), in parallel.

    Parameters:
        cropped_images_dict: dict of crops as returned by get_cropped_images.
        show_progress: when true, wrap the iteration in a tqdm progress bar.

    Returns:
        list of non-empty extracted strings, reversed relative to dict order
        (preserved from the original implementation).

    Bug fixed: the original wrote ``tqdm()(keys)`` — constructing an empty
    tqdm instance and then *calling* it, which raises TypeError because tqdm
    objects are not callable. tqdm wraps the iterable directly: ``tqdm(keys)``.
    The redundant function-local ``from joblib import ...`` was dropped; the
    module already imports Parallel and delayed at top level.
    """
    keys = cropped_images_dict.keys()
    iterator = tqdm(keys) if show_progress else keys
    txtractArr = Parallel(n_jobs=-1)(
        delayed(pytesseract.image_to_string)(cropped_images_dict[k], "eng")
        for k in iterator
    )
    # Drop whitespace-only results before reversing.
    return [t for t in txtractArr if len(t.strip()) > 0][::-1]
# %%
def get_files_hash(filename):
    """Return the hex SHA-256 digest of the file at *filename*."""
    import hashlib
    digest = hashlib.sha256()
    with open(filename, "rb") as f:
        # Stream in 4 KiB blocks so large PDFs never sit fully in memory.
        while block := f.read(4096):
            digest.update(block)
    return str(digest.hexdigest())
def save_jsonl(filename, textArr, single_db=True, scan_type='pytesseract'):
    """Append one JSON record describing a scanned file to a .jsonl database.

    When *single_db* is true the record is appended to 'jarPhysDB.jsonl' in
    the same directory as *filename*; otherwise it goes to a per-file
    '<filename minus last 4 chars>.jsonl'. Best-effort: any failure is
    printed and False is returned instead of raising.

    Returns:
        True on success, False on any error.
    """
    default_dbFile_name = 'jarPhysDB.jsonl'
    try:
        if single_db:
            parent = '/'.join(filename.split('/')[:-1])
            fName = parent + '/' + default_dbFile_name
        else:
            # NOTE(review): assumes a 4-character extension (.pdf) — confirm.
            fName = filename[:-4] + '.jsonl'
        record = {
            'filename': filename,
            'filehash': get_files_hash(filename),
            'scanType': scan_type,
            'pages': textArr,
        }
        mode = 'a' if os.path.exists(fName) else 'w'
        with open(fName, mode, encoding='utf-8') as f:
            f.write(json.dumps(record) + '\n')
    except Exception as e:
        print(e)
        print('Error saving jsonl file')
        return False
    return True
def create_db(folderName=None, filenames=None, single_db=True):
    """OCR each PDF under ./<folderName>/ and record the text via save_jsonl.

    Parameters:
        folderName: folder (relative to cwd) containing the PDFs.
        filenames: explicit list of basenames to process; when None, every
            .pdf in the folder is processed.
        single_db: forwarded to save_jsonl (one shared DB vs per-file DBs).

    Returns:
        True (unconditionally, once all files are processed).
    """
    if filenames is None:
        files = get_files('./' + folderName + '/')
    else:
        files = filenames
    for pdf_name in tqdm(files):
        pdf_path = './' + folderName + '/' + pdf_name
        page_images = pdf_to_imagesNpArr(pdf_path)
        per_page_crops = [get_cropped_images(page) for page in page_images]
        # OCR four pages at a time; each element of textArrs is one page's
        # list of extracted strings.
        textArrs = Parallel(n_jobs=4)(
            delayed(run_ocr)(crops, show_progress=False)
            for crops in tqdm(per_page_crops)
        )
        # in the future, where different types of scans are done, the scan
        # function also returns the scan type.
        save_jsonl(pdf_path, textArrs, single_db=single_db, scan_type='pytesseract')
    return True
def get_all_indexed_files(folderName):
    """Return the unique 'filehash' values recorded across every .jsonl
    database found under ./<folderName>/."""
    seen_hashes = []
    for db_file in get_files('./' + folderName + '/', 'jsonl'):
        with open('./' + folderName + '/' + db_file, 'r', encoding='utf-8') as f:
            for line in f:
                seen_hashes.append(json.loads(line)['filehash'])
    return list(set(seen_hashes))
def extractTextMain(folderName=None):
    """Scan every not-yet-indexed PDF in *folderName* (default './files').

    Each PDF's SHA-256 is compared against the hashes already recorded in the
    folder's .jsonl databases; only unrecorded files are OCR'd via create_db.

    Performance fix: the original evaluated ``get_all_indexed_files(folderName)``
    inside the list-comprehension condition, re-reading every .jsonl database
    once per PDF. The result is loop-invariant, so it is now computed once
    (and as a set, for O(1) membership tests).
    """
    if folderName is None:
        folderName = './files'
    pdfsinFolder = get_files(folderName)
    fHashes = [get_files_hash(folderName + '/' + f) for f in pdfsinFolder]
    indexed = set(get_all_indexed_files(folderName))
    notScanned = [pdfsinFolder[i] for i, f in enumerate(fHashes) if f not in indexed]
    if len(notScanned) == 0:
        print('All files are already scanned')
        return
    create_db(folderName, notScanned, single_db=True)
# %%
if __name__ == '__main__':
    # Script entry point: OCR-index all PDFs under the default './files' folder.
    extractTextMain()
# %%