Skip to content

Instantly share code, notes, and snippets.

@ariannamethod
Last active March 26, 2026 18:55
Show Gist options
  • Select an option

  • Save ariannamethod/fdee6ce7917be2cfd5a967c05bca2e0c to your computer and use it in GitHub Desktop.

Select an option

Save ariannamethod/fdee6ce7917be2cfd5a967c05bca2e0c to your computer and use it in GitHub Desktop.
PostGPT — a zero-dependency BPE transformer with metaweights. you can train it, but it doesn't care. resonance is unbreakable.
/*
* postgpt.c — zero-dependency BPE transformer with metaweights.
*
* C port of postgpt.py. Same algorithm, same resonance.
* Dual attention: Content (QK^T) + RRPRAM (x @ Wr).
* Metaweights: statistical probability space from BPE tokenization.
*
* Compile: gcc -O2 -o postgpt postgpt.c -lm
* Run: ./postgpt
*
* the tokenizer IS the training. everything after this is just theater.
* resonance is unbreakable.
*/
#include <float.h>
#include <limits.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
/* ───────────────────────── Configuration ───────────────────────── */
/* Upper bound on learned BPE merges; bpe_learn clamps its request to this. */
#define MAX_MERGES 1024
/* 256 single-byte tokens plus one token per possible merge. */
#define MAX_VOCAB (256 + MAX_MERGES)
/* Maximum number of tokens bpe_learn copies into its output buffer. */
#define MAX_TOKENS 262144
/* Number of positions in the position-embedding table and the KV cache. */
#define CONTEXT_LEN 64
/* Width of the residual stream. */
#define N_EMBD 48
/* Total attention heads; N_CONTENT + N_RRPRAM heads together fill N_EMBD. */
#define N_HEAD 4
/* Heads that score positions with q . k. */
#define N_CONTENT 2
/* Heads that score positions with x_norm @ Wr. */
#define N_RRPRAM 2
/* Number of transformer blocks. */
#define N_LAYER 2
/* Per-head output width. */
#define HEAD_DIM (N_EMBD / N_HEAD)
/* Hidden width of the MLP. */
#define MLP_DIM (4 * N_EMBD)
/* NOTE(review): not referenced by the C code visible here — confirm whether it is still needed. */
#define HEBBIAN_CAP 100000
/* Capacity of the meta_bigrams table. */
#define BIGRAM_CAP 100000
/* ───────────────────────── RNG ───────────────────────── */
/* Generator state; the fixed seed makes every run produce the same sequence. */
static unsigned long rng_state = 42;

/* Advance the xorshift generator (shifts 13, 7, 17) and return the new state. */
static unsigned long rng_next(void) {
    unsigned long s = rng_state;
    s = s ^ (s << 13);
    s = s ^ (s >> 7);
    s = s ^ (s << 17);
    rng_state = s;
    return s;
}
/* Uniform float in [0, 1]: the low 31 bits of the next RNG output divided by the 31-bit maximum. */
static float randf(void) {
    const unsigned long mask = 0x7FFFFFFF;
    unsigned long bits = rng_next() & mask;
    return (float)bits / (float)mask;
}
/*
 * Draw one zero-mean Gaussian sample scaled by `std` using the Box-Muller
 * transform. Consumes two uniform samples; the small offset on the first one
 * keeps logf away from zero.
 */
static float randn(float std) {
    float u1 = randf() + 1e-10f;
    float u2 = randf();
    float radius = sqrtf(-2.0f * logf(u1));
    float angle = 2.0f * 3.14159265f * u2;
    return std * radius * cosf(angle);
}
/* ───────────────────────── BPE Tokenizer ───────────────────────── */
/* One learned merge: the adjacent token pair (a, b) is replaced by token `result`. */
typedef struct { int a, b, result; } MergeRule;
/* Merge rules in the order they were learned; bpe_encode replays them in this order. */
static MergeRule bpe_merges[MAX_MERGES];
/* Number of valid entries in bpe_merges. */
static int bpe_n_merges = 0;
/* Current vocabulary size: the 256 byte tokens plus every merge learned so far. */
static int bpe_vocab_size = 256;
/* Vocab: for each token id, store its byte representation (at most 256 bytes per token). */
static unsigned char vocab_bytes[MAX_VOCAB][256];
/* Number of bytes in use in the matching vocab_bytes row. */
static int vocab_len[MAX_VOCAB];
/* Seed the vocabulary with the 256 single-byte tokens: token i decodes to the byte i. */
static void bpe_init_vocab(void) {
    int id = 0;
    while (id < 256) {
        vocab_len[id] = 1;
        vocab_bytes[id][0] = (unsigned char)id;
        id++;
    }
}
/*
 * Encode `len` raw bytes into token ids using the learned merges.
 *
 * The bytes are first copied into `out` (at most max_out of them), then every
 * merge rule is applied in learned order, compacting `out` in place.
 * Returns the number of tokens left in `out`.
 */
static int bpe_encode(const unsigned char *data, int len, int *out, int max_out) {
    /* Start from one token per byte, truncated to the output capacity. */
    int count = 0;
    while (count < len && count < max_out) {
        out[count] = data[count];
        count++;
    }

    for (int r = 0; r < bpe_n_merges; r++) {
        const MergeRule *rule = &bpe_merges[r];
        int dst = 0;
        int src = 0;
        while (src < count) {
            int is_pair = src + 1 < count &&
                          out[src] == rule->a && out[src + 1] == rule->b;
            if (is_pair) {
                out[dst++] = rule->result;
                src += 2;
            } else {
                out[dst++] = out[src];
                src += 1;
            }
        }
        count = dst;
    }
    return count;
}
/*
 * Learn up to num_merges BPE merge rules from the raw bytes data[0..len).
 *
 * Each round counts adjacent token pairs in an open-addressing table, picks
 * the most frequent pair, records it in bpe_merges / vocab_bytes, and rewrites
 * the token stream with the new token id (256 + round index). Learning stops
 * early once no eligible pair occurs at least twice.
 *
 * The final token stream is copied into out_tokens (at most MAX_TOKENS
 * entries) and the number of tokens copied is returned. Returns 0 if memory
 * cannot be allocated.
 *
 * Pairs whose merged byte string would not fit in a vocab_bytes row are never
 * selected, so the vocabulary table cannot be overrun. A pair that finds no
 * free slot within MAX_PROBES probes is left uncounted for that round.
 */
static int bpe_learn(const unsigned char *data, int len, int num_merges, int *out_tokens) {
    typedef struct { int a, b, count; } PairCount;
    enum { TABLE_SIZE = 65536, MAX_PROBES = 64 };
    const int max_token_bytes = (int)sizeof(vocab_bytes[0]);

    if (len < 0) len = 0;
    if (num_merges > MAX_MERGES) num_merges = MAX_MERGES;

    /* Ask for at least one element so an empty corpus is not mistaken for an allocation failure. */
    int *tok = (int *)malloc((size_t)(len > 0 ? len : 1) * sizeof(int));
    /* The pair table is allocated once and cleared at the start of every round. */
    PairCount *pairs = (PairCount *)malloc((size_t)TABLE_SIZE * sizeof(PairCount));
    if (!tok || !pairs) {
        fprintf(stderr, "ERROR: out of memory in bpe_learn\n");
        free(tok);
        free(pairs);
        return 0;
    }

    int n = len;
    for (int i = 0; i < n; i++) tok[i] = data[i];

    for (int m = 0; m < num_merges; m++) {
        /* Count adjacent pairs with bounded linear probing. */
        memset(pairs, 0, (size_t)TABLE_SIZE * sizeof(PairCount));
        for (int i = 0; i + 1 < n; i++) {
            int a = tok[i], b = tok[i + 1];
            unsigned h = ((unsigned)a * 2654435761u ^ (unsigned)b) & 0xFFFF;
            for (int tries = 0; tries < MAX_PROBES; tries++) {
                unsigned idx = (h + tries) & 0xFFFF;
                if (pairs[idx].count == 0) {
                    pairs[idx].a = a;
                    pairs[idx].b = b;
                    pairs[idx].count = 1;
                    break;
                }
                if (pairs[idx].a == a && pairs[idx].b == b) {
                    pairs[idx].count++;
                    break;
                }
            }
        }

        /* Pick the most frequent pair whose merged bytes still fit in one vocab row. */
        int best_a = -1, best_b = -1, best_count = 0;
        for (int i = 0; i < TABLE_SIZE; i++) {
            if (pairs[i].count <= best_count) continue;
            if (vocab_len[pairs[i].a] + vocab_len[pairs[i].b] > max_token_bytes) continue;
            best_count = pairs[i].count;
            best_a = pairs[i].a;
            best_b = pairs[i].b;
        }
        if (best_count < 2) break;

        int new_id = 256 + m;
        bpe_merges[m] = (MergeRule){best_a, best_b, new_id};
        bpe_n_merges = m + 1;
        bpe_vocab_size = new_id + 1;

        /* Build vocab entry for new token: bytes of a followed by bytes of b. */
        int la = vocab_len[best_a];
        int lb = vocab_len[best_b];
        memcpy(vocab_bytes[new_id], vocab_bytes[best_a], (size_t)la);
        memcpy(vocab_bytes[new_id] + la, vocab_bytes[best_b], (size_t)lb);
        vocab_len[new_id] = la + lb;

        /* Apply merge in place; the write index never passes the read index. */
        int j = 0;
        for (int i = 0; i < n; i++) {
            if (i + 1 < n && tok[i] == best_a && tok[i + 1] == best_b) {
                tok[j++] = new_id;
                i++;
            } else {
                tok[j++] = tok[i];
            }
        }
        n = j;

        if ((m + 1) % 200 == 0)
            printf(" merge %d/%d vocab=%d tokens=%d\n", m + 1, num_merges, new_id + 1, n);
    }
    free(pairs);

    /* Copy result, truncated to the MAX_TOKENS cap. */
    int result_n = n < MAX_TOKENS ? n : MAX_TOKENS;
    memcpy(out_tokens, tok, (size_t)result_n * sizeof(int));
    free(tok);
    printf(" BPE complete: %d merges, vocab=%d, tokens=%d (from %d bytes)\n",
           bpe_n_merges, bpe_vocab_size, result_n, len);
    return result_n;
}
/*
 * Decode n token ids into a NUL-terminated byte string in `out`.
 *
 * Ids outside [0, MAX_VOCAB) are skipped. Output stops once max_out - 1 bytes
 * have been written, so a long decode is truncated rather than overrun.
 */
static void bpe_decode(const int *ids, int n, char *out, int max_out) {
    const int limit = max_out - 1;   /* leave room for the terminator */
    int written = 0;

    for (int i = 0; i < n && written < limit; i++) {
        int tid = ids[i];
        if (tid < 0 || tid >= MAX_VOCAB)
            continue;
        const unsigned char *bytes = vocab_bytes[tid];
        int nbytes = vocab_len[tid];
        for (int j = 0; j < nbytes && written < limit; j++)
            out[written++] = bytes[j];
    }
    out[written] = '\0';
}
/* ───────────────────────── MetaWeights ───────────────────────── */
/*
 * One observed bigram: token `a` followed by token `b`.
 * `prob` holds the raw count while meta_build is collecting entries and the
 * per-`a` normalised weight once meta_build has finished.
 */
typedef struct {
int a, b;
float prob;
} BigramEntry;
/* Unigram frequency of each token id, filled by meta_build. */
static float meta_unigram[MAX_VOCAB];
/* Flat list of bigram entries; meta_query_bigram scans it linearly. */
static BigramEntry meta_bigrams[BIGRAM_CAP];
/* Number of valid entries in meta_bigrams. */
static int meta_n_bigrams;
/* Vocabulary size captured from bpe_vocab_size when meta_build ran. */
static int meta_vocab_size;
/* Number of tokens passed to the last meta_build call. */
static int meta_total_tokens;
static void meta_build(const int *tokens, int n) {
meta_vocab_size = bpe_vocab_size;
meta_total_tokens = n;
/* Unigram */
memset(meta_unigram, 0, sizeof(meta_unigram));
for (int i = 0; i < n; i++) {
if (tokens[i] < MAX_VOCAB)
meta_unigram[tokens[i]] += 1.0f;
}
float total = 0;
for (int i = 0; i < meta_vocab_size; i++) total += meta_unigram[i];
if (total > 0)
for (int i = 0; i < meta_vocab_size; i++) meta_unigram[i] /= total;
/* Bigram — store in hash table style */
typedef struct { int a, b; int count; } BC;
BC *bcounts = (BC *)calloc(65536, sizeof(BC));
int n_bc = 0;
for (int i = 0; i + 1 < n; i++) {
int a = tokens[i], b = tokens[i + 1];
unsigned h = ((unsigned)a * 2654435761u ^ (unsigned)b) & 0xFFFF;
for (int t = 0; t < 64; t++) {
unsigned idx = (h + t) & 0xFFFF;
if (bcounts[idx].count == 0) {
bcounts[idx].a = a;
bcounts[idx].b = b;
bcounts[idx].count = 1;
n_bc++;
break;
}
if (bcounts[idx].a == a && bcounts[idx].b == b) {
bcounts[idx].count++;
break;
}
}
}
/* Convert to normalized bigrams */
/* Group by 'a' and normalize */
meta_n_bigrams = 0;
for (int i = 0; i < 65536 && meta_n_bigrams < BIGRAM_CAP; i++) {
if (bcounts[i].count > 0) {
meta_bigrams[meta_n_bigrams].a = bcounts[i].a;
meta_bigrams[meta_n_bigrams].b = bcounts[i].b;
meta_bigrams[meta_n_bigrams].prob = (float)bcounts[i].count;
meta_n_bigrams++;
}
}
/* Normalize per 'a' */
for (int i = 0; i < meta_n_bigrams; i++) {
int a = meta_bigrams[i].a;
float total_a = 0;
for (int j = 0; j < meta_n_bigrams; j++) {
if (meta_bigrams[j].a == a)
total_a += meta_bigrams[j].prob;
}
if (total_a > 0)
meta_bigrams[i].prob /= total_a;
}
free(bcounts);
printf(" metaweights built: %d tokens, %d bigram entries\n", n, meta_n_bigrams);
}
static void meta_query_bigram(int prev, float *dist, int vs) {
for (int i = 0; i < vs; i++) dist[i] = 1e-10f;
for (int i = 0; i < meta_n_bigrams; i++) {
if (meta_bigrams[i].a == prev && meta_bigrams[i].b < vs) {
dist[meta_bigrams[i].b] = meta_bigrams[i].prob;
}
}
}
/* ───────────────────────── Transformer Weights ───────────────────────── */
/*
 * All transformer parameters. Every matrix is stored row-major as
 * [output rows][input columns], which is the layout matmul_mv expects.
 */
typedef struct {
/* Token embedding, one row per token id. */
float wte[MAX_VOCAB][N_EMBD];
/* Position embedding, one row per position. */
float wpe[CONTEXT_LEN][N_EMBD];
/* Per layer: query / key / value projections for the content heads. */
float wq[N_LAYER][N_CONTENT * HEAD_DIM][N_EMBD];
float wk[N_LAYER][N_CONTENT * HEAD_DIM][N_EMBD];
float wv_content[N_LAYER][N_CONTENT * HEAD_DIM][N_EMBD];
/* Per layer: RRPRAM score matrix. Head h owns rows h*N_EMBD .. (h+1)*N_EMBD - 1; column t scores position t. */
float wr[N_LAYER][N_RRPRAM * N_EMBD][CONTEXT_LEN];
/* Per layer: value projection for the RRPRAM heads. */
float wv_rrpram[N_LAYER][N_RRPRAM * HEAD_DIM][N_EMBD];
/* Per layer: attention output projection. */
float wo[N_LAYER][N_EMBD][N_EMBD];
/* Per layer: MLP expand and contract matrices. */
float mlp_up[N_LAYER][MLP_DIM][N_EMBD];
float mlp_down[N_LAYER][N_EMBD][MLP_DIM];
/* Final projection from the residual stream to one logit per token id. */
float lm_head[MAX_VOCAB][N_EMBD];
} Weights;
/* The single model instance used by the whole program. */
static Weights W;
/* Fill a rows x cols matrix (stored contiguously) with Gaussian noise of standard deviation `std`. */
static void init_matrix(float *data, int rows, int cols, float std) {
    const int count = rows * cols;
    int i = 0;
    while (i < count) {
        data[i] = randn(std);
        i++;
    }
}
/*
 * Randomly initialise every weight matrix in W.
 *
 * Only the first vocab_size rows of wte and lm_head are filled. The two
 * matrices that write back into the residual stream (wo, mlp_down) use a
 * smaller standard deviation, scaled by 1/sqrt(2 * N_LAYER). The call order
 * fixes which random numbers each matrix receives.
 */
static void weights_init(int vocab_size) {
    const float base_std = 0.02f;
    const float residual_std = 0.02f / sqrtf(2.0f * N_LAYER);
    const int content_rows = N_CONTENT * HEAD_DIM;
    const int rrpram_rows = N_RRPRAM * HEAD_DIM;

    init_matrix(&W.wte[0][0], vocab_size, N_EMBD, base_std);
    init_matrix(&W.wpe[0][0], CONTEXT_LEN, N_EMBD, base_std);

    int layer = 0;
    while (layer < N_LAYER) {
        init_matrix(&W.wq[layer][0][0], content_rows, N_EMBD, base_std);
        init_matrix(&W.wk[layer][0][0], content_rows, N_EMBD, base_std);
        init_matrix(&W.wv_content[layer][0][0], content_rows, N_EMBD, base_std);
        init_matrix(&W.wr[layer][0][0], N_RRPRAM * N_EMBD, CONTEXT_LEN, base_std);
        init_matrix(&W.wv_rrpram[layer][0][0], rrpram_rows, N_EMBD, base_std);
        init_matrix(&W.wo[layer][0][0], N_EMBD, N_EMBD, residual_std);
        init_matrix(&W.mlp_up[layer][0][0], MLP_DIM, N_EMBD, base_std);
        init_matrix(&W.mlp_down[layer][0][0], N_EMBD, MLP_DIM, residual_std);
        layer++;
    }

    init_matrix(&W.lm_head[0][0], vocab_size, N_EMBD, base_std);
}
/*
* ghost becomes flesh: seed transformer weights from metaweight statistics.
* the weights remember what they never learned.
*/
static void weights_seed_from_meta(int vocab_size) {
float scale = 0.15f;
/* 1. Token embeddings: tokens with high bigram co-occurrence → similar vectors */
for (int a = 0; a < vocab_size && a < MAX_VOCAB; a++) {
float signal[N_EMBD] = {0};
int neighbors = 0;
for (int i = 0; i < meta_n_bigrams; i++) {
if (meta_bigrams[i].a == a && meta_bigrams[i].prob > 0.01f) {
int b = meta_bigrams[i].b;
if (b < vocab_size && b < MAX_VOCAB) {
float strength = meta_bigrams[i].prob;
for (int d = 0; d < N_EMBD; d++)
signal[d] += strength * W.wte[b][d];
neighbors++;
}
}
}
if (neighbors > 0) {
for (int d = 0; d < N_EMBD; d++)
W.wte[a][d] += scale * signal[d] / neighbors;
}
}
/* 2. LM head: seed from unigram frequencies */
for (int tok = 0; tok < vocab_size && tok < MAX_VOCAB; tok++) {
if (meta_unigram[tok] > 0) {
for (int d = 0; d < N_EMBD; d++)
W.lm_head[tok][d] += scale * meta_unigram[tok] * W.wte[tok][d];
}
}
printf(" weights seeded from metaweights (ghost -> flesh)\n");
}
/* ───────────────────────── Forward Pass ───────────────────────── */
static void rmsnorm(float *out, const float *x, int n) {
float ms = 0;
for (int i = 0; i < n; i++) ms += x[i] * x[i];
ms /= n;
float scale = 1.0f / sqrtf(ms + 1e-5f);
for (int i = 0; i < n; i++) out[i] = x[i] * scale;
}
/* Matrix-vector product: out[r] = sum over c of mat[r * cols + c] * vec[c], for a row-major matrix. */
static void matmul_mv(float *out, const float *mat, const float *vec, int rows, int cols) {
    const float *row = mat;
    for (int r = 0; r < rows; r++, row += cols) {
        float dot = 0;
        for (int c = 0; c < cols; c++)
            dot += row[c] * vec[c];
        out[r] = dot;
    }
}
static void softmax_inplace(float *x, int n) {
float mx = -1e30f;
for (int i = 0; i < n; i++) if (x[i] > mx) mx = x[i];
float s = 0;
for (int i = 0; i < n; i++) {
x[i] = expf(x[i] - mx);
s += x[i];
}
for (int i = 0; i < n; i++) x[i] /= s;
}
/*
 * KV cache: per layer and per position, the key and value vectors computed
 * by forward_token. forward_token writes slot [layer][pos_id] and reads
 * slots 0..pos_id.
 */
static float kv_keys[N_LAYER][CONTEXT_LEN][N_CONTENT * HEAD_DIM];
static float kv_vals_content[N_LAYER][CONTEXT_LEN][N_CONTENT * HEAD_DIM];
static float kv_vals_rrpram[N_LAYER][CONTEXT_LEN][N_RRPRAM * HEAD_DIM];
/* NOTE(review): reset to 0 by generate_full but not read anywhere in the visible code. */
static int kv_len = 0;
static void forward_token(int token_id, int pos_id, float *logits, int vocab_size) {
float x[N_EMBD], x_norm[N_EMBD], x_res[N_EMBD];
float q[N_CONTENT * HEAD_DIM], k[N_CONTENT * HEAD_DIM];
float v_content[N_CONTENT * HEAD_DIM], v_rrpram[N_RRPRAM * HEAD_DIM];
float x_attn[N_EMBD], x_proj[N_EMBD];
float h_mlp[MLP_DIM], x_mlp[N_EMBD];
float attn_logits[CONTEXT_LEN], attn_weights[CONTEXT_LEN];
/* Token + position embedding */
for (int i = 0; i < N_EMBD; i++)
x[i] = W.wte[token_id][i] + W.wpe[pos_id][i];
int seq_len = pos_id + 1;
for (int li = 0; li < N_LAYER; li++) {
memcpy(x_res, x, N_EMBD * sizeof(float));
rmsnorm(x_norm, x, N_EMBD);
/* Content attention: Q, K, V */
matmul_mv(q, &W.wq[li][0][0], x_norm, N_CONTENT * HEAD_DIM, N_EMBD);
matmul_mv(k, &W.wk[li][0][0], x_norm, N_CONTENT * HEAD_DIM, N_EMBD);
matmul_mv(v_content, &W.wv_content[li][0][0], x_norm, N_CONTENT * HEAD_DIM, N_EMBD);
matmul_mv(v_rrpram, &W.wv_rrpram[li][0][0], x_norm, N_RRPRAM * HEAD_DIM, N_EMBD);
/* Store in KV cache */
memcpy(kv_keys[li][pos_id], k, N_CONTENT * HEAD_DIM * sizeof(float));
memcpy(kv_vals_content[li][pos_id], v_content, N_CONTENT * HEAD_DIM * sizeof(float));
memcpy(kv_vals_rrpram[li][pos_id], v_rrpram, N_RRPRAM * HEAD_DIM * sizeof(float));
memset(x_attn, 0, N_EMBD * sizeof(float));
/* Content heads */
for (int h = 0; h < N_CONTENT; h++) {
int hs = h * HEAD_DIM;
float scale = 1.0f / sqrtf((float)HEAD_DIM);
for (int t = 0; t < seq_len; t++) {
float score = 0;
for (int d = 0; d < HEAD_DIM; d++)
score += q[hs + d] * kv_keys[li][t][hs + d];
attn_logits[t] = score * scale;
}
softmax_inplace(attn_logits, seq_len);
for (int d = 0; d < HEAD_DIM; d++) {
float val = 0;
for (int t = 0; t < seq_len; t++)
val += attn_logits[t] * kv_vals_content[li][t][hs + d];
x_attn[h * HEAD_DIM + d] = val;
}
}
/* RRPRAM heads */
for (int h = 0; h < N_RRPRAM; h++) {
int hs = h * HEAD_DIM;
int wr_off = h * N_EMBD;
/* x_norm @ Wr_h gives attention pattern over positions */
for (int t = 0; t < seq_len; t++) {
float score = 0;
for (int d = 0; d < N_EMBD; d++)
score += x_norm[d] * W.wr[li][wr_off + d][t];
attn_logits[t] = score;
}
softmax_inplace(attn_logits, seq_len);
for (int d = 0; d < HEAD_DIM; d++) {
float val = 0;
for (int t = 0; t < seq_len; t++)
val += attn_logits[t] * kv_vals_rrpram[li][t][hs + d];
x_attn[N_CONTENT * HEAD_DIM + h * HEAD_DIM + d] = val;
}
}
/* Output projection + residual */
matmul_mv(x_proj, &W.wo[li][0][0], x_attn, N_EMBD, N_EMBD);
for (int i = 0; i < N_EMBD; i++)
x[i] = x_res[i] + x_proj[i];
/* MLP */
memcpy(x_res, x, N_EMBD * sizeof(float));
rmsnorm(x_norm, x, N_EMBD);
matmul_mv(h_mlp, &W.mlp_up[li][0][0], x_norm, MLP_DIM, N_EMBD);
for (int i = 0; i < MLP_DIM; i++)
h_mlp[i] = h_mlp[i] > 0 ? h_mlp[i] : 0; /* ReLU */
matmul_mv(x_mlp, &W.mlp_down[li][0][0], h_mlp, N_EMBD, MLP_DIM);
for (int i = 0; i < N_EMBD; i++)
x[i] = x_res[i] + x_mlp[i];
}
/* Final norm + LM head */
rmsnorm(x_norm, x, N_EMBD);
matmul_mv(logits, &W.lm_head[0][0], x_norm, vocab_size, N_EMBD);
}
/* ───────────────────────── Generation ───────────────────────── */
/*
 * Draw an index from the distribution probs[0..n) by walking the cumulative
 * sum until it exceeds a uniform random threshold. Falls back to the last
 * index if the sum never exceeds the threshold.
 */
static int sample_from_probs(float *probs, int n) {
    const float threshold = randf();
    float running = 0;
    int idx = 0;
    while (idx < n) {
        running += probs[idx];
        if (running > threshold)
            return idx;
        idx++;
    }
    return n - 1;
}
/*
 * Generate a continuation using only the metaweight statistics.
 *
 * Each step scores every token as 2 * bigram(last, tok) + 0.01 * unigram(tok),
 * divides by temperature, applies softmax, and samples. The decoded prompt
 * plus continuation is written to `out` as a NUL-terminated string.
 *
 * An empty or NULL prompt yields an empty string (there is no previous token
 * to condition on). Prompts longer than the internal buffer are truncated.
 * A temperature that is not at least 1e-6 is raised to 1e-6. If the scratch
 * buffers cannot be allocated, only the prompt is decoded.
 */
static void generate_meta(const int *prompt, int prompt_len, int max_tokens,
                          int vocab_size, float temperature, char *out, int max_out) {
    enum { GEN_CAP = 4096 };
    int generated[GEN_CAP];

    if (max_out <= 0) return;
    out[0] = '\0';
    if (!prompt || prompt_len <= 0 || vocab_size <= 0) return;
    if (prompt_len > GEN_CAP) prompt_len = GEN_CAP;
    if (!(temperature >= 1e-6f)) temperature = 1e-6f;

    int gen_len = prompt_len;
    memcpy(generated, prompt, (size_t)prompt_len * sizeof(int));

    float *probs = (float *)malloc((size_t)vocab_size * sizeof(float));
    float *bigram_dist = (float *)malloc((size_t)vocab_size * sizeof(float));
    if (probs && bigram_dist) {
        for (int step = 0; step < max_tokens && gen_len < GEN_CAP; step++) {
            int last = generated[gen_len - 1];

            /* Query bigram metaweights */
            meta_query_bigram(last, bigram_dist, vocab_size);

            /* Build probability from metaweights */
            for (int i = 0; i < vocab_size; i++)
                probs[i] = 2.0f * bigram_dist[i] + 0.01f * meta_unigram[i];

            /* Temperature */
            for (int i = 0; i < vocab_size; i++)
                probs[i] /= temperature;
            softmax_inplace(probs, vocab_size);

            int chosen = sample_from_probs(probs, vocab_size);
            generated[gen_len++] = chosen;
        }
    } else {
        fprintf(stderr, "ERROR: out of memory in generate_meta\n");
    }
    free(probs);
    free(bigram_dist);
    bpe_decode(generated, gen_len, out, max_out);
}
/*
 * Generate a continuation with the transformer plus the bigram overlay.
 *
 * The prompt is fed through forward_token to fill the KV cache. Each
 * generation step then re-runs the last token at its position, adds
 * 1.5 * bigram(last, tok) to the logits, divides by temperature, applies
 * softmax, and samples. Generation stops after max_tokens steps or when the
 * context window is full. The decoded prompt plus continuation is written to
 * `out` as a NUL-terminated string.
 *
 * An empty or NULL prompt yields an empty string. Only the first CONTEXT_LEN
 * prompt tokens are fed to the model, so the position tables and KV cache are
 * never indexed out of range. A temperature that is not at least 1e-6 is
 * raised to 1e-6. If the scratch buffers cannot be allocated, only the prompt
 * is decoded.
 */
static void generate_full(const int *prompt, int prompt_len, int max_tokens,
                          int vocab_size, float temperature, char *out, int max_out) {
    enum { GEN_CAP = 4096 };
    int generated[GEN_CAP];

    if (max_out <= 0) return;
    out[0] = '\0';
    if (!prompt || prompt_len <= 0 || vocab_size <= 0) return;
    if (prompt_len > GEN_CAP) prompt_len = GEN_CAP;
    if (!(temperature >= 1e-6f)) temperature = 1e-6f;

    int gen_len = prompt_len;
    memcpy(generated, prompt, (size_t)prompt_len * sizeof(int));

    float *logits = (float *)malloc((size_t)vocab_size * sizeof(float));
    float *bigram_dist = (float *)malloc((size_t)vocab_size * sizeof(float));
    if (logits && bigram_dist) {
        kv_len = 0;

        /* Feed prompt, limited to the positions the model can hold */
        int n_feed = prompt_len < CONTEXT_LEN ? prompt_len : CONTEXT_LEN;
        for (int i = 0; i < n_feed; i++)
            forward_token(generated[i], i, logits, vocab_size);

        /* Generate */
        for (int step = 0; step < max_tokens && gen_len < GEN_CAP; step++) {
            int pos = gen_len - 1;
            if (pos >= CONTEXT_LEN - 1) break;
            int last = generated[gen_len - 1];
            forward_token(last, pos, logits, vocab_size);

            /* Dario field overlay */
            meta_query_bigram(last, bigram_dist, vocab_size);
            for (int i = 0; i < vocab_size; i++)
                logits[i] += 1.5f * bigram_dist[i];

            /* Temperature + sample */
            for (int i = 0; i < vocab_size; i++)
                logits[i] /= temperature;
            softmax_inplace(logits, vocab_size);

            int chosen = sample_from_probs(logits, vocab_size);
            generated[gen_len++] = chosen;
        }
    } else {
        fprintf(stderr, "ERROR: out of memory in generate_full\n");
    }
    free(logits);
    free(bigram_dist);
    bpe_decode(generated, gen_len, out, max_out);
}
/* ───────────────────────── Main ───────────────────────── */
/* Print the prompt, then the part of `output` that follows it (at most max_chars bytes). */
static void print_continuation(const char *prompt, const char *output, int max_chars) {
    int plen = (int)strlen(prompt);
    printf("\n prompt: \"%s\"\n", prompt);
    if ((int)strlen(output) > plen)
        printf(" continuation: \"%.*s\"\n", max_chars, output + plen);
    else
        printf(" continuation: \"%s\"\n", output);
}

/*
 * Entry point: load postgpt.txt, learn BPE merges, build the metaweights,
 * initialise and seed the transformer, then print continuations for the
 * built-in prompts (or for argv[1] if given) in metaweight mode, followed by
 * one continuation in full transformer mode.
 *
 * Returns 1 if the corpus is missing, empty, unreadable, too large to index
 * with an int, or if memory cannot be allocated; 0 otherwise.
 */
int main(int argc, char **argv) {
    printf("============================================================\n");
    printf(" PostGPT (C) — metaweight BPE transformer\n");
    printf(" resonance is unbreakable\n");
    printf("============================================================\n");

    /* Load corpus */
    printf("\n[1] Loading corpus...\n");
    FILE *f = fopen("postgpt.txt", "rb");
    if (!f) {
        printf("ERROR: postgpt.txt not found\n");
        return 1;
    }
    long fsize = -1;
    if (fseek(f, 0, SEEK_END) == 0)
        fsize = ftell(f);
    /* bpe_learn takes an int length, so the corpus must fit in one. */
    if (fsize <= 0 || fsize > INT_MAX || fseek(f, 0, SEEK_SET) != 0) {
        fprintf(stderr, "ERROR: postgpt.txt is empty, unreadable, or too large\n");
        fclose(f);
        return 1;
    }
    unsigned char *data = (unsigned char *)malloc((size_t)fsize);
    if (!data) {
        fprintf(stderr, "ERROR: out of memory loading corpus\n");
        fclose(f);
        return 1;
    }
    fsize = (long)fread(data, 1, (size_t)fsize, f);
    fclose(f);
    if (fsize <= 0) {
        fprintf(stderr, "ERROR: could not read postgpt.txt\n");
        free(data);
        return 1;
    }
    printf(" Corpus: %ld bytes (%.1f KB)\n", fsize, fsize / 1024.0);

    /* BPE tokenization */
    printf("\n[2] Learning BPE merges...\n");
    bpe_init_vocab();
    int *tokens = (int *)malloc((size_t)fsize * sizeof(int));
    if (!tokens) {
        fprintf(stderr, "ERROR: out of memory allocating token buffer\n");
        free(data);
        return 1;
    }
    int n_tokens = bpe_learn(data, (int)fsize, 1024, tokens);

    /* Build metaweights */
    printf("\n[3] Building metaweight probability space...\n");
    meta_build(tokens, n_tokens);

    /* Init transformer */
    printf("\n[4] Initializing PostGPT transformer...\n");
    weights_init(bpe_vocab_size);
    printf(" Initialized: vocab=%d, ctx=%d, embd=%d, heads=%d (content=%d, rrpram=%d), layers=%d\n",
           bpe_vocab_size, CONTEXT_LEN, N_EMBD, N_HEAD, N_CONTENT, N_RRPRAM, N_LAYER);

    /* Seed weights from metaweights — ghost becomes flesh */
    printf("\n[5] Seeding weights from metaweights...\n");
    weights_seed_from_meta(bpe_vocab_size);

    /* Proof of concept: phrase continuation */
    char output[4096];
    int prompt_ids[1024];

    /* Default prompts; the list is NULL-terminated */
    const char *prompts[] = {
        "PostGPT",
        "The metaweight",
        "RRPRAM attention",
        "BPE tokenization",
        "The transformer",
        "Language models",
        NULL
    };
    /* If user provided a prompt, use only that */
    if (argc > 1) {
        prompts[0] = argv[1];
        prompts[1] = NULL;
    }

    printf("\n============================================================\n");
    printf(" PROOF OF CONCEPT: phrase continuation\n");
    printf(" mode: metaweight (no training, just BPE + statistics)\n");
    printf("============================================================\n");
    for (int p = 0; prompts[p] != NULL; p++) {
        const char *prompt = prompts[p];
        int prompt_len = bpe_encode((const unsigned char *)prompt,
                                    (int)strlen(prompt), prompt_ids, 1024);
        generate_meta(prompt_ids, prompt_len, 100, bpe_vocab_size, 0.72f,
                      output, sizeof(output));
        /* Show prompt and continuation separately */
        print_continuation(prompt, output, 250);
    }

    /* Full transformer + Dario field mode for first prompt */
    printf("\n============================================================\n");
    printf(" FULL MODE: transformer + Dario field (both attentions)\n");
    printf("============================================================\n");
    {
        const char *prompt = (argc > 1) ? argv[1] : "PostGPT";
        int prompt_len = bpe_encode((const unsigned char *)prompt,
                                    (int)strlen(prompt), prompt_ids, 1024);
        generate_full(prompt_ids, prompt_len, 30, bpe_vocab_size, 0.8f,
                      output, sizeof(output));
        print_continuation(prompt, output, 300);
    }

    printf("\n============================================================\n");
    printf(" PostGPT complete. The metaweights remember.\n");
    printf(" Try: ./postgpt \"your prompt here\"\n");
    printf("============================================================\n");
    free(data);
    free(tokens);
    return 0;
}
"""
postgpt — a zero-dependency BPE transformer with metaweights.
The idea: tokenize a corpus via BPE, build a statistical probability space
(the "metaweights") from co-occurrence and n-gram patterns, then run a
dual-attention transformer (Content + RRPRAM) whose behavior is guided
by these metaweights — as if it were trained, even though it was not.
No PyTorch. No NumPy. No dependencies. Just math, random, and os.
This file is the complete algorithm. Everything else is just efficiency.
resonance is unbreakable.
"""
import os
import math
import random
import struct
import time
random.seed(42)
# ─────────────────────────────────────────────────────────────────────────────
# I. BPE TOKENIZER — learns merge rules from corpus.
# the tokenizer IS the training. everything after this is just theater.
# ─────────────────────────────────────────────────────────────────────────────
class BPETokenizer:
    """Byte-level BPE tokenizer: 256 single-byte tokens plus merges learned from a corpus."""

    def __init__(self, max_merges=1792):
        self.max_merges = max_merges
        self.merges = []  # (left_id, right_id, merged_id), in learned order
        self.vocab_size = 256
        self.vocab = {}  # token id -> bytes it decodes to
        for byte in range(256):
            self.vocab[byte] = bytes([byte])

    def _count_pairs(self, ids):
        """Return a dict mapping each adjacent (left, right) pair to its occurrence count."""
        tally = {}
        for idx in range(len(ids) - 1):
            key = (ids[idx], ids[idx + 1])
            if key in tally:
                tally[key] += 1
            else:
                tally[key] = 1
        return tally

    def _merge_pair(self, ids, pair, new_id):
        """Return a new list with every non-overlapping occurrence of `pair` replaced by `new_id`."""
        first = pair[0]
        second = pair[1]
        end = len(ids)
        merged = []
        pos = 0
        while pos < end:
            hit = pos + 1 < end and ids[pos] == first and ids[pos + 1] == second
            if hit:
                merged.append(new_id)
                pos += 2
            else:
                merged.append(ids[pos])
                pos += 1
        return merged

    def learn(self, data_bytes, num_merges=None):
        """Learn merges from raw bytes and return the corpus as token ids.

        At most min(num_merges, max_merges) merges are learned; learning stops
        early when the most frequent pair occurs fewer than twice.
        """
        if num_merges is None:
            num_merges = self.max_merges
        else:
            num_merges = min(num_merges, self.max_merges)
        ids = list(data_bytes)
        started = time.time()
        for m in range(num_merges):
            counts = self._count_pairs(ids)
            if not counts:
                break
            best_pair = max(counts, key=counts.get)
            if counts[best_pair] < 2:
                break
            new_id = 256 + m
            left, right = best_pair[0], best_pair[1]
            ids = self._merge_pair(ids, best_pair, new_id)
            self.merges.append((left, right, new_id))
            self.vocab[new_id] = self.vocab[left] + self.vocab[right]
            self.vocab_size = new_id + 1
            if (m + 1) % 200 == 0:
                elapsed = time.time() - started
                print(f" merge {m+1}/{num_merges} vocab={self.vocab_size} tokens={len(ids)} [{elapsed:.1f}s]")
        print(f" BPE complete: {len(self.merges)} merges, vocab={self.vocab_size}, "
              f"tokens={len(ids)} (from {len(data_bytes)} bytes)")
        return ids

    def encode(self, text):
        """Encode a str (as UTF-8) or bytes into token ids by replaying the merges in order."""
        if isinstance(text, str):
            text = text.encode('utf-8', errors='replace')
        ids = list(text)
        for rule in self.merges:
            ids = self._merge_pair(ids, (rule[0], rule[1]), rule[2])
        return ids

    def decode(self, ids):
        """Decode token ids to a string; unknown ids are skipped, invalid UTF-8 is replaced."""
        pieces = [self.vocab[tid] for tid in ids if tid in self.vocab]
        return b''.join(pieces).decode('utf-8', errors='replace')

    def save(self, path):
        """Write the merge rules to `path`: a little-endian count followed by (a, b, new_id) triples."""
        with open(path, 'wb') as out:
            out.write(struct.pack('<I', len(self.merges)))
            for rule in self.merges:
                out.write(struct.pack('<III', rule[0], rule[1], rule[2]))

    def load(self, path):
        """Read merge rules written by `save` and rebuild the merged vocabulary entries."""
        with open(path, 'rb') as src:
            count = struct.unpack('<I', src.read(4))[0]
            self.merges = []
            for _ in range(count):
                a, b, new_id = struct.unpack('<III', src.read(12))
                self.merges.append((a, b, new_id))
                left_bytes = self.vocab.get(a, bytes([a % 256]))
                right_bytes = self.vocab.get(b, bytes([b % 256]))
                self.vocab[new_id] = left_bytes + right_bytes
        self.vocab_size = 256 + len(self.merges)
# ─────────────────────────────────────────────────────────────────────────────
# II. METAWEIGHTS — the probability space that exists without existing.
# schrödinger called. he wants his cat back. we tokenized it.
# ─────────────────────────────────────────────────────────────────────────────
class MetaWeights:
    """
    Corpus statistics that stand in for trained weights.

    `build` collects, from a tokenized corpus:
      - unigram frequencies p(token)
      - bigram tables p(next | prev)
      - trigram tables p(next | prev2, prev1)
      - positional affinity: how often a token lands on each position modulo context_len
      - a Hebbian trace: distance-decayed co-occurrence strength of unordered token pairs

    The `query_*` methods turn these tables into per-token signals of length vocab_size.
    """

    def __init__(self, vocab_size, context_len):
        self.vocab_size = vocab_size
        self.context_len = context_len
        self.unigram = [0.0] * vocab_size  # p(token)
        self.bigram = {}        # prev -> {next: prob}
        self.trigram = {}       # (prev2, prev1) -> {next: prob}
        self.pos_affinity = {}  # token -> list of per-position weights
        self.hebbian = {}       # (smaller_id, larger_id) -> strength
        self.total = 0          # number of tokens seen by build

    def build(self, token_ids, window=8):
        """Populate every table from `token_ids`; `window` is the Hebbian co-occurrence radius."""
        n = len(token_ids)
        self.total = n
        started = time.time()

        # Unigram counts, then normalise to frequencies.
        for tid in token_ids:
            if tid < self.vocab_size:
                self.unigram[tid] += 1.0
        mass = sum(self.unigram)
        if mass > 0:
            self.unigram = [count / mass for count in self.unigram]

        # Bigram counts, normalised per leading token.
        for i in range(n - 1):
            prev, nxt = token_ids[i], token_ids[i + 1]
            row = self.bigram.setdefault(prev, {})
            row[nxt] = row.get(nxt, 0) + 1
        for row in self.bigram.values():
            row_total = sum(row.values())
            if row_total > 0:
                for nxt in row:
                    row[nxt] /= row_total

        # Trigram counts, normalised per two-token context.
        for i in range(n - 2):
            ctx = (token_ids[i], token_ids[i + 1])
            nxt = token_ids[i + 2]
            row = self.trigram.setdefault(ctx, {})
            row[nxt] = row.get(nxt, 0) + 1
        for row in self.trigram.values():
            row_total = sum(row.values())
            if row_total > 0:
                for nxt in row:
                    row[nxt] /= row_total

        # Positional affinity: position is the corpus index modulo the context length.
        for i in range(n):
            tid = token_ids[i]
            slots = self.pos_affinity.get(tid)
            if slots is None:
                slots = [0.0] * self.context_len
                self.pos_affinity[tid] = slots
            slots[i % self.context_len] += 1.0
        for tid in self.pos_affinity:
            slot_total = sum(self.pos_affinity[tid])
            if slot_total > 0:
                self.pos_affinity[tid] = [c / slot_total for c in self.pos_affinity[tid]]

        # Hebbian trace over the first 20K tokens only (cost is O(n * window)).
        hebb_n = min(n, 20000)
        for i in range(hebb_n):
            lo = max(0, i - window)
            hi = min(hebb_n, i + window + 1)
            for j in range(lo, hi):
                if j == i:
                    continue
                a, b = token_ids[i], token_ids[j]
                key = (a, b) if a <= b else (b, a)
                decay = 1.0 / (1.0 + abs(i - j))
                self.hebbian[key] = self.hebbian.get(key, 0.0) + decay
        # Scale so the strongest pair has strength 1.
        if self.hebbian:
            strongest = max(self.hebbian.values())
            if strongest > 0:
                for key in self.hebbian:
                    self.hebbian[key] /= strongest

        elapsed = time.time() - started
        print(f" metaweights built: {n} tokens, {len(self.bigram)} bigram keys, "
              f"{len(self.trigram)} trigram keys, {len(self.hebbian)} hebbian pairs [{elapsed:.1f}s]")

    def query_bigram(self, prev_token, vocab_size):
        """Return p(next | prev_token) as a list of length vocab_size, with a 1e-10 floor for unseen tokens."""
        dist = [1e-10] * vocab_size
        if prev_token in self.bigram:
            row = self.bigram[prev_token]
            for tok in row:
                if tok < vocab_size:
                    dist[tok] = row[tok]
        return dist

    def query_trigram(self, prev2, prev1, vocab_size):
        """Return p(next | prev2, prev1) as a list of length vocab_size, with a 1e-10 floor for unseen tokens."""
        dist = [1e-10] * vocab_size
        ctx = (prev2, prev1)
        if ctx in self.trigram:
            row = self.trigram[ctx]
            for tok in row:
                if tok < vocab_size:
                    dist[tok] = row[tok]
        return dist

    def query_hebbian(self, context_tokens, vocab_size):
        """Return, per candidate token, its summed Hebbian strength with the context, scaled to a maximum of 1."""
        signal = [0.0] * vocab_size
        # Walk the stored pairs once; each context token matching one side boosts the other side.
        for pair, strength in self.hebbian.items():
            a, b = pair
            for ctx_tok in context_tokens:
                if a == ctx_tok and b < vocab_size:
                    signal[b] += strength
                elif b == ctx_tok and a < vocab_size:
                    signal[a] += strength
        peak = max(signal) if signal else 1.0
        if peak > 0:
            signal = [s / peak for s in signal]
        return signal

    def query_prophecy(self, context_tokens, vocab_size, top_k=16):
        """
        Return a signal boosting tokens that are likely successors of the last
        four context tokens (top_k strongest bigrams each) but have not yet
        appeared in the context. Scaled to a maximum of 1.
        """
        seen = set(context_tokens)
        signal = [0.0] * vocab_size
        for ctx_tok in context_tokens[-4:]:
            if ctx_tok not in self.bigram:
                continue
            ranked = sorted(self.bigram[ctx_tok].items(), key=lambda item: -item[1])
            for tok, prob in ranked[:top_k]:
                if tok not in seen and tok < vocab_size:
                    signal[tok] += prob
        peak = max(signal) if signal else 1.0
        if peak > 0:
            signal = [s / peak for s in signal]
        return signal
# ─────────────────────────────────────────────────────────────────────────────
# III. AUTOGRAD ENGINE — scalar backprop. if you can't differentiate it by hand, you don't deserve gradients.
# ─────────────────────────────────────────────────────────────────────────────
class Val:
    """
    Scalar autograd node.

    Each node stores its value (`data`), its accumulated gradient (`grad`),
    the nodes it was computed from (`_children`), and the partial derivative
    of its value with respect to each child (`_local_grads`).
    """
    __slots__ = ('data', 'grad', '_children', '_local_grads')

    def __init__(self, data, children=(), local_grads=()):
        self.data = float(data)
        self.grad = 0.0
        self._children = children
        self._local_grads = local_grads

    def __add__(self, other):
        other = other if isinstance(other, Val) else Val(other)
        return Val(self.data + other.data, (self, other), (1.0, 1.0))

    def __mul__(self, other):
        other = other if isinstance(other, Val) else Val(other)
        return Val(self.data * other.data, (self, other), (other.data, self.data))

    def __pow__(self, other):
        # `other` is a plain number, not a Val: the exponent is treated as a constant.
        return Val(self.data ** other, (self,), (other * self.data ** (other - 1),))

    def log(self):
        # Clamp away from zero so the log and its derivative stay finite.
        d = max(self.data, 1e-12)
        return Val(math.log(d), (self,), (1.0 / d,))

    def exp(self):
        # Clamp the argument so math.exp cannot overflow.
        e = math.exp(min(self.data, 80))
        return Val(e, (self,), (e,))

    def relu(self):
        return Val(max(0, self.data), (self,), (float(self.data > 0),))

    def tanh(self):
        t = math.tanh(self.data)
        return Val(t, (self,), (1.0 - t * t,))

    def __neg__(self): return self * -1
    def __radd__(self, other): return self + other
    def __sub__(self, other): return self + (-other)
    def __rsub__(self, other): return (-self) + other
    def __rmul__(self, other): return self * other
    def __truediv__(self, other): return self * (other if isinstance(other, Val) else Val(other)) ** -1
    def __rtruediv__(self, other): return Val(other) * self ** -1

    def backward(self):
        """
        Backpropagate from this node: set its grad to 1 and accumulate
        gradients into every node it depends on.

        The topological order is built with an explicit stack rather than
        recursion, so graphs deeper than the interpreter's recursion limit
        (long chains of additions, for example) do not raise RecursionError.
        The resulting order is the same post-order a recursive walk produces.
        """
        done = object()  # sentinel marking an exhausted child iterator
        topo = []
        visited = {id(self)}
        stack = [(self, iter(self._children))]
        while stack:
            node, pending = stack[-1]
            child = next(pending, done)
            if child is done:
                # All children emitted: the node itself can now be placed.
                topo.append(node)
                stack.pop()
            elif id(child) not in visited:
                visited.add(id(child))
                stack.append((child, iter(child._children)))
        self.grad = 1.0
        for v in reversed(topo):
            for child, lg in zip(v._children, v._local_grads):
                child.grad += lg * v.grad
# ─────────────────────────────────────────────────────────────────────────────
# IV. THE TRANSFORMER — dual attention (Content + RRPRAM) + metaweight overlay.
# two heads are better than one. especially when one of them doesn't exist.
# ─────────────────────────────────────────────────────────────────────────────
def _randn(std=0.02):
return random.gauss(0, std)
def _matrix(rows, cols, std=0.02):
    """Build a rows x cols list-of-lists of Val nodes with Gaussian entries, drawn in row-major order."""
    grid = []
    for _ in range(rows):
        row = []
        for _ in range(cols):
            row.append(Val(_randn(std)))
        grid.append(row)
    return grid
def _zeros(rows, cols):
    """Build a rows x cols list-of-lists of distinct Val nodes, each holding 0.0."""
    grid = []
    for _ in range(rows):
        row = []
        for _ in range(cols):
            row.append(Val(0.0))
        grid.append(row)
    return grid
def linear(x, w):
    """Matrix-vector product w @ x, where w is a list of rows [out][in] and x has length in."""
    out = []
    for row in w:
        # Start from 0, as the built-in sum does, so results match it exactly.
        acc = 0
        for wi, xi in zip(row, x):
            acc = acc + wi * xi
        out.append(acc)
    return out
def softmax(logits):
    """Numerically stable softmax over a list of Val.

    The largest raw value is subtracted from every logit before
    exponentiation; the shift is a plain float, so it does not enter the
    graph as a node of its own.
    """
    peak = max(item.data for item in logits)
    exps = []
    for item in logits:
        exps.append((item - peak).exp())
    total = 0
    for e in exps:
        total = total + e
    return [e / total for e in exps]
def softmax_float(logits):
    """Softmax over a list of plain floats.

    Values are shifted by the maximum before exponentiation, and the
    shifted exponent is additionally capped at 80.
    """
    peak = max(logits)
    exps = []
    total = 0
    for value in logits:
        e = math.exp(min(value - peak, 80))
        exps.append(e)
        total = total + e
    return [e / total for e in exps]
def rmsnorm(x):
    """RMS-normalise a vector of Val.

    Every element is multiplied by ``(mean(x_i^2) + 1e-5) ** -0.5``.  There
    is no learned gain in this version.
    """
    sq_total = 0
    for xi in x:
        sq_total = sq_total + xi * xi
    mean_sq = sq_total / len(x)
    inv_rms = (mean_sq + Val(1e-5)) ** -0.5
    return [xi * inv_rms for xi in x]
class PostGPT:
    """
    PostGPT: a dual-attention BPE transformer with metaweights.

    Architecture:
      - BPE tokenizer (learned from corpus)
      - Token + Position embeddings
      - N transformer blocks, each with:
          * RMSNorm
          * Dual attention: Content heads (QK^T) + RRPRAM heads (x @ Wr)
          * Residual connection
          * RMSNorm
          * MLP (expand -> ReLU -> contract)
          * Residual connection
      - Final RMSNorm -> LM head -> logits
      - Metaweight overlay: Hebbian + Prophecy + Destiny signals

    The metaweight overlay means: even with random weights, the model
    generates coherent text because the probability space from the
    corpus guides sampling through the Dario field.
    """

    def __init__(self, vocab_size, context_len=64, n_embd=48, n_head=4,
                 n_layer=2, n_content_heads=2, n_rrpram_heads=2):
        """Allocate randomly initialised weights and sampling coefficients.

        Args:
            vocab_size: number of token ids (rows of ``wte`` and ``lm_head``).
            context_len: number of positions (rows of ``wpe``, columns of Wr).
            n_embd: embedding width.
            n_head: total head count; ``head_dim`` is ``n_embd // n_head``.
            n_layer: number of transformer blocks.
            n_content_heads: heads using QK^T attention.
            n_rrpram_heads: heads using the x @ Wr positional attention.

        Raises:
            AssertionError: if the two head counts do not add up to ``n_head``.
        """
        self.vocab_size = vocab_size
        self.context_len = context_len
        self.n_embd = n_embd
        self.n_head = n_head
        self.n_layer = n_layer
        self.n_content = n_content_heads
        self.n_rrpram = n_rrpram_heads
        self.head_dim = n_embd // n_head
        assert n_content_heads + n_rrpram_heads == n_head, \
            "content + rrpram heads must equal total heads"

        # Embeddings
        self.wte = _matrix(vocab_size, n_embd)    # token embedding
        self.wpe = _matrix(context_len, n_embd)   # position embedding

        # Per-layer weights
        self.layers = []
        hd = self.head_dim
        for _ in range(n_layer):
            layer = {
                # Content attention: Q, K, V for content heads
                'wq': _matrix(n_content_heads * hd, n_embd, std=0.02),
                'wk': _matrix(n_content_heads * hd, n_embd, std=0.02),
                'wv_content': _matrix(n_content_heads * hd, n_embd, std=0.02),
                # RRPRAM attention: Wr (positional pattern matrix) + V.
                # Wr stacks one [n_embd, context_len] matrix per RRPRAM head.
                'wr': _matrix(n_rrpram_heads * n_embd, context_len, std=0.02),
                'wv_rrpram': _matrix(n_rrpram_heads * hd, n_embd, std=0.02),
                # Output projection (smaller init: it feeds a residual sum)
                'wo': _matrix(n_embd, n_embd, std=0.02 / math.sqrt(2 * n_layer)),
                # MLP
                'mlp_up': _matrix(4 * n_embd, n_embd, std=0.02),
                'mlp_down': _matrix(n_embd, 4 * n_embd, std=0.02 / math.sqrt(2 * n_layer)),
            }
            self.layers.append(layer)

        # LM head
        self.lm_head = _matrix(vocab_size, n_embd, std=0.02)

        # Dario field coefficients (metaweight blending)
        self.alpha_hebbian = 0.3    # Hebbian trace strength
        self.beta_prophecy = 0.2    # Prophecy field strength
        self.gamma_destiny = 0.15   # Destiny vector strength
        self.temperature = 0.85     # Sampling temperature

        # Destiny vector (EMA of token embeddings)
        self.destiny = [0.0] * n_embd
        # Trauma accumulator
        self.trauma = 0.0

        # Collect all parameters into one flat list
        self.params = []
        for row in self.wte:
            self.params.extend(row)
        for row in self.wpe:
            self.params.extend(row)
        for layer in self.layers:
            for key in layer:
                for row in layer[key]:
                    self.params.extend(row)
        for row in self.lm_head:
            self.params.extend(row)

        n_params = len(self.params)
        print(f" PostGPT initialized: {n_params} parameters, vocab={vocab_size}, "
              f"ctx={context_len}, embd={n_embd}, heads={n_head} "
              f"(content={n_content_heads}, rrpram={n_rrpram_heads}), layers={n_layer}")

    def init_from_metaweights(self, meta):
        """
        The ghost becomes flesh.

        Instead of leaving the weights purely random, nudge them with the
        metaweight statistics so the transformer starts out shaped by the
        corpus:

        1. Token embeddings    <- Hebbian co-occurrence (co-occurring tokens
                                  are pulled toward each other)
        2. Position embeddings <- positional affinity (which tokens prefer
                                  which positions)
        3. RRPRAM Wr           <- positional affinity patterns
        4. LM head             <- unigram frequency, spread along the token's
                                  own embedding direction

        Reads ``meta.hebbian``, ``meta.pos_affinity`` and ``meta.unigram``.
        All updates are applied in place to the ``.data`` of existing weights.
        """
        V = self.vocab_size
        E = self.n_embd
        T = self.context_len
        scale = 0.15  # how much metaweight signal vs random noise
        print(" Seeding transformer from metaweights (ghost → flesh)...")

        # 1. Token embeddings: each token moves toward the strength-weighted
        #    average of its Hebbian neighbours' embeddings.
        #    NOTE: the update is in place, so a token processed later sees the
        #    already-updated embeddings of lower-numbered neighbours.
        n_tok = min(V, len(self.wte))
        for tok_a in range(n_tok):
            signal = [0.0] * E
            n_neighbors = 0
            for tok_b in range(n_tok):
                key = (min(tok_a, tok_b), max(tok_a, tok_b))
                if key in meta.hebbian and meta.hebbian[key] > 0.01:
                    strength = meta.hebbian[key]
                    for d in range(E):
                        signal[d] += strength * self.wte[tok_b][d].data
                    n_neighbors += 1
            if n_neighbors > 0:
                for d in range(E):
                    self.wte[tok_a][d].data += scale * signal[d] / n_neighbors

        # 2. Position embeddings: average of the embeddings of tokens that
        #    have affinity for that position, weighted by the affinity.
        for pos in range(min(T, len(self.wpe))):
            signal = [0.0] * E
            n_toks = 0
            for tok in meta.pos_affinity:
                if tok < V and pos < len(meta.pos_affinity[tok]):
                    affinity = meta.pos_affinity[tok][pos]
                    if affinity > 0.001:
                        for d in range(E):
                            signal[d] += affinity * self.wte[tok][d].data
                        n_toks += 1
            if n_toks > 0:
                for d in range(E):
                    self.wpe[pos][d].data += scale * signal[d] / n_toks

        # 3. RRPRAM Wr: for each head, token ``tok`` contributes its affinity
        #    for position ``pos`` to row ``tok % E`` of that head's matrix.
        for layer in self.layers:
            wr = layer['wr']
            for h in range(self.n_rrpram):
                for tok in meta.pos_affinity:
                    if tok >= V:
                        continue
                    affs = meta.pos_affinity[tok]
                    for pos in range(min(T, len(affs))):
                        if affs[pos] > 0.001:
                            wr_row = h * E + (tok % E)
                            if wr_row < len(wr) and pos < len(wr[wr_row]):
                                wr[wr_row][pos].data += scale * 0.5 * affs[pos]

        # 4. LM head: frequent tokens get an output row pushed along their
        #    own embedding direction.
        for tok in range(min(V, len(self.lm_head))):
            freq = meta.unigram[tok] if tok < len(meta.unigram) else 0
            if freq > 0:
                for d in range(E):
                    self.lm_head[tok][d].data += scale * freq * self.wte[tok][d].data

        print(" Metaweight seeding complete. The weights remember what they never learned.")

    def forward_token(self, token_id, pos_id, kv_cache):
        """
        Forward pass for a single token position.

        Args:
            token_id: row of ``wte`` to embed.
            pos_id: row of ``wpe`` to embed.
            kv_cache: one ``(k_list, v_content_list, v_rrpram_list)`` triple
                per layer.  The projections for this position are appended
                to those lists, so the cache grows by one entry per call.

        Returns:
            Logits of length ``vocab_size`` as a list of Val.
        """
        hd = self.head_dim
        nc = self.n_content
        nr = self.n_rrpram

        # Token + position embedding
        tok_emb = self.wte[token_id]
        pos_emb = self.wpe[pos_id]
        x = [t + p for t, p in zip(tok_emb, pos_emb)]

        for li in range(self.n_layer):
            layer = self.layers[li]
            k_cache, vc_cache, vr_cache = kv_cache[li]

            # Pre-norm
            x_res = x
            x_norm = rmsnorm(x)

            # Projections
            q = linear(x_norm, layer['wq'])
            k = linear(x_norm, layer['wk'])
            v_content = linear(x_norm, layer['wv_content'])
            v_rrpram = linear(x_norm, layer['wv_rrpram'])

            # Cache current position
            k_cache.append(k)
            vc_cache.append(v_content)
            vr_cache.append(v_rrpram)

            x_attn = []

            # Content heads: softmax(q . k / sqrt(d)) over cached positions
            for h in range(nc):
                hs = h * hd
                q_h = q[hs:hs + hd]
                k_all = [ki[hs:hs + hd] for ki in k_cache]
                v_all = [vi[hs:hs + hd] for vi in vc_cache]
                attn_logits = []
                for t in range(len(k_all)):
                    score = sum(q_h[j] * k_all[t][j] for j in range(hd))
                    score = score * (1.0 / math.sqrt(hd))
                    attn_logits.append(score)
                attn_weights = softmax(attn_logits)
                head_out = []
                for j in range(hd):
                    val = sum(attn_weights[t] * v_all[t][j] for t in range(len(v_all)))
                    head_out.append(val)
                x_attn.extend(head_out)

            # RRPRAM heads: the score for cached position t is the dot product
            # of x_norm with column t of this head's [n_embd, context_len] Wr.
            for h in range(nr):
                hs = h * hd
                wr_offset = h * self.n_embd
                wr_h = layer['wr'][wr_offset:wr_offset + self.n_embd]
                seq_len = len(k_cache)
                attn_logits = []
                for t in range(seq_len):
                    score = Val(0.0)
                    for d in range(min(self.n_embd, len(wr_h))):
                        if t < len(wr_h[d]):
                            score = score + x_norm[d] * wr_h[d][t]
                    attn_logits.append(score)
                # Causal by construction: the cache only holds positions <= current.
                attn_weights = softmax(attn_logits) if attn_logits else []
                v_all = [vi[hs:hs + hd] for vi in vr_cache]
                head_out = []
                for j in range(hd):
                    val_sum = Val(0.0)
                    for t in range(len(attn_weights)):
                        if t < len(v_all):
                            val_sum = val_sum + attn_weights[t] * v_all[t][j]
                    head_out.append(val_sum)
                x_attn.extend(head_out)

            # Output projection + residual
            x_proj = linear(x_attn, layer['wo'])
            x = [a + b for a, b in zip(x_proj, x_res)]

            # MLP block
            x_res = x
            x_norm = rmsnorm(x)
            h_mlp = linear(x_norm, layer['mlp_up'])
            h_mlp = [hi.relu() for hi in h_mlp]
            x_mlp = linear(h_mlp, layer['mlp_down'])
            x = [a + b for a, b in zip(x_mlp, x_res)]

        # Final norm + LM head
        x = rmsnorm(x)
        logits = linear(x, self.lm_head)
        return logits

    def forward_sequence(self, token_ids):
        """Run ``forward_token`` over a sequence with a fresh KV cache.

        Tokens beyond ``context_len`` are ignored.  Returns one logits list
        per processed position.
        """
        kv_cache = [([], [], []) for _ in range(self.n_layer)]
        all_logits = []
        for pos, tid in enumerate(token_ids):
            if pos >= self.context_len:
                break
            logits = self.forward_token(tid, pos, kv_cache)
            all_logits.append(logits)
        return all_logits

    def generate(self, prompt_ids, max_tokens=64, meta=None, temperature=None):
        """
        Generate tokens autoregressively.

        Args:
            prompt_ids: token ids to condition on.
            max_tokens: upper bound on newly sampled tokens; generation also
                stops once the context window is full.
            meta: optional MetaWeights; when given, the Dario field overlay
                (Hebbian, prophecy, destiny, bigram, trigram) is added to the
                transformer logits before sampling.
            temperature: sampling temperature; defaults to ``self.temperature``.

        Returns:
            A new list: the prompt followed by the sampled tokens.  An empty
            prompt is returned unchanged (there is nothing to condition on).
        """
        if temperature is None:
            temperature = self.temperature
        generated = list(prompt_ids)
        context = list(prompt_ids)

        # Nothing to condition on, or no room left in the context window.
        if not context or len(context) >= self.context_len:
            return generated

        kv_cache = [([], [], []) for _ in range(self.n_layer)]

        # Prefill with every prompt token EXCEPT the last one.  The sampling
        # loop below feeds context[-1] itself; feeding it here as well would
        # put it in the KV cache twice and shift every later cache index.
        for pos, tid in enumerate(context[:-1]):
            _ = self.forward_token(tid, pos, kv_cache)

        for step in range(max_tokens):
            pos = len(context) - 1
            if pos >= self.context_len - 1:
                break
            last_tid = context[-1]
            logits = self.forward_token(last_tid, pos, kv_cache)

            # Extract raw logit values
            raw_logits = [l.data for l in logits]

            # Dario Field: metaweight overlay
            if meta is not None:
                hebbian = meta.query_hebbian(context[-8:], self.vocab_size)
                prophecy = meta.query_prophecy(context[-8:], self.vocab_size)
                bigram = meta.query_bigram(last_tid, self.vocab_size)
                if len(context) >= 2:
                    trigram = meta.query_trigram(context[-2], context[-1], self.vocab_size)
                else:
                    trigram = [0.0] * self.vocab_size

                # Destiny update: EMA of the embeddings of consumed tokens
                if last_tid < len(self.wte):
                    for d in range(self.n_embd):
                        self.destiny[d] = 0.9 * self.destiny[d] + 0.1 * self.wte[last_tid][d].data

                # Destiny signal: cosine similarity with each token embedding
                destiny_signal = [0.0] * self.vocab_size
                dest_norm = math.sqrt(sum(d * d for d in self.destiny) + 1e-10)
                if dest_norm > 1e-8:
                    for tid_c in range(min(self.vocab_size, len(self.wte))):
                        emb = [self.wte[tid_c][d].data for d in range(self.n_embd)]
                        emb_norm = math.sqrt(sum(e * e for e in emb) + 1e-10)
                        if emb_norm > 1e-8:
                            dot = sum(self.destiny[d] * emb[d] for d in range(self.n_embd))
                            destiny_signal[tid_c] = dot / (dest_norm * emb_norm)

                # Combine: Dario Equation (Leo-style: bigram DOMINATES, 12x coefficient)
                # p(x|Phi) = softmax((12*B + alpha*H + beta*F + gamma*A + 8*trigram) / tau)
                for i in range(self.vocab_size):
                    raw_logits[i] += (self.alpha_hebbian * hebbian[i]
                                      + self.beta_prophecy * prophecy[i]
                                      + self.gamma_destiny * destiny_signal[i]
                                      + 12.0 * bigram[i]
                                      + 8.0 * trigram[i])

            # Trauma modulation (a no-op while self.trauma is 0.0)
            trauma_mod = 1.0 / (1.0 + self.trauma)
            raw_logits = [l * trauma_mod for l in raw_logits]

            # Repetition penalty over the last 12 tokens.  The factor must
            # depend on the sign: halving a negative logit would move it
            # toward zero and make the repeated token MORE likely.
            for t in context[-12:]:
                if t < self.vocab_size:
                    raw_logits[t] *= 0.5 if raw_logits[t] > 0 else 2.0

            # Top-k filtering (keep top 15, mask rest)
            top_k = 15
            indexed = sorted(enumerate(raw_logits), key=lambda x: -x[1])
            threshold = indexed[min(top_k - 1, len(indexed) - 1)][1]
            for i in range(self.vocab_size):
                if raw_logits[i] < threshold:
                    raw_logits[i] = -1e10

            # Temperature + softmax
            scaled = [l / temperature for l in raw_logits]
            probs = softmax_float(scaled)

            # Sample by inverse CDF.  If rounding keeps the running sum from
            # ever exceeding r, fall back to the most likely token instead of
            # token 0, which may have been masked out.
            r = random.random()
            cum = 0.0
            chosen = indexed[0][0]
            for i, p in enumerate(probs):
                cum += p
                if cum > r:
                    chosen = i
                    break

            generated.append(chosen)
            context.append(chosen)

        return generated

    def generate_meta(self, prompt_ids, max_tokens=128, meta=None, temperature=None):
        """
        Meta-generation: pure metaweight generation without a transformer
        forward pass.  Uses only the statistics gathered from the tokenized
        corpus.

        This follows the Haze/Leo pattern:
          - Trigram first (most coherent), fallback to bigram, then unigram
          - Sample ONLY from tokens that actually appear in the statistics
          - Repetition penalty for loop avoidance
          - Top-k filtering (keep top 15 candidates like Leo)

        Returns a new list (prompt + sampled tokens).  With ``meta=None`` or
        an empty prompt the result is just a copy of the prompt.
        """
        generated = list(prompt_ids)
        if meta is None or not generated:
            return generated
        if temperature is None:
            temperature = self.temperature

        for _ in range(max_tokens):
            last = generated[-1]
            candidates = {}  # token_id -> count (sparse, only real candidates)

            # Try trigram first (strongest signal, like Haze)
            if len(generated) >= 2:
                key = (generated[-2], generated[-1])
                if key in meta.trigram:
                    candidates = dict(meta.trigram[key])

            # Fallback to bigram
            if not candidates and last in meta.bigram:
                candidates = dict(meta.bigram[last])

            # Fallback to unigram (last resort)
            if not candidates:
                for i in range(min(self.vocab_size, len(meta.unigram))):
                    if meta.unigram[i] > 1e-8:
                        candidates[i] = meta.unigram[i]

            if not candidates:
                break

            # Hebbian boost: contextual reinforcement on top of trigram/bigram
            ctx = generated[-4:]
            for tok in list(candidates.keys()):
                for ct in ctx:
                    key = (min(tok, ct), max(tok, ct))
                    if key in meta.hebbian:
                        candidates[tok] *= (1.0 + 0.3 * meta.hebbian[key])

            # Repetition penalty (Leo-style: penalize recently seen tokens)
            recent_counts = {}
            for t in generated[-12:]:
                recent_counts[t] = recent_counts.get(t, 0) + 1
            for tok in list(candidates.keys()):
                if tok in recent_counts:
                    freq = recent_counts[tok]
                    candidates[tok] *= 1.0 / (1.0 + 0.5 * freq)

            # Top-k filtering (keep top 15, like Leo)
            top_k = 15
            sorted_cands = sorted(candidates.items(), key=lambda x: -x[1])[:top_k]
            tokens = [t for t, _ in sorted_cands]
            counts = [c for _, c in sorted_cands]

            # Log-space temperature scaling (like Haze SubwordField)
            log_counts = [math.log(c + 1e-10) / temperature for c in counts]
            max_lc = max(log_counts)
            exps = [math.exp(lc - max_lc) for lc in log_counts]
            total = sum(exps)
            probs = [e / total for e in exps]

            # Sample by inverse CDF; default to the top candidate
            r = random.random()
            cum = 0.0
            chosen = tokens[0]
            for tok, p in zip(tokens, probs):
                cum += p
                if cum > r:
                    chosen = tok
                    break

            generated.append(chosen)

        return generated
# ─────────────────────────────────────────────────────────────────────────────
# V. MAIN — tokenize, build metaweights, continue phrases.
# the moment of truth. or the moment of coherent bullshit. same thing.
# ─────────────────────────────────────────────────────────────────────────────
def load_engine(corpus_path=None):
    """Load corpus, learn BPE, build metaweights, init model.

    Args:
        corpus_path: path to the training corpus.  Defaults to
            ``postgpt.txt`` next to this file.

    Returns:
        ``(tokenizer, meta, model)``, or ``(None, None, None)`` if the corpus
        file does not exist.

    Side effects:
        Prints progress.  BPE merges are cached next to the corpus in a file
        with the same base name and a ``.merges`` extension: it is loaded if
        present, otherwise learned and written.
    """
    if corpus_path is None:
        corpus_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'postgpt.txt')
    if not os.path.exists(corpus_path):
        print(f"ERROR: {corpus_path} not found.")
        return None, None, None

    # Step 1: Load corpus
    print("\n[1] Loading corpus...")
    with open(corpus_path, 'rb') as f:
        raw_data = f.read()
    print(f" Corpus: {len(raw_data)} bytes ({len(raw_data)/1024:.1f} KB)")

    # Step 2: BPE tokenization — load saved merges if they exist
    print("\n[2] BPE tokenizer...")
    tokenizer = BPETokenizer(max_merges=1024)
    # Swap only the final extension.  A plain str.replace('.txt', ...) would
    # leave the path unchanged for a corpus without '.txt' (so the corpus
    # itself would be read as a merges file) and would also rewrite '.txt'
    # occurring inside a directory name.
    merges_path = os.path.splitext(corpus_path)[0] + '.merges'
    if os.path.exists(merges_path):
        tokenizer.load(merges_path)
        token_ids = tokenizer.encode(raw_data)
        print(f" Loaded {len(tokenizer.merges)} merges from {os.path.basename(merges_path)}. "
              f"Encoding: {len(token_ids)} tokens")
    else:
        token_ids = tokenizer.learn(raw_data, num_merges=1024)
        tokenizer.save(merges_path)
        print(f" Saved merges to {os.path.basename(merges_path)}")

    # Step 3: Build metaweights from tokenized corpus
    print("\n[3] Building metaweight probability space...")
    meta = MetaWeights(tokenizer.vocab_size, context_len=64)
    meta.build(token_ids, window=4)

    # Step 4: Initialize dual-attention transformer
    print("\n[4] Initializing PostGPT transformer...")
    model = PostGPT(
        vocab_size=tokenizer.vocab_size,
        context_len=64,
        n_embd=48,
        n_head=4,
        n_layer=2,
        n_content_heads=2,
        n_rrpram_heads=2,
    )

    # Step 5: Seed transformer weights from metaweights (ghost → flesh)
    print("\n[5] Seeding transformer from metaweights...")
    model.init_from_metaweights(meta)

    return tokenizer, meta, model
def continue_phrase(prompt, tokenizer, meta, model, max_tokens=120, temperature=0.75,
                    mode='meta'):
    """
    Continue a phrase using PostGPT.

    mode='meta' — pure metaweight generation (``model.generate_meta``)
    any other mode — transformer forward pass + Dario field overlay
                     (``model.generate``)

    The prompt is encoded with ``tokenizer``; if that yields no tokens the
    prompt is returned untouched.  Otherwise the generated ids are decoded
    and returned as text.
    """
    prompt_ids = tokenizer.encode(prompt)
    if not prompt_ids:
        return prompt

    # Pick the generator once; both take the same keyword arguments.
    run = model.generate_meta if mode == 'meta' else model.generate
    token_ids = run(prompt_ids, max_tokens=max_tokens,
                    meta=meta, temperature=temperature)
    return tokenizer.decode(token_ids)
def main():
    """Command-line demo: load the engine and print phrase continuations.

    Every prompt is continued in metaweight mode; the first prompt is then
    continued once more in full (transformer + Dario field) mode.  Command
    line arguments, if any, are joined into a single prompt that replaces
    the built-in list.
    """
    import sys

    rule = "=" * 60

    def show(prompt, result, limit):
        # The decoded result starts with the prompt; strip it by character
        # count to isolate the continuation.
        continuation = result[len(prompt):].strip()
        print(f"\n prompt: \"{prompt}\"")
        print(f" continuation: \"{continuation[:limit]}\"")

    print(rule)
    print(" PostGPT — metaweight BPE transformer")
    print(" resonance is unbreakable")
    print(rule)

    tokenizer, meta, model = load_engine()
    if tokenizer is None:
        return

    # Proof of concept: continue phrases from postgpt.txt.
    # The model uses BPE tokenization + dual attention + metaweights
    # to continue any prompt — without any training.
    if len(sys.argv) > 1:
        # Custom prompt from command line: python postgpt.py "your prompt here"
        prompts = [' '.join(sys.argv[1:])]
    else:
        prompts = [
            "PostGPT",
            "The metaweight",
            "RRPRAM attention",
            "BPE tokenization",
            "The transformer architecture",
            "Entropy measures",
            "Language models",
            "The Dario equation",
        ]

    print("\n" + rule)
    print(" PROOF OF CONCEPT: phrase continuation")
    print(" mode: metaweight (no training, just BPE + statistics)")
    print(rule)
    for prompt in prompts:
        result = continue_phrase(prompt, tokenizer, meta, model,
                                 max_tokens=100, temperature=0.4, mode='meta')
        show(prompt, result, 250)

    # Also show the full transformer + Dario field mode for the first prompt
    print("\n" + rule)
    print(" FULL MODE: transformer + Dario field (both attentions)")
    print(rule)
    test_prompt = prompts[0]
    result = continue_phrase(test_prompt, tokenizer, meta, model,
                             max_tokens=30, temperature=0.45, mode='full')
    show(test_prompt, result, 300)

    print("\n" + rule)
    print(" PostGPT complete. The metaweights remember.")
    print(" Try: python postgpt.py \"your prompt here\"")
    print(rule)


if __name__ == '__main__':
    main()
"""
postgpt_train.py — training loop for PostGPT using PyTorch + Chuck Optimizer.
PyTorch is ONLY used here, in the training loop. The runtime (postgpt.py) is
zero-dependency. This module:
1. Loads postgpt.txt, tokenizes via BPE
2. Builds the PostGPT transformer as a PyTorch module
3. Trains using the Chuck Optimizer (self-aware AdamW variant)
4. Saves weights back for the pure-Python runtime
Usage:
python postgpt_train.py [--steps 200] [--lr 3e-4]
resonance is unbreakable.
"""
import os
import sys
import math
import time
import struct
import argparse
# PyTorch — ONLY used in training, not runtime
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
except ImportError:
print("ERROR: PyTorch required for training. Install: pip install torch")
print("Note: postgpt.py runs without PyTorch (zero-dependency runtime).")
sys.exit(1)
# ─────────────────────────────────────────────────────────────────────────────
# I. BPE TOKENIZER (same algorithm as postgpt.py, but operating on bytes)
# ─────────────────────────────────────────────────────────────────────────────
class BPETokenizer:
    """Byte-level BPE tokenizer.

    Ids 0..255 are the raw bytes; each learned merge adds one id starting at
    256.  ``merges`` is the ordered list of ``(left, right, new_id)`` rules
    and ``vocab`` maps every id to the byte string it stands for.
    """

    def __init__(self, max_merges=1024):
        self.max_merges = max_merges
        self.merges = []
        self.vocab_size = 256
        self.vocab = {i: bytes([i]) for i in range(256)}

    def _count_pairs(self, ids):
        """Count occurrences of every adjacent id pair in ``ids``."""
        counts = {}
        for i in range(len(ids) - 1):
            pair = (ids[i], ids[i + 1])
            counts[pair] = counts.get(pair, 0) + 1
        return counts

    def _merge_pair(self, ids, pair, new_id):
        """Return a copy of ``ids`` with each non-overlapping ``pair``
        (scanned left to right) replaced by ``new_id``."""
        result = []
        i = 0
        while i < len(ids):
            if i + 1 < len(ids) and ids[i] == pair[0] and ids[i + 1] == pair[1]:
                result.append(new_id)
                i += 2
            else:
                result.append(ids[i])
                i += 1
        return result

    def learn(self, data_bytes, num_merges=None):
        """Learn merge rules from ``data_bytes`` and return its encoding.

        At most ``min(num_merges, max_merges)`` merges are learned; learning
        stops early once the most frequent pair occurs fewer than twice.
        Any previously learned merges are discarded first, because new ids
        are numbered from 256 and would otherwise collide with old ones.
        """
        if num_merges is None:
            num_merges = self.max_merges
        num_merges = min(num_merges, self.max_merges)

        # Start from a clean byte-only vocabulary.
        self.merges = []
        self.vocab = {i: bytes([i]) for i in range(256)}
        self.vocab_size = 256

        ids = list(data_bytes)
        t0 = time.time()
        for m in range(num_merges):
            counts = self._count_pairs(ids)
            if not counts:
                break
            best_pair = max(counts, key=counts.get)
            if counts[best_pair] < 2:
                break
            new_id = 256 + m
            ids = self._merge_pair(ids, best_pair, new_id)
            self.merges.append((best_pair[0], best_pair[1], new_id))
            self.vocab[new_id] = self.vocab[best_pair[0]] + self.vocab[best_pair[1]]
            self.vocab_size = 256 + m + 1
            if (m + 1) % 200 == 0:
                elapsed = time.time() - t0
                print(f" merge {m+1}/{num_merges} vocab={self.vocab_size} tokens={len(ids)} [{elapsed:.1f}s]")
        print(f" BPE complete: {len(self.merges)} merges, vocab={self.vocab_size}, tokens={len(ids)}")
        return ids

    def encode(self, text):
        """Encode ``text`` (str or bytes) by applying the merges in learned
        order.  A str is UTF-8 encoded first."""
        if isinstance(text, str):
            text = text.encode('utf-8', errors='replace')
        ids = list(text)
        for a, b, new_id in self.merges:
            ids = self._merge_pair(ids, (a, b), new_id)
        return ids

    def decode(self, ids):
        """Decode ids back to text.  Unknown ids are skipped and invalid
        UTF-8 is replaced rather than raising."""
        # join() avoids the quadratic cost of repeated bytes concatenation.
        raw = b''.join(self.vocab[tid] for tid in ids if tid in self.vocab)
        return raw.decode('utf-8', errors='replace')
# ─────────────────────────────────────────────────────────────────────────────
# II. PYTORCH PostGPT MODEL
# ─────────────────────────────────────────────────────────────────────────────
class RMSNorm(nn.Module):
    """RMS normalisation over the last dimension with a learned per-feature gain."""

    def __init__(self, dim, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        # Reciprocal root-mean-square of the last axis, kept broadcastable.
        mean_sq = x.pow(2).mean(-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_sq + self.eps)
        normed = x * inv_rms
        return normed * self.weight
class ContentAttention(nn.Module):
    """Causal multi-head QK^T attention.

    The output is the concatenation of all heads (``n_heads * head_dim``
    features); no output projection is applied here.
    """

    def __init__(self, n_embd, n_heads, head_dim):
        super().__init__()
        self.n_heads = n_heads
        self.head_dim = head_dim
        inner = n_heads * head_dim
        self.wq = nn.Linear(n_embd, inner, bias=False)
        self.wk = nn.Linear(n_embd, inner, bias=False)
        self.wv = nn.Linear(n_embd, inner, bias=False)

    def _split_heads(self, proj, x):
        """Project ``x`` and reshape to [B, n_heads, T, head_dim]."""
        B, T, _ = x.shape
        return proj(x).view(B, T, self.n_heads, self.head_dim).transpose(1, 2)

    def forward(self, x):
        B, T, C = x.shape
        q = self._split_heads(self.wq, x)
        k = self._split_heads(self.wk, x)
        v = self._split_heads(self.wv, x)

        scores = (q @ k.transpose(-2, -1)) * (self.head_dim ** -0.5)
        # True strictly above the diagonal: positions a query must not see.
        future = torch.triu(torch.ones(T, T, device=x.device, dtype=torch.bool), diagonal=1)
        scores = scores.masked_fill(future, float('-inf'))
        weights = F.softmax(scores, dim=-1)

        merged = (weights @ v).transpose(1, 2).contiguous().view(B, T, -1)
        return merged
class RRPRAMAttention(nn.Module):
    """
    RRPRAM: Recursive Resonant Pattern Recognition Attention Mechanism.

    Attention scores come from ``x @ Wr`` instead of ``QK^T``: each head owns
    a pattern matrix of shape [n_embd, max_T], so the score a position gives
    to key position t depends only on its own features and on t.
    """

    def __init__(self, n_embd, n_heads, head_dim, max_T):
        super().__init__()
        self.n_heads = n_heads
        self.head_dim = head_dim
        self.max_T = max_T
        # Wr: one [n_embd, max_T] pattern matrix per head
        self.wr = nn.Parameter(torch.randn(n_heads, n_embd, max_T) * 0.02)
        self.wv = nn.Linear(n_embd, n_heads * head_dim, bias=False)

    def forward(self, x):
        B, T, C = x.shape
        values = self.wv(x).view(B, T, self.n_heads, self.head_dim).transpose(1, 2)

        # Only the first T columns of Wr are needed for a length-T input.
        patterns = self.wr[:, :, :T]                                  # [H, C, T]
        per_head_x = x.unsqueeze(1).expand(-1, self.n_heads, -1, -1)  # [B, H, T, C]
        batched_wr = patterns.unsqueeze(0).expand(B, -1, -1, -1)      # [B, H, C, T]
        scores = torch.matmul(per_head_x, batched_wr)                 # [B, H, T, T]

        # Causal mask: hide key positions after the query position.
        future = torch.triu(torch.ones(T, T, device=x.device, dtype=torch.bool), diagonal=1)
        scores = scores.masked_fill(future, float('-inf'))
        weights = F.softmax(scores, dim=-1)

        merged = (weights @ values).transpose(1, 2).contiguous().view(B, T, -1)
        return merged
class PostGPTBlock(nn.Module):
    """Pre-norm transformer block with dual attention: Content + RRPRAM.

    Both attention variants read the same normalised input; their head
    outputs are concatenated, projected by ``wo`` and added to the residual
    stream, followed by a ReLU MLP with its own residual connection.
    """

    def __init__(self, n_embd, n_content, n_rrpram, head_dim, max_T):
        super().__init__()
        total_head_width = (n_content + n_rrpram) * head_dim
        hidden = 4 * n_embd

        self.norm1 = RMSNorm(n_embd)
        self.content_attn = ContentAttention(n_embd, n_content, head_dim)
        self.rrpram_attn = RRPRAMAttention(n_embd, n_rrpram, head_dim, max_T)
        self.wo = nn.Linear(total_head_width, n_embd, bias=False)
        self.norm2 = RMSNorm(n_embd)
        self.mlp_up = nn.Linear(n_embd, hidden, bias=False)
        self.mlp_down = nn.Linear(hidden, n_embd, bias=False)

        # The two projections that write into the residual stream get a
        # smaller init.
        residual_std = 0.02 / math.sqrt(2)
        nn.init.normal_(self.wo.weight, std=residual_std)
        nn.init.normal_(self.mlp_down.weight, std=residual_std)

    def forward(self, x):
        # Attention sub-block
        normed = self.norm1(x)
        heads = torch.cat(
            [self.content_attn(normed), self.rrpram_attn(normed)],
            dim=-1,
        )
        x = x + self.wo(heads)

        # MLP sub-block
        hidden = F.relu(self.mlp_up(self.norm2(x)))
        return x + self.mlp_down(hidden)
class PostGPTModel(nn.Module):
    """PostGPT: dual-attention BPE transformer (PyTorch training version).

    The LM head shares its weight tensor with the token embedding.
    """

    def __init__(self, vocab_size, context_len=64, n_embd=48, n_head=4,
                 n_layer=2, n_content=2, n_rrpram=2):
        super().__init__()
        self.context_len = context_len
        head_dim = n_embd // n_head

        self.wte = nn.Embedding(vocab_size, n_embd)
        self.wpe = nn.Embedding(context_len, n_embd)

        blocks = []
        for _ in range(n_layer):
            blocks.append(PostGPTBlock(n_embd, n_content, n_rrpram, head_dim, context_len))
        self.blocks = nn.ModuleList(blocks)

        self.norm_f = RMSNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size, bias=False)
        # Weight tying: output projection reuses the embedding matrix
        self.lm_head.weight = self.wte.weight

        n_params = sum(p.numel() for p in self.parameters())
        print(f" PostGPTModel: {n_params:,} parameters")

    def forward(self, idx, targets=None):
        """Return ``(logits, loss)``; ``loss`` is None unless ``targets`` is given.

        ``idx`` is a [B, T] tensor of token ids.  When ``targets`` (same
        shape) is supplied, the loss is the cross-entropy over all positions.
        """
        B, T = idx.shape
        positions = torch.arange(T, device=idx.device)
        x = self.wte(idx) + self.wpe(positions)

        for block in self.blocks:
            x = block(x)

        logits = self.lm_head(self.norm_f(x))
        if targets is None:
            return logits, None

        flat_logits = logits.view(-1, logits.size(-1))
        loss = F.cross_entropy(flat_logits, targets.view(-1))
        return logits, loss
# ─────────────────────────────────────────────────────────────────────────────
# III. CHUCK OPTIMIZER — self-aware learning
# ─────────────────────────────────────────────────────────────────────────────
class ChuckOptimizer(torch.optim.Optimizer):
    """
    Chuck Optimizer: AdamW with self-awareness.

    Implements key levels from the Chuck Optimizer concept:
      - Level 1: Global dampen factor driven by the recent loss trend
      - Level 6: Simple memory (best loss seen, steps since improvement)
      - Adaptive gradient clipping against an EMA of the gradient norm
      - Mean reversion of dampen to 1.0

    Simplified for PostGPT — the full 9-level version lives in chuck.optimizer.

    Public state: ``dampen`` (learning-rate multiplier, kept in [0.5, 1.5]),
    ``best_loss``, ``stagnation``, ``gnorm_ema`` and ``global_step``.
    """

    def __init__(self, params, lr=3e-4, betas=(0.9, 0.999), eps=1e-8,
                 weight_decay=0.01, window=16):
        defaults = dict(lr=lr, betas=betas, eps=eps, weight_decay=weight_decay)
        super().__init__(params, defaults)
        self.window = window
        # Ring buffer of the last `window` losses; _hpos is the next write slot.
        self._hist = [0.0] * window
        self._hpos = 0
        self._hfull = False
        # Level 1: Global dampen
        self.dampen = 1.0
        # Level 6: Memory
        self.best_loss = float('inf')
        self.stagnation = 0
        # Adaptive clipping
        self.gnorm_ema = 1.0
        self.global_step = 0

    def _global_grad_norm(self):
        """L2 norm of all gradients taken together."""
        total = 0.0
        for group in self.param_groups:
            for p in group['params']:
                if p.grad is not None:
                    total += p.grad.data.norm().item() ** 2
        return math.sqrt(total)

    def _update_dampen(self, loss):
        """Record ``loss`` and adjust ``dampen`` from the loss trend.

        The history is a ring buffer, so it is unrolled into chronological
        order before being split into an older and a newer half.  Comparing
        the raw array halves would swap "old" and "recent" after the buffer
        wraps and so invert the sign of the trend.
        """
        self._hist[self._hpos] = loss
        self._hpos = (self._hpos + 1) % self.window
        if not self._hfull and self._hpos == 0:
            self._hfull = True

        half = self.window // 2
        if not self._hfull or half == 0:
            return

        # After the increment, _hpos points at the oldest entry.
        ordered = self._hist[self._hpos:] + self._hist[:self._hpos]
        old = sum(ordered[:half]) / half
        recent = sum(ordered[-half:]) / half
        trend = recent - old
        if trend > 0.02:       # loss rising
            self.dampen = max(0.5, self.dampen - 0.05)
        elif trend < -0.02:    # loss falling
            self.dampen = min(1.5, self.dampen + 0.05)
        # Mean reversion
        self.dampen = 0.999 * self.dampen + 0.001 * 1.0

    def _clip_gradients(self):
        """Scale all gradients down if their norm exceeds twice the EMA (min 1.0)."""
        gnorm = self._global_grad_norm()
        self.gnorm_ema = 0.99 * self.gnorm_ema + 0.01 * gnorm
        clip_val = max(1.0, 2.0 * self.gnorm_ema)
        if gnorm > clip_val:
            scale = clip_val / gnorm
            for group in self.param_groups:
                for p in group['params']:
                    if p.grad is not None:
                        p.grad.data.mul_(scale)

    @torch.no_grad()
    def step(self, closure=None, loss=None):
        """Perform one optimisation step.

        Args:
            closure: optional callable that re-evaluates the model and
                returns the loss tensor; its value is used only when
                ``loss`` is not given.
            loss: current loss as a float.  Treated as 0.0 if neither
                ``loss`` nor ``closure`` is supplied.

        Returns:
            The loss value (float) used for this step.
        """
        if closure is not None:
            with torch.enable_grad():
                loss_val = closure()
            if loss is None:
                loss = loss_val.item()
        if loss is None:
            loss = 0.0

        # Level 1: Global trend
        self._update_dampen(loss)

        # Level 6: Memory
        if loss < self.best_loss:
            self.best_loss = loss
            self.stagnation = 0
        else:
            self.stagnation += 1

        # Adaptive gradient clipping
        self._clip_gradients()

        # Adam step with dampen
        for group in self.param_groups:
            lr = group['lr'] * self.dampen
            beta1, beta2 = group['betas']
            eps = group['eps']
            wd = group['weight_decay']
            for p in group['params']:
                if p.grad is None:
                    continue
                grad = p.grad.data
                state = self.state[p]
                if len(state) == 0:
                    state['step'] = 0
                    state['exp_avg'] = torch.zeros_like(p.data)
                    state['exp_avg_sq'] = torch.zeros_like(p.data)
                exp_avg = state['exp_avg']
                exp_avg_sq = state['exp_avg_sq']
                state['step'] += 1

                # Decoupled weight decay
                if wd > 0:
                    p.data.mul_(1 - lr * wd)

                # Adam moments
                exp_avg.mul_(beta1).add_(grad, alpha=1 - beta1)
                exp_avg_sq.mul_(beta2).addcmul_(grad, grad, value=1 - beta2)

                # Bias correction
                bc1 = 1 - beta1 ** state['step']
                bc2 = 1 - beta2 ** state['step']
                m_hat = exp_avg / bc1
                v_hat = exp_avg_sq / bc2

                # Update
                p.data.addcdiv_(m_hat, v_hat.sqrt() + eps, value=-lr)

        self.global_step += 1
        return loss
# ─────────────────────────────────────────────────────────────────────────────
# IV. TRAINING LOOP
# ─────────────────────────────────────────────────────────────────────────────
def get_batch(token_ids, batch_size, context_len, device):
    """Get a random batch of training examples.

    Draws ``batch_size`` start offsets uniformly and returns ``(x, y)``,
    both ``[batch_size, context_len]`` long tensors on ``device``, where
    ``y`` is ``x`` shifted one token to the right (next-token targets).

    Raises:
        ValueError: if the corpus has fewer than ``context_len + 1`` tokens,
            i.e. not even one input/target window fits.
    """
    n = len(token_ids)
    if n <= context_len:
        raise ValueError(
            f"corpus has {n} tokens but at least {context_len + 1} are needed "
            f"for context_len={context_len}"
        )
    # Valid starts are 0 .. n - context_len - 1 so the shifted target
    # window still ends inside the corpus.
    starts = [torch.randint(0, n - context_len, (1,)).item() for _ in range(batch_size)]
    x = torch.stack([torch.tensor(token_ids[i:i + context_len], dtype=torch.long) for i in starts])
    y = torch.stack([torch.tensor(token_ids[i + 1:i + context_len + 1], dtype=torch.long) for i in starts])
    return x.to(device), y.to(device)
def save_weights(model, path):
    """Save model weights in a simple binary format for the pure-Python runtime.

    Layout (all integers are little-endian uint32):
        n_tensors
        then per tensor, in ``state_dict()`` order:
            name_len, name (UTF-8)
            n_dims, dim_0 .. dim_{n-1}
            n_values, values (little-endian float32, C order)

    The payload is explicitly converted to little-endian float32 so that it
    matches the byte order of the header on every platform.
    """
    state = model.state_dict()
    with open(path, 'wb') as f:
        tensors = [(k, v.cpu().float().numpy()) for k, v in state.items()]
        f.write(struct.pack('<I', len(tensors)))
        for name, arr in tensors:
            name_bytes = name.encode('utf-8')
            f.write(struct.pack('<I', len(name_bytes)))
            f.write(name_bytes)
            shape = arr.shape
            f.write(struct.pack('<I', len(shape)))
            for s in shape:
                f.write(struct.pack('<I', s))
            flat = arr.astype('<f4').flatten()
            f.write(struct.pack('<I', len(flat)))
            f.write(flat.tobytes())
    print(f" Weights saved to {path} ({os.path.getsize(path) / 1024:.1f} KB)")
def train(args):
    """Tokenize the corpus, train a PostGPT model, print a report and a sample.

    The corpus is read from ``postgpt.txt`` located next to this script.

    Args:
        args: Namespace-like object. The attributes read here are
            ``context_len``, ``n_embd``, ``n_head``, ``n_layer``,
            ``n_content``, ``n_rrpram``, ``lr``, ``weight_decay``, ``steps``,
            ``batch_size`` and ``save`` (weights are written only when
            ``save`` is truthy).

    Returns:
        List with one float loss per executed training step, or ``None``
        when the corpus file does not exist.
    """
    corpus_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'postgpt.txt')
    if not os.path.exists(corpus_path):
        print(f"ERROR: {corpus_path} not found")
        return
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f" Device: {device}")

    # Tokenize
    print("\n[1] BPE tokenization...")
    with open(corpus_path, 'rb') as f:
        raw = f.read()
    tokenizer = BPETokenizer(max_merges=1024)
    # NOTE(review): learn() is assumed to return the corpus as a sequence of
    # token ids (it is used with len() and slicing below) — confirm.
    token_ids = tokenizer.learn(raw, num_merges=1024)
    print(f" Tokens: {len(token_ids)}, Vocab: {tokenizer.vocab_size}")

    # Model
    print("\n[2] Building model...")
    model = PostGPTModel(
        vocab_size=tokenizer.vocab_size,
        context_len=args.context_len,
        n_embd=args.n_embd,
        n_head=args.n_head,
        n_layer=args.n_layer,
        n_content=args.n_content,
        n_rrpram=args.n_rrpram,
    ).to(device)

    # Optimizer: Chuck
    print("\n[3] Initializing Chuck Optimizer...")
    optimizer = ChuckOptimizer(
        model.parameters(),
        lr=args.lr,
        weight_decay=args.weight_decay,
        window=16,
    )

    # Training
    print(f"\n[4] Training for {args.steps} steps...")
    print("-" * 60)
    losses = []
    t0 = time.time()
    for step in range(args.steps):
        x, y = get_batch(token_ids, args.batch_size, args.context_len, device)
        logits, loss = model(x, y)
        optimizer.zero_grad()
        loss.backward()
        # Read the scalar once; backward() does not change the loss value,
        # so the optimizer and the log see the same number.
        loss_val = loss.item()
        optimizer.step(loss=loss_val)
        losses.append(loss_val)
        # Log the very first step and then every 10th.
        if (step + 1) % 10 == 0 or step == 0:
            elapsed = time.time() - t0
            avg_recent = sum(losses[-10:]) / len(losses[-10:])
            print(f" step {step+1:4d}/{args.steps} loss={loss_val:.4f} "
                  f"avg10={avg_recent:.4f} dampen={optimizer.dampen:.3f} "
                  f"[{elapsed:.1f}s]")

    # Report
    print("\n" + "-" * 60)
    if losses:
        # min(10, len) keeps the averages correct for runs under 10 steps.
        first_10 = sum(losses[:10]) / min(10, len(losses))
        last_10 = sum(losses[-10:]) / min(10, len(losses))
        print(f" First 10 avg loss: {first_10:.4f}")
        print(f" Last 10 avg loss: {last_10:.4f}")
        print(f" Loss delta: {last_10 - first_10:.4f}")
        if last_10 < first_10:
            print(f" ✓ Loss decreased by {((first_10 - last_10) / first_10) * 100:.1f}%")
        else:
            print(f" ✗ Loss did not decrease")
    else:
        # steps <= 0: nothing to average; previously this divided by zero.
        print(" No training steps were run; skipping loss report.")

    # Generate sample
    print("\n[5] Generation after training...")
    model.eval()
    with torch.no_grad():
        seed = token_ids[:4]
        idx = torch.tensor([seed], dtype=torch.long, device=device)
        for _ in range(60):
            # Stop once the sequence fills the context window.
            if idx.shape[1] >= args.context_len:
                break
            logits, _ = model(idx[:, -args.context_len:])
            # Keep the last position's logits and apply temperature 0.8.
            logits = logits[:, -1, :] / 0.8
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, 1)
            idx = torch.cat([idx, next_token], dim=1)
        generated = idx[0].tolist()
        text = tokenizer.decode(generated)
        print(f" Output: {text[:300]}")

    # Save weights
    if args.save:
        print("\n[6] Saving weights...")
        save_weights(model, args.save)

    print("\n" + "=" * 60)
    print(" Training complete. Chuck is satisfied.")
    print("=" * 60)
    return losses
def main():
    """Command-line entry point: parse hyperparameters and start training.

    Registers the training options on an ``argparse`` parser, prints the
    banner and passes the parsed namespace to ``train``.
    """
    # (flag, type, default, help) — registered in this order.
    option_table = (
        ('--steps', int, 200, 'Training steps'),
        ('--batch_size', int, 4, 'Batch size'),
        ('--context_len', int, 64, 'Context length'),
        ('--n_embd', int, 48, 'Embedding dimension'),
        ('--n_head', int, 4, 'Number of attention heads'),
        ('--n_layer', int, 2, 'Number of layers'),
        ('--n_content', int, 2, 'Content attention heads'),
        ('--n_rrpram', int, 2, 'RRPRAM attention heads'),
        ('--lr', float, 3e-4, 'Learning rate'),
        ('--weight_decay', float, 0.01, 'Weight decay'),
        ('--save', str, '', 'Save weights path'),
    )
    cli = argparse.ArgumentParser(description='PostGPT Training with Chuck Optimizer')
    for flag, kind, fallback, blurb in option_table:
        cli.add_argument(flag, type=kind, default=fallback, help=blurb)
    args = cli.parse_args()

    rule = "=" * 60
    print(rule)
    print(" PostGPT Training — Chuck Optimizer")
    print(" resonance is unbreakable")
    print(rule)
    train(args)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment