Last active
July 18, 2022 07:10
-
-
Save testillano/eaf0c796966604b12f850e78e70c5d67 to your computer and use it in GitHub Desktop.
SafeFile with delayed close and control of the maximum number of open files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
///////////////////////////////////////////////////////////////
// Safe file: allows safe writing of text/binary files.
// Configurable close delay.
// Maximum number of open files controlled by a condition variable.
//
// LINK: g++ main.cc -l pthread -l boost_system -l boost_thread
///////////////////////////////////////////////////////////////
#include <unistd.h> // sysconf, sleep

#include <atomic>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
class SafeFile { | |
std::string path_; | |
std::ofstream file_; | |
int max_open_files_; | |
std::mutex mutex_; // write file mutex | |
bool opened_; | |
boost::asio::deadline_timer *timer_{}; | |
unsigned int close_delay_us_; | |
boost::asio::io_service *io_service_{}; | |
void delayedClose() { | |
if (!timer_) timer_ = new boost::asio::deadline_timer(*io_service_, boost::posix_time::microseconds(close_delay_us_)); | |
timer_->cancel(); | |
timer_->expires_from_now(boost::posix_time::microseconds(close_delay_us_)); | |
timer_->async_wait([this] (const boost::system::error_code& e) { | |
if( e ) return; // probably, we were cancelled (boost::asio::error::operation_aborted) | |
close(); | |
}); | |
} | |
public: | |
/** | |
* Constructor | |
* | |
* @param timersIoService asio io service which will be used to delay close | |
* operations with the intention to reduce overhead in some scenarios. | |
* @param path file path to write. It could be relative (to execution path) or absolute. | |
* @param closeDelayUs delay after last write operation, to close the file. By default | |
* it is configured to 1 second, something appropiate to log over long term files. | |
* Zero value means that no planned close is scheduled, so the file is opened, | |
* written and closed in the same moment. This could be interesting for write | |
* many different files when they are not rewritten. If they need to append | |
* data later, use @setCloseDelay to configure them as short term files | |
* taking into account the maximum number of files that your system could | |
* open simultaneously. This class will blocks new open operations when that | |
* limit is reached, to prevent file system errors. | |
* @param mode open mode. By default, text files and append is selected. You | |
* could anyway add other flags, for example for binary dumps: std::ios::binary | |
*/ | |
SafeFile (boost::asio::io_service *timersIoService, | |
const std::string& path, | |
unsigned int closeDelayUs = 1000000 /* 1 second */, | |
std::ios_base::openmode mode = std::ofstream::out | std::ios_base::app): io_service_(timersIoService), | |
path_(path), | |
close_delay_us_(closeDelayUs), | |
opened_(false), | |
timer_(nullptr) | |
{ | |
max_open_files_ = sysconf(_SC_OPEN_MAX /* 1024 probably */) - 10 /* margin just in case the process open other files */; | |
open(mode); | |
} | |
~SafeFile() { close(); delete timer_; } | |
// Opened files control: | |
static std::atomic<int> CurrentOpenedFiles; | |
static std::mutex MutexOpenedFiles; | |
static std::condition_variable OpenedFilesCV; | |
/** | |
* Open the file for writting | |
* | |
* @param mode open mode. By default, text files and append is selected. You | |
* could anyway add other flags, for example for binary dumps: std::ios::binary | |
*/ | |
void open(std::ios_base::openmode mode = std::ofstream::out | std::ios_base::app) { | |
std::unique_lock<std::mutex> lock(MutexOpenedFiles); | |
if (opened_) return; | |
//Wait until we have data or a quit signal | |
OpenedFilesCV.wait(lock, [this] | |
{ | |
return (CurrentOpenedFiles.load() < max_open_files_); | |
}); | |
// After wait, we own the lock | |
file_.open(path_, mode); | |
if (file_.is_open()) { | |
opened_ = true; | |
CurrentOpenedFiles++; | |
std::cout << "Opened file " << path_ << '\n'; | |
} | |
lock.unlock(); | |
} | |
/** | |
* Close the file | |
*/ | |
void close() { | |
std::unique_lock<std::mutex> lock(MutexOpenedFiles); | |
if (!opened_) return; | |
file_.close(); | |
opened_ = false; | |
CurrentOpenedFiles--; | |
std::cout << "Closed file " << path_ << '\n'; | |
lock.unlock(); | |
OpenedFilesCV.notify_one(); | |
} | |
/** | |
* Empty the file | |
*/ | |
void empty() { | |
open(std::ofstream::out | std::ofstream::trunc); | |
close(); | |
} | |
/** | |
* Set the delay in microseconds to close an opened file after writting over it. | |
* | |
* A value of zero will disable the delayed procedure, and close will be done | |
* after write operation (instant close). | |
* | |
* The class constructor sets 1 second by default, appropiate for logging | |
* oriented files which represent the most common and reasonable usage. | |
* | |
* We could consider 'short term files' those which are going to be rewritten | |
* (like long term ones) but when there are many of them and maximum opened | |
* files limit is a factor to take into account. So if your application is | |
* writting many different files, to optimize for example a load traffic rate | |
* of 200k req/s with a limit of 1024 concurrent files, we need a maximum | |
* delay of 1024/200000 = 0,00512 = 5 msecs. | |
* | |
* @param usecs microseconds of delay. Zero disables planned close and will be done instantly. | |
*/ | |
void setCloseDelayUs(unsigned int usecs) { | |
close_delay_us_ = usecs; | |
} | |
/** Class string representation */ | |
std::string asString() { | |
std::string result = "SafeFile | path: "; | |
result += path_; | |
result += " | size (bytes): "; | |
std::ifstream file( path_, std::ofstream::in | std::ios::ate | std::ios::binary); // valid also for text files | |
result += std::to_string(file.tellg()); | |
file.close(); | |
result += " | state: "; | |
result += (opened_ ? "opened":"closed"); | |
return result; | |
} | |
/** | |
* Write data to the file. | |
* Close could be delayed. | |
* | |
* @param data data to write | |
* @see setCloseDelay() | |
*/ | |
void write (const std::string& data) { | |
// Open file (lazy): | |
open(); | |
// Write file: | |
std::lock_guard<std::mutex> lock(mutex_); | |
file_.write(data.c_str(), data.size()); | |
// Close file: | |
if (close_delay_us_ != 0) { | |
delayedClose(); | |
} | |
else { | |
close(); | |
} | |
} | |
}; | |
// Out-of-class definitions of SafeFile's process-wide open-file bookkeeping.
std::atomic<int> SafeFile::CurrentOpenedFiles{0};  // files currently held open by all SafeFile objects
std::mutex SafeFile::MutexOpenedFiles;              // guards open/close accounting and OpenedFilesCV waits
std::condition_variable SafeFile::OpenedFilesCV;    // notified when an open-file slot is released
int main () { | |
auto timersIoService = new boost::asio::io_service(); | |
std::thread tt([&] { | |
boost::asio::io_service::work work(*timersIoService); | |
timersIoService->run(); | |
}); | |
std::cout << "Timers io context running ..." << '\n'; | |
std::vector<std::thread> threads; | |
int nthreads = 10; | |
for (int n=0; n < nthreads; n++) { | |
threads.push_back(std::thread ([&tt, n, timersIoService]() { | |
auto myFile = std::make_shared<SafeFile>(timersIoService, std::string("file.txt.") + std::to_string(n)); // long term file: 1 second by default | |
//auto myFile = std::make_shared<SafeFile>(timersIoService, std::string("file.txt.") + std::to_string(n), 5000); // short term file: less overhead, more risk to reach opened files limit | |
//auto myFile = std::make_shared<SafeFile>(timersIoService, std::string("file.txt.") + std::to_string(n), 0); // instant file: atomic open/write/close: overhead | |
for(int k=0; k<500; k++) myFile->write(std::string("Hi from: ") + myFile->asString() + "\n"); | |
std::cout << "Waiting possible close operation ..." << '\n'; | |
sleep(2); | |
})); | |
} | |
for (int n=0; n < nthreads; n++) { | |
threads[n].join(); | |
} | |
boost::asio::deadline_timer exitTimer(*timersIoService, boost::posix_time::milliseconds(100)); | |
exitTimer.async_wait([&] (const boost::system::error_code& e) { timersIoService->stop(); std::cout << "Exiting ..." << '\n'; }); | |
tt.join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment