// Build, sign and submit a v0 transaction carrying a single `Initialize`
// instruction to the RPC endpoint at localhost:8899.
let client = RpcClient::new(String::from("http://localhost:8899"));
let secret: &[u8] = &[...];
let payer = keypair::Keypair::try_from(secret).unwrap();
// The instruction takes no accounts; its payload comes from `Initialize {}.data()`.
let ix = solana_sdk::instruction::Instruction {
    program_id: "BMkMnSGqZVpRvzeE4agDa1AuvfxfTkhESJ7Yy6LScCzp".parse().unwrap(),
    accounts: vec![],
    data: exchange::instruction::Initialize {}.data(),
};
let recent_blockhash = client.get_latest_blockhash().await.unwrap();
// Compiled with the payer's pubkey and an empty lookup-table slice.
let message = v0::Message::try_compile(&payer.pubkey(), &[ix], &[], recent_blockhash)
    .expect("Failed to compile message V0");
// The payer is the only signer supplied.
let tx = VersionedTransaction::try_new(VersionedMessage::V0(message), &[payer]).unwrap();
let result = client.send_and_confirm_transaction(&tx).await;
// Load the raw account bytes stored at `address`, decode them as an address
// lookup table, and copy its addresses into an owned `AddressLookupTableAccount`.
let account_data = rpc
    .get_account_data(&address)
    .await
    .expect("failed to load account");
let table = AddressLookupTable::deserialize(&account_data)
    .expect("failed to deserialize account");
let lut = AddressLookupTableAccount {
    key: address,
    // `to_vec()` copies the addresses so `lut` does not borrow from `account_data`.
    addresses: table.addresses.to_vec(),
};
// Race three futures: the first one to complete wins and its handler runs;
// the futures that lost the race are dropped.
// NOTE(review): the `break` arm implies an enclosing loop that is outside this snippet.
tokio::select! {
order = rx.recv() => handle(order),
_ = interval.tick() => update_funding(),
_ = tokio::signal::ctrl_c() => break,
}
// Three ways to hand work to Tokio; the `{ .. }` bodies are placeholders.
tokio::spawn(async move { .. }); // async task on runtime
tokio::task::spawn_blocking(move || { .. }); // blocking/CPU work on separate pool
tokio::task::block_in_place(|| { .. }); // block current thread (multi_thread only)
// mpsc (bounded — use for backpressure); the channel below has capacity 1000.
// `send` and `try_send` are alternatives, not a sequence: each takes `order` by value.
let (tx, mut rx) = mpsc::channel::<Order>(1000);
tx.send(order).await?; // waits (asynchronously) while the channel is full
tx.try_send(order)?; // error if full
rx.recv().await; // None = all senders dropped
// oneshot (request-response): one value travels from `tx` to `rx`.
// NOTE(review): the bare `Result` type argument looks like a placeholder for the real response type.
let (tx, rx) = oneshot::channel::<Result>();
tx.send(result).unwrap(); // panics if `send` reports failure
let r = rx.await?; // awaiting the receiver yields the value or propagates the error
// broadcast (fan-out, every subscriber gets every msg); created with capacity 1000.
let (tx, _) = broadcast::channel::<Price>(1000); // the initial receiver is discarded
let mut rx = tx.subscribe(); // receivers are obtained from the sender instead
// watch (latest-value, readers see most recent)
// The receiver is bound `mut` because `changed()` takes `&mut self`.
let (tx, mut rx) = watch::channel(initial);
tx.send(new_val)?;
let cur = rx.borrow().clone(); // clone out so the borrow is released immediately
rx.changed().await?; // wait for update; an error here means the sender was dropped
// Prefer std::sync::Mutex whenever the guard is never held across an .await: it is FASTER.
// Reach for tokio::sync::Mutex only when the guard must stay alive across an .await.
let m = Arc::new(tokio::sync::Mutex::new(state));
// The guard is a temporary here, so it is dropped as soon as this statement ends.
m.lock().await.update();
// RwLock: many concurrent readers or one exclusive writer.
let rw = Arc::new(RwLock::new(book));
let r = rw.read().await; // multiple readers
// Release the read guard before asking for the write lock: while `r` is alive,
// `write().await` in this same task can never be granted and the task deadlocks.
drop(r);
let w = rw.write().await; // exclusive
// Pause the current task for 100 ms.
sleep(Duration::from_millis(100)).await;
// Periodic timer with a one-second period.
let mut interval = interval(Duration::from_secs(1));
// Runs `do_work()` after every tick; as written the loop has no exit.
loop { interval.tick().await; do_work().await; } // first tick is instant
// Give `rpc_call()` at most five seconds to finish.
match timeout(Duration::from_secs(5), rpc_call()).await {
    // The deadline elapsed before the call completed.
    Err(_) => return Err(Timeout),
    // The call finished in time; propagate its own error, if any.
    Ok(r) => r?,
}
// Cooperative shutdown: a spawned worker watches a child token while this task
// waits for Ctrl-C and then cancels the parent token.
let token = CancellationToken::new();
let child = token.child_token(); // hierarchical cancellation
tokio::spawn(async move {
loop {
tokio::select! {
// Leave the loop once cancellation has been signalled.
_ = child.cancelled() => break,
// NOTE(review): if `rx` is an mpsc receiver, `msg` is an `Option`; a `None`
// (channel closed) would make this loop spin unless the work body handles it — confirm.
msg = rx.recv() => { /* work */ }
}
}
});
signal::ctrl_c().await?;
token.cancel(); // cancels all children
use anchor_lang::prelude::*;
use crate::state::Orderbook;
/// Accounts taken by the `Initialize` instruction.
#[derive(Accounts)]
pub struct Initialize<'info> {
/// Signer that pays for creating the order book account (`payer = payer` below).
#[account(mut)]
pub payer: Signer<'info>,
/// Order book account created by this instruction, with its address derived from
/// the `b"orderbook"` seed. Space is 8 bytes plus `size_of::<Orderbook>()`
/// (the extra 8 is presumably the Anchor account discriminator — confirm).
#[account(
init,
payer = payer,
space = 8 + std::mem::size_of::<Orderbook>(),
seeds = [b"orderbook"],
bump
)]
pub orderbook: AccountLoader<'info, Orderbook>,
/// The system program account.
pub system_program: Program<'info, System>,
}
impl<'info> Initialize<'info> {
    /// Stores the bump found for the `orderbook` account inside that account.
    ///
    /// # Errors
    /// Propagates any error returned by `load_init` on the order book loader.
    pub fn handler(ctx: Context<Initialize>) -> Result<()> {
        // The guard returned by `load_init` is a temporary, released when this statement ends.
        ctx.accounts.orderbook.load_init()?.bump = ctx.bumps.orderbook;
        Ok(())
    }
}
/// Zero-copy order book account: two fixed-size ladders of price levels plus
/// the bump that `Initialize::handler` records for the account.
#[account(zero_copy)]
pub struct Orderbook {
    /// Bid side, 256 fixed slots.
    pub bids: [Level; 256],
    /// Ask side, 256 fixed slots.
    pub asks: [Level; 256],
    /// Bump seed written by `Initialize::handler`.
    pub bump: u8,
    /// Explicit padding so the struct size stays a multiple of 8 with no
    /// compiler-inserted padding, which zero-copy layouts do not tolerate.
    pub _padding: [u8; 7],
}
/// One entry of an `Orderbook` ladder (used as the element type of `bids` and `asks`).
#[zero_copy]
pub struct Level {
/// Price of this level. NOTE(review): units/scale are not visible here — confirm.
pub price: u64,
}
/// Error codes declared for the exchange program.
#[error_code]
pub enum ExchangeError {
/// Reported with the message "Empty orderbook".
#[msg("Empty orderbook")]
EmptyBook,
}
use thiserror::Error;
/// Error type for task failures, with `Display` generated from the `#[error]` attributes.
#[derive(Error, Debug)]
pub enum TaskError {
/// Catch-all variant; the wrapped string is interpolated into the message.
#[error("Unknown error: {0}")]
Unknown(String),
}
// Scaffold for bounded concurrency: a shared semaphore created with
// `max_concurrency` permits, and a `JoinSet` named `workers`.
let sem = Arc::new(Semaphore::new(max_concurrency));
let mut workers = JoinSet::new();
// Type used to share a sender implementation behind dynamic dispatch:
Arc<dyn SvmTransactionSender>
/// Abstraction over transaction submission; implementors must be `Send + Sync`.
#[async_trait]
pub trait SvmTransactionSender: Send + Sync {
/// Takes an RPC client and a list of instructions and resolves to a `String` on success.
/// NOTE(review): presumably builds and sends a versioned transaction from `instructions`
/// and returns its signature — confirm against the implementors.
async fn send_versioned_transaction(
&self,
rpc_client: &solana_client::nonblocking::rpc_client::RpcClient,
instructions: Vec<Instruction>,
) -> Result<String>;
}