Skip to content

Instantly share code, notes, and snippets.

@rhee
Created July 13, 2019 14:13
Show Gist options
  • Save rhee/a0ee38e5ab3213b5c8b2953780b2b470 to your computer and use it in GitHub Desktop.
Save rhee/a0ee38e5ab3213b5c8b2953780b2b470 to your computer and use it in GitHub Desktop.
vdl_shm.py
# coding: utf-8
from traceback import print_exc
import os
import stat
import sys
from struct import pack, unpack, pack_into, unpack_from
import mmap
import ctypes
# Upper bound on the number of files in one pool; allocate() refuses to add
# an entry once pool_num_files reaches this value.
VDL_SHM_MAX_FILES = 10
# Values for the ``shm_mode`` constructor argument.  With CREATE the
# constructor initialises the pool header after create(); with OPEN it only
# checks the header that is already there.
VDL_SHM_MODE_CREATE = 0
VDL_SHM_MODE_OPEN = 1
# When true, pool_check() prints the header fields it reads.
_debug = True
# When true, allocate() and get() print the byte range they hand out.
_trace = False
"""
VDL SHM Shared Pool Structure
(* all integral values are unsigned, and stored in LSB-first order)
+0 +1 +2 +3 +4 +5 +6 +7
+0 +-------+-------+-------+-------+-------+-------+-------+-------+
| 'V' 'D' 'L' 'M' | 1 | n | 0 | # pool memory magic (4), layout version(1), num files(1), reserved(2)
+8 +-------+-------+-------+-------+-------+-------+-------+-------+
| pool size | allocated size | # pool allocation states: max size, allocated size
+16 +-------+-------+-------+-------+-------+-------+-------+-------+
| file #0 offset | file #0 bytes |
+24 +-------+-------+-------+-------+-------+-------+-------+-------+
| file #1 offset | file #1 bytes |
+32 +-------+-------+-------+-------+-------+-------+-------+-------+
| file #2 offset | file #2 bytes |
+40 +-------+-------+-------+-------+-------+-------+-------+-------+
: ... :
+88 +-------+-------+-------+-------+-------+-------+-------+-------+
| file #9 offset | file #9 bytes |
+96 +-------+-------+-------+-------+-------+-------+-------+-------+
| |
| file #0 content |
| |
: ... :
| |
+-------+-------+-------+-------+-------+-------+-------+-------+
| |
| file #1 content |
| |
: ... :
| |
+-------+-------+-------+-------+-------+-------+-------+-------+
...
...
"""
class vdl_shm_common(object):
    """Platform-independent part of a small file pool kept in shared memory.

    The pool begins with a 16-byte header (magic ``VDLM``, layout version,
    number of files, two reserved bytes, pool size, allocated size), followed
    by ten 8-byte ``(offset, size)`` file-table entries; file contents start
    at byte 96.  All integers are unsigned and little-endian.

    Subclasses provide the mapping by implementing ``create()``,
    ``destroy()`` and ``shm_name_native``.  ``create()`` must store a
    writable buffer in ``self._shm`` and its length in ``self._shm_size``.

    New-style (``object``) base so that ``super()`` and the property setters
    below also work on Python 2.
    """

    # Header: magic(4s) version(B) num_files(B) reserved(H) size(I) allocated(I)
    _HEADER_FORMAT = '<4sBBHII'
    _MAGIC = b'VDLM'
    _VERSION = 1
    _FTBL_START = 16       # first file-table entry
    _FTBL_ENTRY_SIZE = 8   # <I offset, <I size
    _DATA_START = 96       # header + 10 file-table entries

    def __init__(self, shm_name, shm_size, shm_mode):
        """Map the segment and validate (or, in CREATE mode, write) its header.

        shm_name: logical pool name; subclasses derive the OS name from it.
        shm_size: requested size in bytes, passed to ``create()``.
        shm_mode: ``VDL_SHM_MODE_CREATE`` or ``VDL_SHM_MODE_OPEN``.

        Raises whatever ``create()`` raises, and AssertionError when the
        header check fails.
        """
        self.shm_name = shm_name
        self.shm_size = shm_size
        self.shm_mode = shm_mode
        self._shm_size = 0
        self._shm = None
        self._shm_memoryview = None
        self.create(self.shm_size)
        try:
            if self.shm_mode == VDL_SHM_MODE_CREATE:
                self.pool_init()
            self.pool_check()
        except Exception:
            # The mapping exists but is unusable (too small for the header,
            # wrong magic, ...): release it instead of leaking it.
            self.destroy()
            raise

    @staticmethod
    def _require(condition, detail=None):
        """Raise AssertionError unless *condition* holds.

        Unlike an ``assert`` statement this is not removed by ``python -O``;
        the checks using it guard raw writes into shared memory.
        """
        if not condition:
            if detail is None:
                raise AssertionError()
            raise AssertionError(detail)

    @property
    def shm_name_native(self):
        """OS-level name of the segment; implemented by subclasses."""
        raise NotImplementedError("Please Implement shm_name_native")

    @property
    def shm_native_name(self):
        """Older spelling of ``shm_name_native``, kept for compatibility."""
        return self.shm_name_native

    def create(self, shm_size):
        """Create or open the mapping; implemented by subclasses."""
        raise NotImplementedError("Please Implement create()")

    def destroy(self):
        """Release the mapping; implemented by subclasses."""
        raise NotImplementedError("Please Implement destroy()")

    @property
    def shm(self):
        """The underlying buffer object set by ``create()``."""
        return self._shm

    @property
    def buf(self):
        """A memoryview over the whole mapping, created on first use."""
        if self._shm_memoryview is None:
            self._shm_memoryview = memoryview(self._shm)
        return self._shm_memoryview

    def pool_init(self):
        """Write an empty pool header and zero the file table."""
        shm = self.shm
        pack_into(
            self._HEADER_FORMAT, shm, 0,
            self._MAGIC,
            self._VERSION,
            0,                  # no files yet
            0,                  # reserved
            self._shm_size,     # pool size
            self._DATA_START,   # header and file table count as allocated
        )
        table_bytes = self._DATA_START - self._FTBL_START
        shm[self._FTBL_START:self._DATA_START] = b'\0' * table_bytes

    def pool_check(self):
        """Verify magic, version and size fields of the header.

        Raises AssertionError when a field does not match.
        """
        if _debug:
            for field in ('pool_magic', 'pool_version', 'pool_num_files',
                          'pool_size', 'pool_allocated_size'):
                print('{:>22s}: {}'.format(field, getattr(self, field)))
        self._require(self.pool_magic == 'VDLM', 'bad pool magic')
        self._require(self.pool_version == 1, 'unsupported pool version')
        self._require(self.pool_allocated_size <= self.pool_size,
                      'allocated size exceeds pool size')

    @property
    def pool_magic(self):
        """The four magic bytes as text.

        Undecodable bytes become U+FFFD so that a foreign or uninitialised
        segment fails the magic comparison instead of raising
        UnicodeDecodeError.
        """
        return self.shm[0:4].decode('ascii', 'replace')

    @property
    def pool_version(self):
        """Layout version byte."""
        version, = unpack_from('<B', self.shm, 4)
        return version

    @property
    def pool_num_files(self):
        """Number of file-table entries in use."""
        num_files, = unpack_from('<B', self.shm, 5)
        return num_files

    @pool_num_files.setter
    def pool_num_files(self, n):
        pack_into('<B', self.shm, 5, n)

    @property
    def pool_size(self):
        """Total pool size recorded in the header."""
        size, = unpack_from('<I', self.shm, 8)
        return size

    @pool_size.setter
    def pool_size(self, size):
        pack_into('<I', self.shm, 8, size)

    @property
    def pool_allocated_size(self):
        """Bytes in use, header and file table included."""
        allocated_size, = unpack_from('<I', self.shm, 12)
        return allocated_size

    @pool_allocated_size.setter
    def pool_allocated_size(self, size):
        pack_into('<I', self.shm, 12, size)

    def _ftbl_entry_offset(self, n):
        """Return the byte offset of file-table entry *n*.

        Raises AssertionError unless ``0 <= n < pool_num_files``; an
        unchecked index would address bytes outside the file table.
        """
        num_files = self.pool_num_files
        self._require(0 <= n < num_files, (n, num_files,))
        return self._FTBL_START + n * self._FTBL_ENTRY_SIZE

    def get_file_offset(self, n):
        """Return the start offset of file *n* within the pool."""
        offset, = unpack_from('<I', self.shm, self._ftbl_entry_offset(n))
        return offset

    def set_file_offset(self, n, offset):
        """Store the start offset of file *n*."""
        pack_into('<I', self.shm, self._ftbl_entry_offset(n), offset)

    def get_file_size(self, n):
        """Return the size in bytes of file *n*."""
        size, = unpack_from('<I', self.shm, self._ftbl_entry_offset(n) + 4)
        return size

    def set_file_size(self, n, size):
        """Store the size in bytes of file *n*."""
        pack_into('<I', self.shm, self._ftbl_entry_offset(n) + 4, size)

    def clear(self):
        """Reset the pool to empty; only allowed for the creating side."""
        self._require(self.shm_mode == VDL_SHM_MODE_CREATE)
        self.pool_init()

    def allocate(self, size):
        """Reserve *size* bytes for a new file at the end of the used area.

        Returns ``(file_number, view)`` where *view* is a memoryview slice
        over the reserved bytes.

        Raises AssertionError when the file table is full, *size* is
        negative, or the pool has too little room left.
        """
        new_file_num = self.pool_num_files
        new_file_offset = self.pool_allocated_size
        self._require(new_file_num < VDL_SHM_MAX_FILES,
                      (new_file_num, VDL_SHM_MAX_FILES))
        self._require(size >= 0, (size,))
        self._require(new_file_offset + size <= self._shm_size,
                      (new_file_offset, size, self._shm_size))
        # The count is raised first because the entry setters only accept
        # indexes below pool_num_files.
        self.pool_num_files = new_file_num + 1
        self.pool_allocated_size = new_file_offset + size
        self.set_file_offset(new_file_num, new_file_offset)
        self.set_file_size(new_file_num, size)
        if _trace:
            print('allocate num {:d}, start {:d}, end {:d}'.format(
                new_file_num,
                new_file_offset,
                new_file_offset + size,
            ))
        return new_file_num, self.buf[new_file_offset:new_file_offset + size]

    def get(self, n):
        """Return a memoryview slice over the content of file *n*."""
        file_offset = self.get_file_offset(n)
        file_size = self.get_file_size(n)
        if _trace:
            print('get num {:d}, start {:d}, end {:d}'.format(
                n,
                file_offset,
                file_offset + file_size,
            ))
        return self.buf[file_offset:file_offset + file_size]

    def flush(self):
        """Call ``flush()`` on the underlying buffer object."""
        self.shm.flush()
if os.name == 'nt':
    # ------------------------------------------------------------------
    # Windows backend: named file mappings backed by the paging file.
    # ------------------------------------------------------------------
    from ctypes import windll, cdll,\
        c_wchar, c_size_t, c_ulonglong, c_wchar_p, c_void_p,\
        sizeof, c_int,\
        WinError
    from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, LPCVOID, LPVOID

    # A private library object keeps the prototypes below out of the shared
    # ``windll.kernel32`` instance, and ``use_last_error=True`` makes ctypes
    # save the thread's last-error value right after each call so that
    # ``ctypes.get_last_error()`` reports the error of *that* call.
    _kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    CreateFileMappingW = _kernel32.CreateFileMappingW
    CreateFileMappingW.argtypes = (HANDLE, LPVOID, DWORD, DWORD, DWORD, LPCWSTR)
    CreateFileMappingW.restype = HANDLE
    MapViewOfFile = _kernel32.MapViewOfFile
    # The last argument is SIZE_T (pointer-sized), not a fixed 64-bit value.
    MapViewOfFile.argtypes = (HANDLE, DWORD, DWORD, DWORD, c_size_t)
    MapViewOfFile.restype = LPVOID
    UnmapViewOfFile = _kernel32.UnmapViewOfFile
    UnmapViewOfFile.argtypes = (LPCVOID,)
    UnmapViewOfFile.restype = BOOL
    CloseHandle = _kernel32.CloseHandle
    CloseHandle.argtypes = (HANDLE,)
    CloseHandle.restype = BOOL
    # Not used below; kept because it is part of the module namespace.
    GetLastError = windll.kernel32.GetLastError

    FILE_MAP_ALL_ACCESS = 0xF001F
    INVALID_HANDLE_VALUE = -1
    PAGE_READWRITE = 0x04
    VDL_SHM_NAME_PREFIX = 'Local\\VDLSharedBuffer'
    VDL_SHM_SIZE_MAX = 70 * 1024 * 1024  # 70MB for 10 image files
    VDL_SHM_SIZE_CURRENT = 0

    def shm_unlink(shm_name_native):
        """Counterpart of the POSIX ``shm_unlink`` for API symmetry.

        Requests a handle to the named mapping and closes it again if one
        was obtained.  A failed request is ignored: the original ``== 0``
        test could never be true for a ``HANDLE`` result (ctypes returns
        ``None`` for NULL), so callers have never seen an exception here.

        NOTE(review): Windows presumably discards a named mapping when its
        last handle is closed, leaving nothing to unlink -- confirm.
        """
        hMapObject = CreateFileMappingW(
            INVALID_HANDLE_VALUE, None, PAGE_READWRITE,
            0, 0, c_wchar_p(shm_name_native))
        if hMapObject:
            CloseHandle(hMapObject)

    class vdl_shm_win(vdl_shm_common):
        """File pool in a named Windows file mapping."""

        def __init__(self, shm_name, shm_size=VDL_SHM_SIZE_CURRENT, shm_mode=VDL_SHM_MODE_OPEN):
            assert shm_size >= 0 and shm_size <= VDL_SHM_SIZE_MAX
            # Set before the base constructor runs create(), so destroy()
            # can be called safely at any point afterwards.
            self.h_mapobject = None
            self.p_buf = None
            super(vdl_shm_win, self).__init__(shm_name, shm_size, shm_mode)

        @property
        def shm_name_native(self):
            """Mapping name: the prefix, ``#`` and the logical pool name."""
            return VDL_SHM_NAME_PREFIX + '#' + self.shm_name

        def _release_native_mapping(self):
            """Unmap the ctypes view and close the mapping handle, if held."""
            if self.p_buf:
                UnmapViewOfFile(self.p_buf)
            if self.h_mapobject:
                CloseHandle(self.h_mapobject)
            self.p_buf = None
            self.h_mapobject = None

        def create(self, shm_size):
            """Create (CREATE mode) and map the named mapping.

            Sets ``self._shm`` to an ``mmap`` object and ``self._shm_size``
            to its length.  Raises OSError (``WinError``) when the native
            mapping cannot be created or mapped.
            """
            shm_name_native = self.shm_name_native
            if self.shm_mode == VDL_SHM_MODE_CREATE:
                assert shm_size > 0
                hMapObject = CreateFileMappingW(
                    INVALID_HANDLE_VALUE, None, PAGE_READWRITE,
                    0, shm_size, c_wchar_p(shm_name_native))
                # HANDLE/LPVOID results arrive as None for NULL, so test
                # truthiness; comparing with 0 never matches.
                if not hMapObject:
                    print("Could not open file mapping object")
                    raise WinError(ctypes.get_last_error())
                pBuf = MapViewOfFile(hMapObject, FILE_MAP_ALL_ACCESS, 0, 0, shm_size)
                if not pBuf:
                    # Read the code before CloseHandle() can replace it.
                    error_code = ctypes.get_last_error()
                    print("Could not map view of file")
                    CloseHandle(hMapObject)
                    raise WinError(error_code)
                self.h_mapobject = hMapObject
                self.p_buf = pBuf
            # create shm object for python I/O
            try:
                _shm = mmap.mmap(0, shm_size, tagname=shm_name_native, access=mmap.ACCESS_WRITE)
            except Exception:
                self._release_native_mapping()
                raise
            self._shm = _shm
            # get shm size
            _shm.seek(0, 2)  # end of file
            self._shm_size = _shm.tell()
            print('vdl_shm_win: created, size={:d}'.format(self._shm_size))

        def destroy(self):
            """Release the native view/handle and close the Python mapping."""
            self._release_native_mapping()
            if self._shm_memoryview is not None:
                self._shm_memoryview.release()
                self._shm_memoryview = None
            if self._shm is not None:
                self._shm.flush()
                self._shm.close()
                self._shm = None
else:
    # ------------------------------------------------------------------
    # POSIX backend: shm_open()/shm_unlink() called through ctypes.
    # ------------------------------------------------------------------
    try:
        unicode
    except NameError:
        unicode = str

    from ctypes import CDLL
    import ctypes.util

    def _load_shm_library():
        """Return a ctypes library object that exports shm_open/shm_unlink.

        ``librt.so`` is tried first, as before; then whatever
        ``find_library('rt')`` reports, the versioned ``librt.so.1``, and
        finally the symbols already loaded into the process (``None``), for
        systems whose C library provides these functions itself.  Every
        candidate is opened with ``use_errno=True``: without that flag
        ``ctypes.get_errno()`` always returns 0.
        """
        candidates = ['librt.so']
        located = ctypes.util.find_library('rt')
        if located:
            candidates.append(located)
        candidates.extend(['librt.so.1', None])
        for candidate in candidates:
            try:
                library = CDLL(candidate, use_errno=True)
            except OSError:
                continue
            if hasattr(library, 'shm_open') and hasattr(library, 'shm_unlink'):
                return library
        raise OSError('no library providing shm_open/shm_unlink was found')

    rtld = _load_shm_library()
    _shm_open = rtld.shm_open
    _shm_unlink = rtld.shm_unlink

    def _posix_shm_name(name):
        """Convert *name* to the byte string handed to the C library.

        Text is encoded as UTF-8.  It used to be passed as a wide-character
        buffer, which a ``char *`` parameter reads only up to the first zero
        byte -- i.e. the first character.  The result has exactly one
        leading ``/`` and no other slash (inner ones become ``_``), because
        the names built from ``VDL_SHM_NAME_PREFIX`` contain a ``/`` and
        glibc rejects names with embedded slashes.

        Raises TypeError when *name* is neither bytes nor text.
        """
        if isinstance(name, bytes):
            raw = name
        elif isinstance(name, unicode):
            raw = name.encode('utf-8')
        else:
            raise TypeError("`name` must be `bytes` or `unicode`")
        return b'/' + raw.lstrip(b'/').replace(b'/', b'_')

    def shm_open(name, mode):
        """Open a POSIX shared-memory object and return its descriptor.

        mode: os.O_RDWR | os.O_CREAT | os.O_EXCL to create a new block
        mode: os.O_RDWR to open an existing block

        New objects get owner read/write permission.  Raises RuntimeError
        carrying the ``errno`` text on failure.
        """
        result = _shm_open(
            _posix_shm_name(name),
            ctypes.c_int(mode),
            ctypes.c_ushort(stat.S_IRUSR | stat.S_IWUSR)
        )
        if result == -1:
            raise RuntimeError(os.strerror(ctypes.get_errno()))
        return result

    def shm_unlink(name):
        """Remove the name of a POSIX shared-memory object.

        Raises RuntimeError carrying the ``errno`` text on failure.
        """
        result = _shm_unlink(_posix_shm_name(name))
        if result == -1:
            raise RuntimeError(os.strerror(ctypes.get_errno()))

    VDL_SHM_NAME_PREFIX = 'Local/VDLSharedBuffer'
    VDL_SHM_SIZE_MAX = 70 * 1024 * 1024  # 70MB for 10 image files
    VDL_SHM_SIZE_CURRENT = 0

    class vdl_shm_posix(vdl_shm_common):
        """File pool in a POSIX shared-memory object."""

        def __init__(self, shm_name, shm_size=VDL_SHM_SIZE_CURRENT, shm_mode=VDL_SHM_MODE_OPEN):
            assert shm_size >= 0 and shm_size <= VDL_SHM_SIZE_MAX
            super(vdl_shm_posix, self).__init__(shm_name, shm_size, shm_mode)

        @property
        def shm_name_native(self):
            """Object name: the prefix, ``#`` and the logical pool name."""
            return VDL_SHM_NAME_PREFIX + '#' + self.shm_name

        def create(self, shm_size):
            """Create (CREATE mode) or open the object and map it.

            Sets ``self._shm`` to an ``mmap`` object and ``self._shm_size``
            to its length.  Raises RuntimeError when ``shm_open`` fails, and
            whatever ``os.ftruncate``/``mmap.mmap`` raise.
            """
            shm_name_native = self.shm_name_native
            creating = self.shm_mode == VDL_SHM_MODE_CREATE
            if creating:
                assert shm_size > 0
                shm_mode_native = os.O_RDWR | os.O_CREAT | os.O_EXCL
            else:
                shm_mode_native = os.O_RDWR
            fid = shm_open(shm_name_native, shm_mode_native)
            try:
                if creating:
                    os.ftruncate(fid, shm_size)
                # create shm object for python I/O
                _shm = mmap.mmap(fid, shm_size)
            except Exception:
                if creating:
                    # Do not leave a half-made object behind.  A failure of
                    # this cleanup is dropped so the original error is the
                    # one that propagates.
                    try:
                        shm_unlink(shm_name_native)
                    except RuntimeError:
                        pass
                raise
            finally:
                # mmap keeps its own duplicate of the descriptor, so ours
                # is no longer needed.
                os.close(fid)
            self._shm = _shm
            # get shm size
            _shm.seek(0, 2)  # end of file
            self._shm_size = _shm.tell()
            print('vdl_shm_posix: created, size={:d}'.format(self._shm_size))

        def destroy(self):
            """Unlink the object (CREATE mode) and close the mapping.

            The mapping is closed even when unlinking raises RuntimeError.
            """
            try:
                if self.shm_mode == VDL_SHM_MODE_CREATE:
                    shm_unlink(self.shm_name_native)
            finally:
                if self._shm_memoryview is not None:
                    self._shm_memoryview.release()
                    self._shm_memoryview = None
                if self._shm is not None:
                    self._shm.flush()
                    self._shm.close()
                    self._shm = None
# Public alias for the backend class that matches the running platform.
vdl_shm = vdl_shm_win if os.name == 'nt' else vdl_shm_posix
'''
##############################################################
## example: exp-shm-ipc-requester.py
# coding: utf-8
from traceback import print_exc
import os
import struct
import cv2
import socket
if os.path.isdir('G:/Train_ImageSet'):
images_dir = 'G:/Train_ImageSet/Mobis_AI_Image/2019-06-03/KMC_D-Audio, 통합형 AVN 1.0/부팅로고_UVO'
else:
images_dir = '/G/Train_ImageSet/Mobis_AI_Image/2019-06-03/KMC_D-Audio, 통합형 AVN 1.0/부팅로고_UVO'
images = [
'KMC_UVO_EV.bmp',
'KMC_UVO_HEV.bmp',
'KMC_UVO_PHEV.bmp',
'KMC_UVO_canada_01.bmp',
'KMC_UVO_canada_02.bmp',
'KMC_UVO_electric.bmp',
'KMC_UVO_hybrid.bmp',
'KMC_UVO_plug-in.bmp',
'KMC_UVO_seamless_170504.bmp',
'KMC_UVO_seamless_180227.bmp',
]
# lengths[] 를 만든다
lengths = [0] * 10
for i in range(10):
fn = os.path.join(images_dir,images[i])
lengths[i] = os.stat(fn).st_size
ipaddr = '127.0.0.1'
port = 19999
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_socket.connect((ipaddr,port))
# 예시 IPC 패킷 헤더 44
header = bytearray(44)
# 예시 IPC 요청 코드 'IMGS'
header[:4] = 'IMGS'.encode('ascii')
# 예시 IPC 헤더 - 파일 갯수는 10개로 고정
for i in range(10):
struct.pack_into('<I',header,4+4*i,lengths[i])
print('header:',header)
# 예시 IPC 헤더 부분 전송
_socket.send(header)
# 예시 IPC 데이터 전송
for i in range(10):
fn = os.path.join(images_dir,images[i])
with open(fn,'rb') as f:
_socket.sendfile(f)
_socket.close()
'''
'''
##############################################################
## example: exp-shm-ipc-requester.py
# coding: utf-8
from traceback import print_exc
import os
import struct
import cv2
import socket
if os.path.isdir('G:/Train_ImageSet'):
images_dir = 'G:/Train_ImageSet/Mobis_AI_Image/2019-06-03/KMC_D-Audio, 통합형 AVN 1.0/부팅로고_UVO'
else:
images_dir = '/G/Train_ImageSet/Mobis_AI_Image/2019-06-03/KMC_D-Audio, 통합형 AVN 1.0/부팅로고_UVO'
images = [
'KMC_UVO_EV.bmp',
'KMC_UVO_HEV.bmp',
'KMC_UVO_PHEV.bmp',
'KMC_UVO_canada_01.bmp',
'KMC_UVO_canada_02.bmp',
'KMC_UVO_electric.bmp',
'KMC_UVO_hybrid.bmp',
'KMC_UVO_plug-in.bmp',
'KMC_UVO_seamless_170504.bmp',
'KMC_UVO_seamless_180227.bmp',
]
# lengths[] 를 만든다
lengths = [0] * 10
for i in range(10):
fn = os.path.join(images_dir,images[i])
lengths[i] = os.stat(fn).st_size
ipaddr = '127.0.0.1'
port = 19999
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_socket.connect((ipaddr,port))
# 예시 IPC 패킷 헤더 44
header = bytearray(44)
# 예시 IPC 요청 코드 'IMGS'
header[:4] = 'IMGS'.encode('ascii')
# 예시 IPC 헤더 - 파일 갯수는 10개로 고정
for i in range(10):
struct.pack_into('<I',header,4+4*i,lengths[i])
print('header:',header)
# 예시 IPC 헤더 부분 전송
_socket.send(header)
# 예시 IPC 데이터 전송
for i in range(10):
fn = os.path.join(images_dir,images[i])
with open(fn,'rb') as f:
_socket.sendfile(f)
_socket.close()
'''
'''
##############################################################
## example: exp-shm-read.py
##
# coding: utf-8
import os
import hashlib
import cv2
import numpy as np
from lib.vdl_shm import vdl_shm, VDL_SHM_SIZE_MAX, VDL_SHM_MODE_CREATE, VDL_SHM_MODE_OPEN
images_shm = vdl_shm('test_worker',VDL_SHM_SIZE_MAX,VDL_SHM_MODE_OPEN)
num_files = images_shm.pool_num_files
im_list = []
for i in range(num_files):
buf = images_shm.get(i)
im = cv2.imdecode(np.frombuffer(buf,dtype=np.uint8),flags=cv2.IMREAD_UNCHANGED)
print('image {:d}, bytes {:d}, shape {}'.format(i,len(buf),im.shape))
im_list.append(im)
buf.release()
# %matplotlib inline
import matplotlib.pyplot as plt
fig, axs = plt.subplots(3, 4, figsize=(11.5,11.5))
for r in range(3):
for c in range(4):
ax = axs[r,c]
i = r * 4 + c
if i < len(im_list):
ax.imshow(im_list[i])
ax.axis('off')
images_shm.destroy()
assert 0
from lib.vdl_shm import shm_unlink, VDL_SHM_NAME_PREFIX
shm_unlink(VDL_SHM_NAME_PREFIX + '#test_worker')
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment