Last active
August 29, 2018 06:00
-
-
Save mcchae/fc928f8de526031cbf665f924f402487 to your computer and use it in GitHub Desktop.
CVE search engine using python Whoosh
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
""" | |
import CVE from "http://cve.mitre.org/data/downloads/allitems.txt" | |
""" | |
################################################################################ | |
import os | |
import urllib2 | |
import getopt | |
import datetime | |
from whoosh.index import create_in | |
from whoosh.fields import * | |
from termcolor import colored | |
INDEX_DIR="/opt/scap/cve" | |
# ################################################################################ | |
# def get_unique_fields(txt=None, | |
# url="http://cve.mitre.org/data/downloads/allitems.txt"): | |
# """ | |
# :param txt: | |
# :return: | |
# {Name} hits 102065 | |
# {Status} hits 102065 | |
# {URL} hits 99012 | |
# {Phase} hits 99012 | |
# {Category} hits 99012 | |
# {Reference} hits 874535 | |
# """ | |
################################################################################ | |
def cve_generator(txt=None,
                  url="http://cve.mitre.org/data/downloads/allitems.txt"):
    """
    Parse a CVE "allitems" text dump and yield one record per CVE entry.

    Layout understood by the parser:
      * a line starting with ``======`` separates records;
      * after a separator come ``Field: value`` header lines, up to the
        first blank line;
      * every following non-blank line, up to the next separator, is
        body text.
    Anything before the first separator is treated as a file preamble and
    is discarded.

    :param txt: path of a local dump file; takes precedence over ``url``
    :param url: location to download the dump from when ``txt`` is not given
    :return: generator of dicts mapping the lower-cased header field name
        (and ``'body'``) to the list of values collected for it
    :raises ValueError: if neither ``txt`` nor ``url`` is specified
    """
    # Imported here on purpose: the module never imports ``sys`` itself and
    # only sees it through ``from whoosh.fields import *``.
    import sys

    if txt:
        ifp = open(txt)
    elif url:
        try:
            from urllib2 import urlopen  # Python 2
        except ImportError:
            from urllib.request import urlopen  # Python 3
        ifp = urlopen(url)
    else:
        raise ValueError('source text or url is not specified!')
    try:
        b_header = True  # True until the first separator has been seen
        b_body = True    # True: reading free text, False: reading headers
        cve = {}
        for i, line in enumerate(ifp):
            if isinstance(line, bytes) and not isinstance(line, str):
                # Only reachable on Python 3 (urlopen yields bytes there);
                # latin1 maps every byte, so decoding cannot fail.
                line = line.decode('latin1')
            line = line.strip()
            if b_body:
                if line.startswith('======'):
                    # A separator closes the current record, except for the
                    # preamble that precedes the very first separator.
                    if cve and not b_header:
                        yield cve
                    cve = {}
                    b_header = False
                    b_body = False
                elif line:
                    cve.setdefault('body', []).append(line)
                continue
            # Header section: a blank line switches over to the body.
            if not line:
                b_body = True
                continue
            field, colon, value = line.partition(':')
            if not colon:
                sys.stderr.write('[%d] %s : Invalid colon format\n' %
                                 (i + 1, line))
                continue
            cve.setdefault(field.lower(), []).append(value.strip())
        # Flush the last record; a file without any separator holds only
        # preamble text, which is not a record.
        if cve and not b_header:
            yield cve
    finally:
        ifp.close()
################################################################################ | |
def do_index(index_folder=INDEX_DIR, txt=None, url=None,
             yes=False, verbose=False):
    """
    (Re)build the Whoosh index of the CVE list in ``index_folder``.

    Existing contents of the folder are removed, a fresh index is created
    with the schema below, and every record produced by ``cve_generator``
    is added as one document.

    :param index_folder: directory that holds the index
    :param txt: local CVE dump file, passed through to ``cve_generator``
    :param url: CVE dump URL, passed through to ``cve_generator``
    :param yes: skip the interactive confirmation when True
    :param verbose: print progress messages when True
    :return: False if the user declined the rebuild, True after the commit
    """
    # Local imports: the module does not import these names itself.
    import shutil
    import sys

    # Declare the document schema first, much like DDL in SQL.
    schema = Schema(
        name=ID(stored=True),
        status=KEYWORD(stored=True),
        url=STORED(),
        phase=KEYWORD(stored=True),
        category=KEYWORD(stored=True),
        reference=STORED(),
        body=TEXT(stored=True),
    )
    if not yes:
        try:
            ask = raw_input  # Python 2
        except NameError:
            ask = input  # Python 3
        confirm = ask("Want to delete and reindexing at <%s>? [y/N] "
                      % index_folder)
        if confirm.lower() not in ('y', 'yes'):
            return False
    if os.path.isdir(index_folder):
        # Empty the folder without a shell: the former
        # `rm -rf "<folder>/*"` quoted the glob, so nothing was removed.
        for entry in os.listdir(index_folder):
            path = os.path.join(index_folder, entry)
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
        if verbose:
            sys.stdout.write('delete all contents at <%s>\n' % index_folder)
    else:
        os.makedirs(index_folder)
    s_ts = datetime.datetime.now()
    # Create the index in the requested folder (not the module default).
    ix = create_in(index_folder, schema)
    # Writer used to build the inverted index.
    writer = ix.writer()
    sys.stdout.write('start indexing... from ')
    sys.stdout.write('%s\n' % (txt if txt else url))
    count = 0  # stays defined even when the source yields no record
    try:
        for cve in cve_generator(txt=txt, url=url):
            fields = {
                'name': ''.join(cve['name']),
                'status': ' '.join(cve.get('status', ())),
                'url': '\n'.join(cve.get('url', ())),
                'phase': ' '.join(cve.get('phase', ())),
                'category': ' '.join(cve.get('category', ())),
                'reference': '\n'.join(cve.get('reference', ())),
                'body': '\n'.join(cve.get('body', ())),
            }
            # Whoosh wants text, not bytes. latin1 maps every byte value,
            # so this decode cannot fail; values that are already text
            # (Python 3) are left untouched.
            fields = {
                key: value.decode('latin1')
                if isinstance(value, bytes) and not isinstance(value, type(u''))
                else value
                for key, value in fields.items()
            }
            writer.add_document(**fields)
            count += 1
            if verbose and count % 1000 == 1:
                sys.stdout.write('<<<%d>>> indexing...\n' % count)
    except Exception:
        # Drop the pending documents and release the index write lock.
        writer.cancel()
        raise
    sys.stdout.write('<<<%d>>> indexing... done\n' % count)
    sys.stdout.write('commit and optimizing...')
    sys.stdout.flush()
    writer.commit(optimize=True)
    e_ts = datetime.datetime.now()
    sys.stdout.write('done! [It takes %s]\n' % (e_ts - s_ts))
    return True
################################################################################ | |
def usage(msg=None):
    """
    Print the command-line help of the indexing program and exit.

    :param msg: optional error message, printed in red before the help
    :return: does not return; the process exits with status 1
    """
    # Local import: the module does not import ``sys`` itself.
    import sys

    if msg:
        print(colored(str(msg), 'red'))
    # The help text describes the indexer; it takes options only and no
    # query string.
    print(colored('''
usage: {0} [options]
  build the full-text index from the CVE list
  options are:
    -h, --help : show this message
    -i, --index_folder : index folder (default is "/opt/scap/cve")
    -t, --txt : use text cve file instead url
    -u, --url : url to process
        (default is "http://cve.mitre.org/data/downloads/allitems.txt")
    -y, --yes : do not confirm to rebuild index
    -v, --verbose : verbose print
'''.format(sys.argv[0]), 'green'))
    sys.exit(1)
################################################################################ | |
if __name__ == '__main__':
    # Command-line entry point of the indexer: parse the options into the
    # keyword arguments of do_index() and run it.
    import sys  # the module does not import ``sys`` itself

    kwargs = {
        "index_folder": INDEX_DIR,
        "txt": None,
        "url": "http://cve.mitre.org/data/downloads/allitems.txt",
        "yes": False,
        "verbose": False,
    }
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "hi:t:u:yv",
            ["help", "index_folder=", "txt=", "url=", "yes", "verbose"]
        )
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
            elif o in ("-i", "--index_folder"):
                kwargs['index_folder'] = a
            elif o in ("-t", "--txt"):
                kwargs['txt'] = a
            elif o in ("-u", "--url"):
                # The URL is a string; converting it with int() made every
                # use of -u fail with ValueError.
                kwargs['url'] = a
            elif o in ("-y", "--yes"):
                kwargs['yes'] = True
            elif o in ("-v", "--verbose"):
                kwargs['verbose'] = True
        do_index(**kwargs)
    except Exception as e:
        # Top-level boundary: report any failure together with the help.
        usage(str(e))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
""" | |
fulltext search the source "http://cve.mitre.org/data/downloads/allitems.txt" | |
""" | |
################################################################################ | |
import os | |
import getopt | |
from whoosh.index import open_dir | |
from whoosh.fields import * | |
from whoosh.qparser import QueryParser | |
from pprint import pprint | |
from termcolor import colored | |
INDEX_DIR="/opt/scap/cve" | |
################################################################################ | |
def print_lexicon(index_folder=INDEX_DIR, field='body', offset=0, limit=10):
    """
    Print the number of indexed terms of ``field`` and a window of them.

    :param index_folder: directory that holds the index
    :param field: name of the field whose terms are listed
    :param offset: number of leading terms to skip (0-based)
    :param limit: maximum number of terms to print; 0 or less prints all
        terms from ``offset`` on
    :return: None
    """
    ix = open_dir(index_folder)
    with ix.searcher() as searcher:
        # Materialized because the total count is reported as well.
        lxs = list(searcher.lexicon(field))
    pprint('Number of lexicon of %s = %d' % (field, len(lxs)))
    if limit > 0:
        # ``limit`` is a count, so the window ends at offset + limit.
        lxs = lxs[offset:offset + limit]
    else:
        lxs = lxs[offset:]
    pprint(lxs)
################################################################################ | |
def search(ix, qstr, field='body', offset=0):
    """
    Run a query against an open index and yield the hits as plain dicts.

    :param ix: index object to search (as returned by ``open_dir``)
    :param qstr: query string; byte strings are decoded as latin1
    :param field: default field the query is parsed against
    :param offset: number of leading hits to skip (0-based)
    :return: generator of dicts holding the stored fields of each hit plus
        ``n_order`` (1-based position counted from ``offset``) and
        ``n_length`` (``len()`` of the result set)
    """
    if isinstance(qstr, bytes) and not isinstance(qstr, type(u'')):
        # Query parsing needs text; bytes are decoded as latin1.
        qstr = qstr.decode('latin1')
    with ix.searcher() as searcher:
        query = QueryParser(field, ix.schema).parse(qstr)
        # limit=None: without it only the default top 10 hits are
        # available, so an offset of 10 or more returned nothing and no
        # caller could ever see more than 10 hits.
        results = searcher.search(query, limit=None)
        length = len(results)
        for i, r in enumerate(results[offset:]):
            yield {
                'n_order': i + 1,
                'n_length': length,
                'name': r['name'],
                'status': r['status'],
                'url': r['url'],
                'phase': r['phase'],
                'category': r['category'],
                'reference': r['reference'],
                'body': r['body'],
            }
################################################################################ | |
def do_search(index_folder=INDEX_DIR, qstr=None, field='body',
              offset=0, limit=10):
    """
    Search the index and pretty-print each hit under a numbered banner.

    :param index_folder: directory that holds the index
    :param qstr: query string; must not be empty
    :param field: default field to search
    :param offset: number of leading hits to skip (0-based)
    :param limit: stop after this many printed hits; 0 or less means no
        limit is applied here
    :return: None
    :raises ValueError: if ``qstr`` is empty or None
    """
    if not qstr:
        raise ValueError("Invalid query string")
    index = open_dir(index_folder)
    shown = 0
    for hit in search(index, qstr, field, offset):
        shown += 1
        print("[%d]%s" % (shown, '=' * 80))
        pprint(hit)
        # Stop once the requested number of hits has been printed.
        if limit > 0 and shown >= limit:
            break
################################################################################ | |
def usage(msg=None):
    """
    Print the command-line help of the search program and exit.

    :param msg: optional error message, printed in red before the help
    :return: does not return; the process exits with status 1
    """
    # Local import: the module does not import ``sys`` itself.
    import sys

    if msg:
        print(colored(str(msg), 'red'))
    print(colored('''
usage: {0} [options] query_string
  query and search result from CVE
  options are:
    -h, --help : show this message
    -i, --index_folder : index folder (default is "/opt/scap/cve")
    -f, --field : set search field (default is "body")
        search field may be one of {{"status", "phase", "category"}}
    -o, --offset : offset to skip (0-based)
    -l, --limit : to N print (default is 10, 0 means no limit)
    -x, --lexicon : print lexicon (from field get all terms)
'''.format(sys.argv[0]), 'green'))
    sys.exit(1)
################################################################################ | |
if __name__ == '__main__':
    # Command-line entry point of the search tool: collect the options,
    # then either list the lexicon of a field or run a query.
    kwargs = dict(
        index_folder=INDEX_DIR,
        field="body",
        qstr=None,
        offset=0,
        limit=10,
        lexicon=False,
    )
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "hi:f:o:l:x",
            ["help", "index_folder=", "field=", "offset=", "limit=", "lexicon"]
        )
        for flag, value in opts:
            if flag in ("-h", "--help"):
                usage()
            elif flag in ("-x", "--lexicon"):
                kwargs['lexicon'] = True
            elif flag in ("-i", "--index_folder"):
                kwargs['index_folder'] = value
            elif flag in ("-f", "--field"):
                kwargs['field'] = value
            elif flag in ("-o", "--offset"):
                kwargs['offset'] = int(value)
            elif flag in ("-l", "--limit"):
                kwargs['limit'] = int(value)
        # 'lexicon' only selects the mode; neither callee accepts it.
        show_lexicon = kwargs.pop('lexicon')
        if show_lexicon:
            # print_lexicon() takes no query string.
            kwargs.pop('qstr')
            print_lexicon(**kwargs)
        else:
            # All positional arguments together form the query.
            kwargs["qstr"] = ' '.join(args)
            do_search(**kwargs)
    except Exception as e:
        usage(str(e))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment