Created
June 1, 2017 08:11
-
-
Save dicksonkv/14c5745aa50da788189e9fb082caa056 to your computer and use it in GitHub Desktop.
Filesystem-level IDS (intrusion detection system) in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import hashlib | |
import os | |
import argparse | |
import pickle | |
import sys | |
import time | |
# Bytes read per iteration when hashing file contents.
CHUNK_SIZE = 10240

# Everything persisted to disk lives in this one dict:
#   'header'   -> scan timestamp and the list of scanned directories
#   'metadata' -> per-file permission / ownership / content records
hashDict = {'metadata': {}, 'header': {}}
hashListRescan = []

# Command-line interface: initialize (-i) or re-scan (-r) a metadata DB.
parser = argparse.ArgumentParser(description="")
parser.add_argument('-i', dest='dbInitialize', action='store_true',
                    default=False, help='Initalize the database.')
parser.add_argument('-f', dest='dbName', metavar='',
                    help='File to store the Meta-data.')
parser.add_argument('-d', dest='dirName', metavar='', nargs='+',
                    help='Directory Name To Scan')
parser.add_argument('-r', dest='dbReScan', action='store_true',
                    default=False, help='Re-Scan the MetaData.')
args = parser.parse_args()

# Unpack the parsed flags into the globals the rest of the script reads.
DB_initailize = args.dbInitialize
DB_name = args.dbName
DB_rescan = args.dbReScan
DIR_names = args.dirName
def printHelp():
    """Print usage examples for the two supported modes (init and re-scan)."""
    usage = (
        '',
        ' INTIALIZE : ids.py -i -d dir1 dir2 .. -f somefile.db',
        ' RE-SCAN : ids.py -r -f somefile.db',
        '',
    )
    for line in usage:
        print(line)
# This function returns the MD5 hash of a resource.
# resource can be a file (rtype='f') or string (rtype='s').
def getHash(resource, rtype='f', chunk_size=None):
    """Return the MD5 hex digest of *resource*.

    resource   -- a file path (rtype='f') or a plain string (rtype='s')
    rtype      -- 'f' hashes the file's bytes in chunks;
                  's' hashes the UTF-8 encoding of the string.
    chunk_size -- bytes per read for rtype='f'; defaults to the module-level
                  CHUNK_SIZE (kept as a runtime lookup for backward
                  compatibility).

    Returns None for any other rtype (original behavior preserved).
    """
    md5sum = hashlib.new('md5')
    if rtype == 'f':
        if chunk_size is None:
            chunk_size = CHUNK_SIZE
        # Read in chunks so large files never load fully into memory;
        # 'with' guarantees the handle is closed (the original leaked it).
        with open(resource, 'rb') as fh:
            while True:
                chunk = fh.read(chunk_size)
                if not chunk:
                    break
                md5sum.update(chunk)
        return md5sum.hexdigest()
    if rtype == 's':
        md5sum.update(resource.encode('utf-8'))
        return md5sum.hexdigest()
def creatingHeader():
    """Record scan metadata (timestamp and scanned directories) into the
    module-level hashDict['header'].

    Always returns None; works purely by side effect on hashDict.
    """
    # Timestamp format matches the original, e.g. 01-Jun-2017-08-11-00.
    header = {
        'date': time.strftime("%d-%b-%Y-%H-%M-%S"),
        'directory': DIR_names,
    }
    hashDict['header'].update(header)
    return None
# Collects one file's metadata and merges it into the module-level hashDict.
def metaDictToHashDict(fileName):
    """Stat *fileName*, build its metadata record (permissions, ownership,
    size, content hash) and merge it into hashDict['metadata'].

    Raises FileNotFoundError if the file vanishes before stat/hash; the
    caller handles that. Always returns None.
    """
    statInfo = os.stat(fileName)
    record = {
        # Last four octal digits cover setuid/setgid/sticky + rwx bits.
        'permission': {'octal': oct(statInfo.st_mode)[-4:]},
        'role': {'uid': statInfo.st_uid, 'gid': statInfo.st_gid},
        'content': {'length': statInfo.st_size, 'hash': getHash(fileName)},
    }
    hashDict['metadata'].update({fileName: record})
    return None
# NOTE(review): the original carried a commented-out fetchMetaData stub and a
# triple-quoted (i.e. disabled) block validating flag combinations; both were
# runtime no-ops and are dropped here without changing behavior.

if DB_initailize and DB_name and DIR_names:
    # INITIALIZE mode: record the scan header, walk every requested
    # directory tree, collect metadata for each regular file, then persist
    # the whole database once at the end.
    creatingHeader()
    for DIR_Name in DIR_names:
        for rootDir, subDirs, subFiles in os.walk(DIR_Name):
            for fName in subFiles:
                absPath = os.path.join(rootDir, fName)
                try:
                    print('[ Scaning ] : ', absPath)
                    metaDictToHashDict(absPath)
                except FileNotFoundError:
                    # Broken symlinks or files deleted mid-walk: skip them.
                    pass
    # Fix: the original re-opened the DB and re-pickled the entire dict for
    # EVERY scanned file, and never closed the handle. Dump once, inside a
    # context manager, after the walk completes.
    with open(DB_name, 'wb') as fileObject:
        pickle.dump(hashDict, fileObject)
if DB_name and DB_rescan:
    # RE-SCAN mode: load the previously pickled database and compare each
    # recorded attribute against the file's current state on disk, printing
    # one report section per check.
    # SECURITY NOTE(review): pickle.load on an attacker-controlled DB file
    # can execute arbitrary code -- only re-scan databases you created.
    # Fix: 'with' closes the handle the original left open.
    with open(DB_name, 'rb') as fileObject:
        hashDict = pickle.load(fileObject)
    print('')
    print(' [X] READING DATABASE HEADER')
    print('')
    print(' - Scan Date : ', hashDict['header']['date'])
    print(' - Directory : ', hashDict['header']['directory'])
    print('')
    print(' [X] READING FILE NAMES')
    print('')
    hashListRescan = hashDict['metadata'].keys()
    print(' - Total File Found : ', len(hashListRescan))
    print('')
    print(' [X] CHECKING FOR MISSING FILES ')
    print('')
    for item in hashListRescan:
        if not os.path.exists(item):
            print(' - :', item)
    print('')
    print(' [X] CHECKING FOR PERMISSION CHANGES ')
    print('')
    for item in hashListRescan:
        if os.path.exists(item):
            # Compare the stored 4-digit octal mode against the current one.
            curPermission = oct(os.stat(item).st_mode)[-4:]
            oldPermission = hashDict['metadata'][item]['permission']['octal']
            if curPermission != oldPermission:
                print(' - :', '[ O:' + oldPermission + ' C:' + curPermission + ']', item)
    print('')
    print(' [X] CHECKSUM VARIFICATION OF CONTENT ')
    print('')
    for item in hashListRescan:
        if os.path.exists(item):
            # Re-hash the file contents and compare with the stored digest.
            curHash = getHash(item)
            oldHash = hashDict['metadata'][item]['content']['hash']
            if curHash != oldHash:
                print(' - :', item)
    print('')
    print(' [X] OWNERSHIP VARIFICATION OF CONTENT ')
    print('')
    for item in hashListRescan:
        if os.path.exists(item):
            curOwner = os.stat(item).st_uid
            oldOwner = hashDict['metadata'][item]['role']['uid']
            if curOwner != oldOwner:
                print(' - :', item)
    print('')
    print(' [X] GROUP VARIFICATION OF CONTENT ')
    print('')
    for item in hashListRescan:
        if os.path.exists(item):
            curGid = os.stat(item).st_gid
            oldGid = hashDict['metadata'][item]['role']['gid']
            if curGid != oldGid:
                print(' - :', item)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment