Created
July 26, 2017 12:14
-
-
Save ribtoks/8be6425a7a78b94e03948ea64fd1e171 to your computer and use it in GitHub Desktop.
BerkeleyDB Qt wrappers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "database.h" | |
#include <QDir> | |
#include <cstring> | |
#include <cmath> | |
#include <db.h> | |
#include "../Common/defines.h" | |
#include "constants.h" | |
#define DEFAULT_PAGESIZE (1024) | |
#define DEFAULT_CONCURRENT_TRANSACTIONS (100) | |
#define DEFAULT_DEADLOCK_DETECTION_SECONDS (10) | |
#define MEMORY_ONE_MB (1024*1024) | |
// memory required for locks, db handles and statistics overhead | |
#define MAX_MEMORY_GBYTES (0) | |
#ifdef QT_DEBUG | |
#define MAX_MEMORY_BYTES (2*MEMORY_ONE_MB) | |
#else | |
#define MAX_MEMORY_BYTES (4*MEMORY_ONE_MB) | |
#endif | |
#define DEFAULT_CACHE_SPLIT_PARTS (5) | |
#define DEFAULT_CACHE_GBYTES (0) | |
#define MAX_CACHE_GBYTES (0) | |
#ifdef QT_DEBUG | |
#define DEFAULT_CACHE_BYTES (10*MEMORY_ONE_MB) | |
#define MAX_CACHE_BYTES (100*MEMORY_ONE_MB) | |
#else | |
#define DEFAULT_CACHE_BYTES (100*MEMORY_ONE_MB) | |
#define MAX_CACHE_BYTES (200*MEMORY_ONE_MB) | |
#endif | |
#ifdef QT_DEBUG | |
#define BUFFER_EXPAND_ATTEMPTS 20 | |
#else | |
#define BUFFER_EXPAND_ATTEMPTS 5 | |
#endif | |
namespace Helpers { | |
QString ensureDBDirectoryExists(const QString &dbDirName) { | |
QString appDataPath = XPIKS_USERDATA_PATH; | |
QString path; | |
if (!appDataPath.isEmpty()) { | |
path = QDir::cleanPath(appDataPath + QDir::separator() + dbDirName); | |
QDir dbDir(path); | |
if (!dbDir.exists()) { | |
LOG_INFO << "Creating db dir" << path; | |
QDir().mkpath(path); | |
} | |
} else { | |
path = QDir::currentPath(); | |
} | |
return path; | |
} | |
Database::Database(__db_env *environment, int maxBufferSize, AsyncCoordinator *finalizeCoordinator): | |
m_FinalizeCoordinator(finalizeCoordinator), | |
m_Environment(environment), | |
m_Database(nullptr), | |
m_StartValueBufferSize(maxBufferSize), | |
m_IsOpened(false) | |
{ | |
Q_ASSERT(maxBufferSize > 0); | |
Q_ASSERT(environment != nullptr); | |
AsyncCoordinatorLocker locker(m_FinalizeCoordinator); | |
Q_UNUSED(locker); | |
} | |
Database::~Database() { | |
Q_ASSERT(m_IsOpened == false); | |
if (m_IsOpened) { | |
close(); | |
} | |
} | |
bool Database::open(const char *dbName) { | |
Q_ASSERT(m_Database == nullptr); | |
LOG_DEBUG << dbName; | |
bool success = false; | |
int result = 0; | |
do { | |
result = db_create(&m_Database, m_Environment, 0); | |
if (result != 0) { | |
LOG_WARNING << "Failed creating a database instance:" << db_strerror(result); | |
break; | |
} | |
result = m_Database->set_pagesize(m_Database, DEFAULT_PAGESIZE); | |
if (result != 0) { | |
LOG_WARNING << "Failed to set pagesize:" << db_strerror(result); | |
break; | |
} | |
u_int32_t db_flags; | |
db_flags = DB_CREATE | // create the database if not exists | |
DB_AUTO_COMMIT | // Enclose the DB->open() call within a transaction | |
DB_THREAD; // thread-safety | |
result = m_Database->open(m_Database, /* Pointer to the database */ | |
NULL, /* Txn pointer */ | |
dbName, /* File name */ | |
NULL, /* Logical db name */ | |
DB_BTREE, /* Database type (using btree) */ | |
db_flags, /* Open flags */ | |
0); /* File mode. Using defaults */ | |
if (result != 0) { | |
LOG_WARNING << "Failed to open the database" << dbName << db_strerror(result); | |
break; | |
} | |
m_IsOpened = true; | |
success = true; | |
} while (false); | |
return success; | |
} | |
void Database::close() { | |
LOG_DEBUG << "#"; | |
Q_ASSERT(m_Database != nullptr); | |
// sync and close database | |
int result = m_Database->close(m_Database, 0); | |
if (result != 0) { | |
LOG_WARNING << "Closing database failed:" << db_strerror(result); | |
} else { | |
m_IsOpened = false; | |
m_Database = nullptr; | |
} | |
AsyncCoordinatorUnlocker unlocker(m_FinalizeCoordinator); | |
Q_UNUSED(unlocker); | |
} | |
void Database::sync() { | |
LOG_DEBUG << "#"; | |
Q_ASSERT(m_Database != nullptr); | |
int result = m_Database->sync(m_Database, 0); | |
if (result != 0) { | |
LOG_WARNING << "Database sync failed" << db_strerror(result); | |
} | |
} | |
void Database::warmupCache(int percent) { | |
LOG_DEBUG << "#"; | |
Q_ASSERT((0 <= percent) && (percent <= 100)); | |
Q_ASSERT(m_Database != nullptr); | |
Q_ASSERT(m_Environment != nullptr); | |
// here and below code from "Warming the memory pool" BerkeleyDB docs | |
do { | |
u_int32_t pagesize, gbytes, bytes; | |
int ret = 0, numcachepages; | |
/* Find out how many pages can fit at most in the cache */ | |
ret = m_Environment->get_mp_pagesize(m_Environment, &pagesize); | |
if (ret != 0) { | |
LOG_WARNING << "Error retrieving the cache pagesize:" << db_strerror(ret); | |
break; | |
} | |
ret = m_Environment->get_cache_max(m_Environment, &gbytes, &bytes); | |
if (ret != 0) { | |
LOG_WARNING << "Error retrieving maximum cache size:" << db_strerror(ret); | |
break; | |
} | |
/* Avoid an overflow by first calculating pages per gigabyte. */ | |
numcachepages = gbytes * ((1024 * 1024 * 1024) / pagesize); | |
numcachepages += bytes / pagesize; | |
LOG_DEBUG << numcachepages << "can fit"; | |
double realPages = floor(numcachepages * ((double)percent / 100.0)); | |
int result = doWarmupCache((int)realPages); | |
if (result != 0) { | |
LOG_WARNING << "Cache warmup failed"; | |
} else { | |
LOG_INFO << "Cache warmup succeeded"; | |
} | |
} while (false); | |
} | |
bool Database::exists(const QByteArray &key) { | |
Q_ASSERT(m_Database != nullptr); | |
int result = checkExists(key); | |
bool missing = result == DB_NOTFOUND; | |
return !missing; | |
} | |
bool Database::tryGetKey(const QByteArray &key, QByteArray &value) { | |
Q_ASSERT(m_Database != nullptr); | |
int attempts = BUFFER_EXPAND_ATTEMPTS; | |
int result = 0; | |
int bufferSize = m_StartValueBufferSize; | |
while (attempts--) { | |
result = doGetKey(key, value, bufferSize); | |
if ((result != DB_NOTFOUND) && (result != DB_KEYEMPTY)) { | |
LOG_WARNING << "Error reading" << key << "from database:" << db_strerror(result); | |
} | |
if (result == DB_BUFFER_SMALL) { | |
bufferSize = 2*bufferSize + 1; | |
} else { | |
break; | |
} | |
} | |
// do not save this for debug in order to test code above | |
#ifdef QT_NO_DEBUG | |
if (result == 0) { | |
// if we have concurrency issue here, then anyway eventually | |
// max buffer size will be properly adjusted | |
m_MaxValueBufferSize = bufferSize; | |
} | |
#endif | |
bool success = result == 0; | |
return success; | |
} | |
bool Database::trySetKey(const QByteArray &key, const QByteArray &value) { | |
Q_ASSERT(m_Database != nullptr); | |
int result = doSetKey(key, value); | |
if (result != 0) { | |
LOG_WARNING << "Failed to set a key" << key << "with error:" << db_strerror(result); | |
} | |
bool success = result == 0; | |
return success; | |
} | |
bool Database::deleteRecord(const QByteArray &key) { | |
Q_ASSERT(m_Database != nullptr); | |
int result = doDeleteRecord(key); | |
if (result != 0) { | |
LOG_WARNING << "Failed to delete a key" << key << ":" << db_strerror(result); | |
} | |
bool success = result == 0; | |
return success; | |
} | |
std::unique_ptr<Database::Iterator> Database::getIterator() { | |
LOG_DEBUG << "#"; | |
std::unique_ptr<Database::Iterator> it(new Database::Iterator(m_Database)); | |
it->initialize(); | |
LOG_DEBUG << "Iterator is valid:" << it->isValid(); | |
return it; | |
} | |
int Database::checkExists(const QByteArray &key) { | |
DBT dbtKey; | |
memset(&dbtKey, 0, sizeof(DBT)); | |
u_int32_t keySize = key.size(); | |
const char *keyData = key.data(); | |
dbtKey.data = (void*)keyData; | |
dbtKey.size = keySize; | |
int result = m_Database->exists(m_Database, NULL, &dbtKey, 0); | |
return result; | |
} | |
int Database::doGetKey(const QByteArray &key, QByteArray &value, int bufferSize) { | |
DBT dbtKey, dbtData; | |
memset(&dbtKey, 0, sizeof(DBT)); | |
memset(&dbtData, 0, sizeof(DBT)); | |
u_int32_t keySize = key.size(); | |
const char *keyData = key.data(); | |
dbtKey.data = (void*)keyData; | |
dbtKey.size = keySize; | |
value.fill(0, bufferSize); | |
char *valueData = value.data(); | |
dbtData.data = valueData; | |
dbtData.ulen = bufferSize; | |
dbtData.flags = DB_DBT_USERMEM; | |
int result = m_Database->get(m_Database, NULL, &dbtKey, &dbtData, 0); | |
return result; | |
} | |
int Database::doSetKey(const QByteArray &key, const QByteArray &value) { | |
DBT dbtKey, dbtData; | |
memset(&dbtKey, 0, sizeof(DBT)); | |
memset(&dbtData, 0, sizeof(DBT)); | |
u_int32_t keySize = key.size(); | |
const char *keyData = key.data(); | |
dbtKey.data = (void*)keyData; | |
dbtKey.size = keySize; | |
u_int32_t valueSize = value.size() + 1; | |
const char *valueData = value.data(); | |
dbtData.data = (void*)valueData; | |
dbtData.size = valueSize; | |
int result = m_Database->put(m_Database, NULL, &dbtKey, &dbtData, 0); | |
return result; | |
} | |
int Database::doDeleteRecord(const QByteArray &key) { | |
DBT dbtKey; | |
memset(&dbtKey, 0, sizeof(DBT)); | |
u_int32_t keySize = key.size(); | |
const char *keyData = key.data(); | |
dbtKey.data = (void*)keyData; | |
dbtKey.size = keySize; | |
int result = m_Database->del(m_Database, NULL, &dbtKey, 0); | |
return result; | |
} | |
int Database::doWarmupCache(int pagesCount) { | |
LOG_INFO << pagesCount; | |
// here and below code from "Warming the memory pool" BerkeleyDB docs | |
DB_MPOOLFILE *mpf = 0; | |
void *page_addrp = 0; | |
db_pgno_t page_number = 0; | |
int ret = 0; | |
int pagecount = 0; | |
/* | |
* Get the mpool handle | |
*/ | |
mpf = m_Database->get_mpf(m_Database); | |
/* Load pages until there are no more pages in the database, | |
* or until we've put as many pages into the cache as will fit. | |
*/ | |
while (ret != DB_PAGE_NOTFOUND && pagecount < pagesCount) { | |
/* | |
* Get the page from the cache. This causes DB to retrieve | |
* the page from disk if it isn't already in the cache. | |
*/ | |
ret = mpf->get(mpf, &page_number, 0, 0, &page_addrp); | |
if (ret && ret != DB_PAGE_NOTFOUND) { | |
LOG_WARNING << "Error retrieving db page:" << page_number << db_strerror(ret); | |
return ret; | |
} | |
/* | |
* If a page was retrieved, put it back into the cache. This | |
* releases the page latch so that the page can be evicted | |
* if DB needs more room in the cache at some later time. | |
*/ | |
if (ret != DB_PAGE_NOTFOUND) { | |
ret = mpf->put(mpf, page_addrp, DB_PRIORITY_UNCHANGED, 0); | |
if (ret) { | |
LOG_WARNING << "Error putting db page:" << page_number << db_strerror(ret); | |
return ret; | |
} | |
} | |
++page_number; | |
++pagecount; | |
} | |
LOG_INFO << pagecount << "pages loaded"; | |
return 0; | |
} | |
DatabaseManager::DatabaseManager(): | |
QObject(), | |
m_Environment(NULL) | |
{ | |
QObject::connect(&m_FinalizeCoordinator, &Helpers::AsyncCoordinator::statusReported, | |
this, &DatabaseManager::onReadyToFinalize); | |
} | |
bool DatabaseManager::initialize() { | |
const bool withoutRecovery = false; | |
const bool withRecovery = true; | |
int result = 0; | |
do { | |
result = doInitialize(Constants::DB_DIR, withoutRecovery); | |
if (result == 0) { break; } | |
LOG_WARNING << "Retrying environment initialization with recovery"; | |
closeEnvironment(); | |
result = doInitialize(Constants::DB_DIR, withRecovery); | |
if (result == 0) { | |
LOG_INFO << "Recovery finished successfully!"; | |
break; | |
} | |
LOG_WARNING << "Switching to failover DB environment"; | |
result = doInitialize(Constants::DB_DIR_FAILOVER, withoutRecovery); | |
if (result == 0) { | |
LOG_INFO << "Successfully switched to failover environment"; | |
break; | |
} | |
LOG_WARNING << "Retrying environment initialization with recovery"; | |
closeEnvironment(); | |
result = doInitialize(Constants::DB_DIR_FAILOVER, withRecovery); | |
if (result == 0) { | |
LOG_INFO << "Failover recovery finished successfully!"; | |
break; | |
} | |
LOG_WARNING << "Failed to initialize the environment"; | |
} | |
while (false); | |
bool success = result == 0; | |
return success; | |
} | |
void DatabaseManager::finalize() { | |
LOG_DEBUG << "#"; | |
closeAll(); | |
closeEnvironment(); | |
LOG_INFO << "Finalize finished"; | |
} | |
int DatabaseManager::doInitialize(const QString &dbDirName, bool withRecovery) { | |
Q_ASSERT(m_Environment == nullptr); | |
LOG_INFO << "with recovery =" << withRecovery; | |
int result = 0; | |
do { | |
result = db_env_create(&m_Environment, 0); | |
if (result != 0) { | |
LOG_WARNING << "Failed to create an environment:" << db_strerror(result); | |
break; | |
} | |
LOG_DEBUG << "DB Environment allocated"; | |
u_int32_t additional_flags; | |
additional_flags = DB_AUTO_COMMIT | /*wrap all operations in transactions*/ | |
DB_TXN_NOSYNC; /*do not flush write-ahead logs*/ | |
result = m_Environment->set_flags(m_Environment, additional_flags, 1); | |
if (result != 0) { | |
LOG_WARNING << "Failed to set additional flags:" << db_strerror(result); | |
break; | |
} | |
// deadlock detection | |
result = m_Environment->set_timeout(m_Environment, DEFAULT_DEADLOCK_DETECTION_SECONDS, DB_SET_TXN_TIMEOUT); | |
if (result != 0) { | |
LOG_WARNING << "Failed to set the deadlock detection timeout:" << db_strerror(result); | |
break; | |
} | |
result = m_Environment->set_tx_max(m_Environment, DEFAULT_CONCURRENT_TRANSACTIONS); | |
if (result != 0) { | |
LOG_WARNING << "Failed to set concurrent transactions number:" << db_strerror(result); | |
break; | |
} | |
result = m_Environment->set_cache_max(m_Environment, MAX_CACHE_GBYTES, MAX_CACHE_BYTES); | |
if (result != 0) { | |
LOG_WARNING << "Failed to set max cache size:" << db_strerror(result); | |
break; | |
} | |
result = m_Environment->set_cachesize(m_Environment, DEFAULT_CACHE_GBYTES, DEFAULT_CACHE_BYTES, DEFAULT_CACHE_SPLIT_PARTS); | |
if (result != 0) { | |
LOG_WARNING << "Failed to set cache size:" << db_strerror(result); | |
break; | |
} | |
result = m_Environment->set_memory_max(m_Environment, MAX_MEMORY_GBYTES, MAX_MEMORY_BYTES); | |
if (result != 0) { | |
LOG_WARNING << "Failed to set memory max size:" << db_strerror(result); | |
break; | |
} | |
LOG_DEBUG << "DB Environment configured"; | |
QString dbDirPath = ensureDBDirectoryExists(dbDirName); | |
QByteArray dbDir = dbDirPath.toUtf8(); | |
char *dbDirData = dbDir.data(); | |
u_int32_t env_flags; | |
env_flags = 0 | | |
DB_INIT_LOCK | /* Initialize locking */ | |
DB_INIT_LOG | /* Initialize logging */ | |
DB_INIT_MPOOL | /* Initialize the cache */ | |
DB_INIT_TXN | /* Initialize transactions */ | |
DB_CREATE | /* If the environment does not exist, create it. */ | |
//DB_PRIVATE | DO NOT specify if using failcheck or recovery | |
DB_THREAD | /* Multithreading support */ | |
DB_RECOVER | /* run recovery */ | |
DB_USE_ENVIRON; /* allow any paths for db files */ | |
if (withRecovery) { | |
env_flags = env_flags | DB_RECOVER_FATAL; | |
} | |
result = m_Environment->open(m_Environment, dbDirData, env_flags, 0); | |
if (result != 0) { | |
LOG_WARNING << "Failed to open an environment:" << db_strerror(result); | |
break; | |
} | |
LOG_DEBUG << "DB Environment opened"; | |
m_DBDirPath = dbDirPath; | |
} while (false); | |
return result; | |
} | |
int DatabaseManager::closeEnvironment() { | |
LOG_DEBUG << "#"; | |
Q_ASSERT(m_Environment != nullptr); | |
int result = m_Environment->close(m_Environment, 0); | |
if (result != 0) { | |
LOG_WARNING << "Failed to close an environment:" << db_strerror(result); | |
} else { | |
LOG_DEBUG << "Environment closed"; | |
m_Environment = nullptr; | |
} | |
return result; | |
} | |
std::shared_ptr<Database> DatabaseManager::openDatabase(const char *dbName) { | |
LOG_DEBUG << dbName; | |
Q_ASSERT(m_Environment != nullptr); | |
std::shared_ptr<Database> db(new Database(m_Environment, DEFAULT_READ_BUFFER_START_SIZE, &m_FinalizeCoordinator)); | |
bool openSucceded = db->open(dbName); | |
if (!openSucceded) { | |
LOG_WARNING << "Failed to open" << dbName; | |
db->close(); | |
db.reset(); | |
} else { | |
LOG_INFO << "Opened" << dbName << "database"; | |
m_DatabaseArray.push_back(db); | |
} | |
return db; | |
} | |
void DatabaseManager::createCheckpoint() { | |
LOG_DEBUG << "#"; | |
Q_ASSERT(m_Environment != nullptr); | |
int result = m_Environment->txn_checkpoint(m_Environment, 0, 0, 0); | |
if (result != 0) { | |
LOG_WARNING << "Failed to checkpoint environment:" << db_strerror(result); | |
} else { | |
LOG_INFO << "Checkpoint created"; | |
} | |
} | |
void DatabaseManager::prepareToFinalize() { | |
LOG_DEBUG << "#"; | |
m_FinalizeCoordinator.allBegun(); | |
} | |
void DatabaseManager::onReadyToFinalize(int status) { | |
LOG_INFO << status; | |
finalize(); | |
} | |
void DatabaseManager::closeAll() { | |
LOG_DEBUG << "#"; | |
Q_ASSERT(m_Environment != nullptr); | |
for (auto &db: m_DatabaseArray) { | |
db->close(); | |
} | |
LOG_INFO << "Databases closed"; | |
} | |
Database::Iterator::Iterator(__db *database): | |
m_Database(database), | |
m_Cursor(nullptr), | |
m_CurrentKey(new DBT()), | |
m_CurrentValue(new DBT()), | |
m_IsValid(false), | |
m_IsInitialized(false) | |
{ | |
Q_ASSERT(database != nullptr); | |
} | |
Database::Iterator::~Iterator() { | |
if (m_CurrentKey != nullptr) { | |
delete m_CurrentKey; | |
} | |
if (m_CurrentValue != nullptr) { | |
delete m_CurrentValue; | |
} | |
if (m_Cursor != nullptr) { | |
m_Cursor->close(m_Cursor); | |
} | |
} | |
bool Database::Iterator::moveNext() { | |
Q_ASSERT(m_IsInitialized); | |
if (!m_IsValid) { | |
return false; | |
} | |
const bool success = doMoveNext(); | |
m_IsValid = success; | |
return success; | |
} | |
void Database::Iterator::initialize() { | |
Q_ASSERT(!m_IsInitialized); | |
u_int32_t flags = 0; | |
// optimize for bulk operations to continue on the | |
// same database page as the previous operation | |
flags |= DB_CURSOR_BULK; | |
int result = m_Database->cursor(m_Database, NULL, &m_Cursor, flags); | |
if (result != 0) { | |
LOG_WARNING << "Failed to checkpoint environment:" << db_strerror(result); | |
m_IsValid = false; | |
} else { | |
m_IsValid = true; | |
} | |
m_IsInitialized = true; | |
} | |
bool Database::Iterator::doMoveNext() { | |
int attempts = BUFFER_EXPAND_ATTEMPTS; | |
int result = 0; | |
int keyBufferSize = m_KeyStartBufferSize; | |
int valueBufferSize = m_ValueStartBufferSize; | |
while (attempts--) { | |
result = moveCursor(keyBufferSize, valueBufferSize); | |
if (result == DB_BUFFER_SMALL) { | |
keyBufferSize = 2*keyBufferSize + 1; | |
valueBufferSize = 2*valueBufferSize + 1; | |
} else { | |
if (result == DB_NOTFOUND) { | |
LOG_INFO << "Reached the end of the database"; | |
} else { | |
LOG_WARNING << "Error moving cursor through database:" << db_strerror(result); | |
} | |
break; | |
} | |
} | |
// do not save this for debug in order to test code above | |
#ifdef QT_NO_DEBUG | |
if (result == 0) { | |
// if we have concurrency issue here, then anyway eventually | |
// max buffer size will be properly adjusted | |
m_KeyStartBufferSize = keyBufferSize; | |
m_ValueStartBufferSize = valueBufferSize; | |
} | |
#endif | |
bool success = result == 0; | |
return success; | |
} | |
int Database::Iterator::moveCursor(int keyBufferSize, int valueBufferSize) { | |
memset(m_CurrentKey, 0, sizeof(DBT)); | |
memset(m_CurrentValue, 0, sizeof(DBT)); | |
m_KeyBuffer.fill(0, keyBufferSize); | |
u_int32_t keySize = m_KeyBuffer.size(); | |
char *keyData = m_KeyBuffer.data(); | |
m_CurrentKey->data = (void*)keyData; | |
m_CurrentKey->size = keySize; | |
m_CurrentKey->flags = DB_DBT_USERMEM; | |
// ---- | |
m_ValueBuffer.fill(0, valueBufferSize); | |
u_int32_t valueSize = m_ValueBuffer.size(); | |
char *valueData = m_ValueBuffer.data(); | |
m_CurrentValue->data = (void*)valueData; | |
m_CurrentValue->size = valueSize; | |
m_CurrentValue->flags = DB_DBT_USERMEM; | |
u_int32_t flags = 0; | |
// the cursor is moved to the next key/data pair of the database, and that pair is returned. | |
// In the presence of duplicate key values, the value of the key may not change. | |
flags |= DB_NEXT; | |
int result = m_Cursor->get(m_Cursor, m_CurrentKey, m_CurrentValue, flags); | |
return result; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef DATABASE_H | |
#define DATABASE_H | |
#include <QByteArray> | |
#include <QString> | |
#include <vector> | |
#include <memory> | |
#include "asynccoordinator.h" | |
struct __db; | |
struct __db_env; | |
struct __dbc; | |
struct __db_dbt; | |
#ifdef QT_DEBUG | |
// test if expanding buffer code works well in tryGetKey() | |
#define DEFAULT_READ_BUFFER_START_SIZE (10) | |
#else | |
#define DEFAULT_READ_BUFFER_START_SIZE (2*1024) | |
#endif | |
namespace Helpers { | |
class Database { | |
public: | |
Database(__db_env *environment, int maxBufferSize=DEFAULT_READ_BUFFER_START_SIZE, AsyncCoordinator *finalizeCoordinator=nullptr); | |
virtual ~Database(); | |
public: | |
class Iterator { | |
friend class Database; | |
public: | |
Iterator(__db *database); | |
virtual ~Iterator(); | |
// disable unwanted behavior | |
Iterator(const Iterator &) = delete; | |
Iterator &operator=(const Iterator &) = delete; | |
public: | |
bool moveNext(); | |
const QByteArray &getCurrentKey() const { return m_KeyBuffer; } | |
const QByteArray &getCurrentValue() const { return m_ValueBuffer; } | |
bool isValid() const { return m_IsValid; } | |
private: | |
void initialize(); | |
bool doMoveNext(); | |
int moveCursor(int keyBufferSize, int valueBufferSize); | |
private: | |
__db *m_Database; | |
__dbc *m_Cursor; | |
__db_dbt *m_CurrentKey; | |
__db_dbt *m_CurrentValue; | |
QByteArray m_KeyBuffer; | |
QByteArray m_ValueBuffer; | |
int m_KeyStartBufferSize; | |
int m_ValueStartBufferSize; | |
volatile bool m_IsValid; | |
volatile bool m_IsInitialized; | |
}; | |
public: | |
bool isOpened() const { return m_IsOpened; } | |
public: | |
bool open(const char *dbName); | |
void close(); | |
void sync(); | |
void warmupCache(int percent = 50); | |
public: | |
bool exists(const QByteArray &key); | |
bool tryGetKey(const QByteArray &key, QByteArray &value); | |
bool trySetKey(const QByteArray &key, const QByteArray &data); | |
bool deleteRecord(const QByteArray &key); | |
std::unique_ptr<Iterator> getIterator(); | |
private: | |
int checkExists(const QByteArray &key); | |
int doGetKey(const QByteArray &key, QByteArray &value, int bufferSize); | |
int doSetKey(const QByteArray &key, const QByteArray &value); | |
int doDeleteRecord(const QByteArray &key); | |
int doWarmupCache(int pagesCount); | |
private: | |
AsyncCoordinator *m_FinalizeCoordinator; | |
__db_env *m_Environment; | |
__db *m_Database; | |
volatile int m_StartValueBufferSize; | |
volatile bool m_IsOpened; | |
}; | |
class DatabaseManager: public QObject { | |
Q_OBJECT | |
public: | |
DatabaseManager(); | |
public: | |
bool initialize(); | |
private: | |
void finalize(); | |
int doInitialize(const QString &dbDirName, bool withRecovery); | |
int closeEnvironment(); | |
public: | |
std::shared_ptr<Database> openDatabase(const char *dbName); | |
// should be run from time to time | |
void createCheckpoint(); | |
void prepareToFinalize(); | |
private slots: | |
void onReadyToFinalize(int status); | |
private: | |
void closeAll(); | |
private: | |
AsyncCoordinator m_FinalizeCoordinator; | |
__db_env *m_Environment; | |
QString m_DBDirPath; | |
std::vector<std::shared_ptr<Database> > m_DatabaseArray; | |
}; | |
} | |
#endif // DATABASE_H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment