Created
September 16, 2022 22:23
-
-
Save ericek111/b356758a9bc41bb16042b02c78d16395 to your computer and use it in GitHub Desktop.
use inotify to watch a directory and process content written to a file from another process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <poll.h>
#include <sys/inotify.h>
#include <sys/ioctl.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <filesystem>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <opus.h>
/**
 * Canonical 44-byte header of a PCM WAV (RIFF) file.
 *
 * The struct is filled by reading raw bytes from the file straight into it, so
 * its in-memory layout has to match the on-disk layout exactly (checked by the
 * static_assert below). Multi-byte fields are stored as found in the file; no
 * byte-order conversion is performed here.
 */
struct WavHeader
{
    /* RIFF Chunk Descriptor */
    uint8_t RIFF[4];        // RIFF Header Magic header
    uint32_t ChunkSize;     // RIFF Chunk Size
    uint8_t WAVE[4];        // WAVE Header
    /* "fmt" sub-chunk */
    uint8_t fmt[4];         // FMT header
    uint32_t Subchunk1Size; // Size of the fmt chunk
    uint16_t AudioFormat;   // Audio format 1=PCM,6=mulaw,7=alaw, 257=IBM Mu-Law, 258=IBM A-Law, 259=ADPCM
    uint16_t NumOfChan;     // Number of channels 1=Mono 2=Stereo
    uint32_t SamplesPerSec; // Sampling Frequency in Hz
    uint32_t bytesPerSec;   // bytes per second
    uint16_t blockAlign;    // 2=16-bit mono, 4=16-bit stereo
    uint16_t bitsPerSample; // Number of bits per sample
    /* "data" sub-chunk */
    uint8_t Subchunk2ID[4]; // "data" string
    uint32_t Subchunk2Size; // Sampled data length
};

// Any padding inserted by the compiler would silently shift every field when
// the header is read as one block of bytes, so fail the build instead.
static_assert(sizeof(WavHeader) == 44, "WavHeader must match the 44-byte on-disk WAV header");
/**
 * State shared between main() and the worker thread that follows one file.
 *
 * Instances are held through std::shared_ptr: one reference lives in main()'s
 * map, another is handed to the worker, so the object stays alive for as long
 * as either side still uses it.
 */
struct InotifyCompressJob {
    std::string filePath;              // full path of the file the worker reads
    std::thread th;                    // worker thread running handleNewFile()
    std::condition_variable cv;        // notified by main() when an inotify event arrives for the file
    std::mutex mtx;                    // mutex the worker pairs with `cv`
    std::atomic<bool> stillOpen{true}; // cleared by main() once IN_CLOSE_WRITE is seen for the file
};
// Process-wide run flag. It is cleared from the signal handlers and by main()
// during shutdown, and polled by main()'s event loop and every worker thread.
// It has to be atomic: a plain bool written in one thread (or a signal
// handler) while being read in another is a data race.
std::atomic<bool> g_shouldRun{true};

// Only lock-free atomics may be touched from a signal handler.
static_assert(std::atomic<bool>::is_always_lock_free, "g_shouldRun is written from signal handlers");
void handleNewFile(std::shared_ptr<InotifyCompressJob> job) { | |
std::cout << "Started new thread for " << job->filePath << std::endl; | |
FILE* file = fopen(job->filePath.c_str(), "r"); | |
if (file == nullptr) { | |
std::cerr << "Can't open new file '" << job->filePath << "': " << std::strerror(errno) << std::endl; | |
return; | |
} | |
int fd = fileno(file); | |
int lastBytesAvailable = 0; | |
int bytesAvailable = 0; | |
size_t readSoFar = 0; | |
bool hasHeader = false; | |
WavHeader inHeader; | |
std::vector<std::byte> inBuf; | |
std::unique_lock<std::mutex> lck(job->mtx); | |
while (g_shouldRun && job->stillOpen) { | |
job->cv.wait_for(lck, std::chrono::milliseconds(500)); | |
if (ioctl(fd, FIONREAD, &bytesAvailable) == -1) { | |
std::cerr << "ioctl error on FIONREAD for '" << job->filePath << "': " << std::strerror(errno) << std::endl; | |
continue; | |
} | |
if (lastBytesAvailable == bytesAvailable) { | |
continue; | |
} | |
if (!hasHeader) { | |
if (bytesAvailable < sizeof(inHeader)) { | |
continue; | |
} | |
ssize_t readNow = read(fd, &inHeader, sizeof(inHeader)); | |
if (readNow == -1) { | |
std::cout << "Failed to read the WAV header of '" << job->filePath << "'" << std::endl; | |
continue; | |
} | |
readSoFar += readNow; | |
inBuf.reserve(inHeader.bytesPerSec); | |
hasHeader = true; | |
} | |
if (bytesAvailable < inBuf.capacity()) { | |
// std::cout << "not enough data yet " << bytesAvailable << " / " << inBuf.capacity() << "\n"; | |
continue; | |
} | |
ssize_t readNow = read(fd, inBuf.data(), inBuf.capacity()); | |
if (readNow == -1) { | |
std::cout << "Failed to read " << inBuf.capacity() << " B from '" << job->filePath << "'" << std::endl; | |
continue; | |
} | |
readSoFar += readNow; | |
std::cout << "Got " << readNow << " / " << bytesAvailable << " in " << job->filePath << std::endl; | |
lastBytesAvailable = bytesAvailable; | |
} | |
// TODO: Process the rest of the buffer. | |
std::cout << "Closed with " << readSoFar << " B read: " << job->filePath << std::endl; | |
fclose(file); | |
} | |
int main(int argc, char** argv) { | |
if (argc < 2) { | |
std::cerr << "Usage: ./inotifycompress (path to directory with recordings)" << std::endl; | |
return 1; | |
} | |
std::filesystem::path path(argv[1]); | |
if (!std::filesystem::is_directory(path)) { | |
std::cerr << "'" << path << "' is not a directory or does not exist!" << std::endl; | |
return 2; | |
} | |
int fd = inotify_init(); | |
if (fd == -1) { | |
std::cerr << "Failed to inotify_init: " << std::strerror(errno) << std::endl; | |
return 3; | |
} | |
signal(SIGINT, [](int signum) { | |
g_shouldRun = false; | |
}); | |
signal(SIGABRT, [](int signum) { | |
g_shouldRun = false; | |
}); | |
std::map<std::string, std::shared_ptr<InotifyCompressJob>> threadMap; | |
int wd = inotify_add_watch(fd, path.c_str(), IN_CREATE | IN_MODIFY | IN_CLOSE_WRITE); | |
std::vector<std::byte> eventBuf; | |
eventBuf.reserve(64 * (sizeof (struct inotify_event)) + NAME_MAX + 1); | |
ssize_t readLen; | |
struct pollfd pfd = { fd, POLLIN, 0 }; | |
while (g_shouldRun) { | |
int ret = poll(&pfd, 1, 100); // timeout of 100ms | |
if (ret == -1) { | |
if (errno == EINTR) { | |
break; | |
} | |
std::cerr << "poll failed: " << std::strerror(errno) << std::endl; | |
return 7; | |
} else if (ret == 0) { | |
continue; | |
} | |
readLen = read(fd, eventBuf.data(), eventBuf.capacity()); | |
if (readLen == 0) { | |
std::cerr << "read from inotify returned 0!" << std::endl; | |
return 4; | |
} else if (readLen == -1) { | |
std::cerr << "Failed to read from inotify: " << std::strerror(errno) << std::endl; | |
return 5; | |
} | |
for (std::byte* cursor = eventBuf.data(); cursor < eventBuf.data() + readLen; ) { | |
auto event = (struct inotify_event*) cursor; | |
cursor += sizeof(struct inotify_event) + event->len; | |
std::string fileName{event->name}; | |
if (!fileName.starts_with("audio_") || !fileName.ends_with(".wav")) | |
continue; | |
if (!threadMap.contains(fileName)) { | |
auto entry = std::make_shared<InotifyCompressJob>(); | |
entry->filePath = std::string{path} + std::filesystem::path::preferred_separator + fileName; | |
entry->th = std::thread{handleNewFile, entry}; | |
threadMap[fileName] = entry; | |
} else { | |
auto& entry = threadMap[fileName]; | |
if (event->mask & IN_CLOSE_WRITE) { | |
entry->stillOpen = false; | |
entry->th.detach(); // TODO: Make this better, destructor is called next. | |
threadMap.erase(fileName); | |
} | |
entry->cv.notify_all(); | |
} | |
} | |
} | |
g_shouldRun = false; // for other threads and housekeeping | |
inotify_rm_watch(fd, wd); | |
close(fd); | |
for (auto& [fileName, entry] : threadMap) { | |
if (entry->th.joinable()) { | |
entry->th.join(); | |
} | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment