@tommy-mor
Created September 17, 2025 02:34
rama megadump
= Why use Rama?
Software development suffers from a major disconnect: describing a large-scale application is far easier than building it. Too much time is spent on low-level "plumbing"—ingesting, processing, storing, and querying data—instead of building product features. This involves gluing components, fixing impedance mismatches, and managing complex configurations.
Rama is a paradigm shift that offers a cohesive, end-to-end model for building scalable backends. It enables teams to:
* Build backends with up to 100x less code.
* xref:integrating.adoc[Easily integrate] with existing tools.
* Iterate on product ideas with extreme speed.
* Dramatically reduce operational burden and infrastructure.
* Lower overall complexity.
== When to Use Rama
Rama is ideal for applications requiring:
* Realtime, transactional, or reactive capabilities
* Realtime analytics
* Complex and varied indexing
* xref:acid.adoc[Strong fault-tolerance and data consistency]
* Massive scale for reads and writes
* High performance
== Complexity Rama Eliminates
Rama removes vast amounts of backend complexity by replacing:
* The need for multiple, disparate databases
* Complex toolchains and ad-hoc deployment scripts
* Low-level glue code for serialization, routing, and data transforms
* Impedance mismatches between data and application models
* Multiple narrow APIs, often in different languages
== Get Started
To start learning Rama and see these benefits for yourself, begin with our xref:tutorial1.adoc[tutorial].

We will now explain the core concepts of Rama using the Java API.

= Depots =
Depots are distributed, partitioned logs of data. All new data enters Rama through depots, which serve as the source for topologies.
All examples can be found in the link:https://github.com/redplanetlabs/rama-examples[rama-examples] project.
== Declaring Depots ==
Depots are declared in a module using `setup.declareDepot()`. A depot's _partitioning scheme_ determines which partition receives an appended record.
[source, java]
----
public class BasicDepotExamplesModule implements RamaModule {
@Override
public void define(Setup setup, Topologies topologies) {
setup.declareDepot("*depot1", Depot.random());
setup.declareDepot("*depot2", Depot.hashBy(Ops.FIRST));
setup.declareDepot("*depot3", Depot.disallow());
}
}
----
=== Built-in Partitioning Schemes ===
* `Depot.random()`: Appends to a random partition for even distribution. Ordering is not guaranteed.
* `Depot.hashBy(function)`: Appends to a partition based on the hash of a value extracted by the provided function. This ensures data with the same extracted value is processed by the same partition, maintaining local ordering.
* `Depot.disallow()`: Prohibits client appends. Used for depots that are only appended to from within a topology.
You can also provide a class that implements `Depot.Partitioning` for custom partitioning logic.
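
The effect of hash-based partitioning can be sketched in plain Java. This is an illustrative model only: Rama's actual hash function and routing are internal to the platform, and `HashPartitionSketch`, `partitionFor`, and `partitionForRecord` are hypothetical names introduced here.

```java
import java.util.List;
import java.util.Objects;

// Illustrative model only: not Rama's real hashing or routing.
class HashPartitionSketch {
    // Maps an extracted key to a partition index in [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Mirrors the idea of Depot.hashBy(Ops.FIRST): the key is the record's first element.
    static int partitionForRecord(List<?> record, int numPartitions) {
        return partitionFor(record.get(0), numPartitions);
    }

    public static void main(String[] args) {
        List<Object> follow = List.of("alice", "Follow", "bob");
        List<Object> unfollow = List.of("alice", "Unfollow", "bob");
        // Both events share the key "alice", so they map to the same partition.
        System.out.println(partitionForRecord(follow, 4) == partitionForRecord(unfollow, 4));
    }
}
```

Because the partition depends only on the extracted key, every record for the same user lands on one partition, which is what makes per-user ordering possible.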
=== Why Partitioning Matters ===
1. *Local Ordering*: To process related events in the order they occurred (e.g., a user's `Follow` event before their `Unfollow` event), they must be sent to the same partition. `Depot.hashBy` is ideal for this.
2. *Performance*: If a depot is partitioned by the same key as a PState, a topology can write to the PState without an extra network hop. This is known as _co-location_.
.Good: Co-located depot and PState partitioning avoids a network hop.
[source, java]
----
// Depot partitioned by user ID
setup.declareDepot("*profileFieldsDepot", Depot.hashBy(Ops.FIRST));
// Topology can write directly to the PState
profiles.source("*profileFieldsDepot").out("*tuple")
.each(Ops.EXPAND, "*tuple").out("*userId", "*field", "*value")
.localTransform("$$profiles", Path.key("*userId", "*field").termVal("*value"));
----
.Bad: Mismatched partitioning requires an extra `hashPartition` step.
[source, java]
----
// Depot is randomly partitioned
setup.declareDepot("*profileFieldsDepot", Depot.random());
// Topology must re-partition the data, causing a network hop
profiles.source("*profileFieldsDepot").out("*tuple")
.each(Ops.EXPAND, "*tuple").out("*userId", "*field", "*value")
.hashPartition("*userId") // <-- Extra network hop
.localTransform("$$profiles", Path.key("*userId", "*field").termVal("*value"));
----
=== Depot Organization Guidelines ===
* **Same Depot**: Group related data that requires local ordering or affects the same conceptual entities (e.g., `Follow` and `Unfollow` events). Use `subSource` to process different data types from the same depot stream.
* **Separate Depots**: Keep unrelated data in separate depots to avoid unnecessary filtering in topologies (e.g., "pageviews" vs. "profile updates").
=== Depot Options ===
A depot can be declared as `global()`, creating a single-partition depot. This is not scalable and should not be used for high-throughput data.
[source, java]
----
setup.declareDepot("*myGlobalDepot", Depot.random()).global();
----
== Tick Depots ==
Tick depots emit a constant value at a configured frequency, useful for time-based logic in topologies. They cannot be appended to.
[source, java]
----
public class TickDepotModule implements RamaModule {
@Override
public void define(Setup setup, Topologies topologies) {
// Emits a tick every 3000 milliseconds
setup.declareTickDepot("*ticks", 3000);
StreamTopology s = topologies.stream("s");
s.source("*ticks").each(Ops.PRINTLN, "Tick");
}
}
----
Ticks are emitted on task 0. Use `allPartition` to broadcast the tick to all tasks.
* *Stream Topologies*: Process ticks at approximately the configured frequency (push-based).
* *Microbatch Topologies*: Emit at most one tick per microbatch execution, only if enough time has passed since the last tick (pull-based). The frequency is limited by the microbatch duration.
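
The microbatch behavior described above can be modeled with a small plain-Java class. This is a sketch under assumptions, not Rama's implementation: `MicrobatchTickSketch` and `onMicrobatch` are hypothetical names, and the choice to measure elapsed time from a supplied start time is an assumption.

```java
// Illustrative model only, not Rama's implementation.
class MicrobatchTickSketch {
    private final long frequencyMillis;
    private long lastTickMillis;

    MicrobatchTickSketch(long frequencyMillis, long startMillis) {
        this.frequencyMillis = frequencyMillis;
        this.lastTickMillis = startMillis;
    }

    // Called once per microbatch execution; returns true if a single tick is emitted.
    boolean onMicrobatch(long nowMillis) {
        if (nowMillis - lastTickMillis >= frequencyMillis) {
            lastTickMillis = nowMillis;
            return true;
        }
        return false;
    }
}
```

In this model a microbatch that runs long after several intervals have elapsed still emits only one tick, which is why the effective tick rate cannot exceed the microbatch rate.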
== Depot Client API ==
Depot clients are used to append data and query depot partitions.
=== Appends and Ack Levels ===
The `append()` method sends data to the correct partition automatically. You can specify an `AckLevel` to control when the call returns success.
[source, java]
----
depot.append("some data", AckLevel.ACK); // Default
depot.append("some data", AckLevel.APPEND_ACK);
depot.append("some data", AckLevel.NONE);
----
* `AckLevel.NONE`: Returns immediately (fire-and-forget).
* `AckLevel.APPEND_ACK`: Returns after the data is successfully written and replicated in the depot.
* `AckLevel.ACK` (Default): Returns after the conditions for `APPEND_ACK` are met, *plus* all co-located stream topologies have processed the data.
An async version, `appendAsync()`, returns a `CompletableFuture`.
=== Querying Depot Partitions ===
You can read a range of records from a specific depot partition. A record is identified by its `partition index` and `offset` (starting from 0).
1. `getObjectInfo()`: Get the number of partitions for the depot.
2. `getPartitionInfo(partitionIndex)`: Get the start and end offsets for a partition.
3. `read(partitionIndex, startOffset, endOffset)`: Fetch a list of records in the given range.
To avoid long-running remote calls, fetch data in smaller batches (e.g., under 50 KB).
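
One way to plan batched reads is to split the offset range into fixed-size chunks and issue one `read` per chunk. The helper below is a plain-Java sketch with no Rama dependency; `DepotReadBatchesSketch` and `batches` are hypothetical names, and treating the end offset as exclusive is an assumption to verify against the client API.

```java
import java.util.ArrayList;
import java.util.List;

class DepotReadBatchesSketch {
    // Splits [startOffset, endOffset) into consecutive ranges of at most batchSize records.
    // Assumes the end offset is exclusive.
    static List<long[]> batches(long startOffset, long endOffset, long batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        List<long[]> ranges = new ArrayList<>();
        for (long from = startOffset; from < endOffset; from += batchSize) {
            ranges.add(new long[] {from, Math.min(from + batchSize, endOffset)});
        }
        return ranges;
    }
}
```

Each returned pair could then be passed as the start and end offsets of a separate read call. A suitable `batchSize` depends on the average record size.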
== Streaming Ack Returns ==
When using `AckLevel.ACK`, co-located stream topologies can return arbitrary information to the client. The `append()` call returns a `Map<String, Object>` mapping topology names to their return values. This is useful for returning generated IDs or success messages from the topology itself.
== Appending from Topologies ==
Topologies can append to depots using `depotPartitionAppend()`.
Unlike client appends, `depotPartitionAppend` writes to the partition of the current task. Therefore, you must use a partitioner (`hashPartition`, etc.) to direct the data to the correct destination partition.
[source, java]
----
public class DepotPartitionAppendModule implements RamaModule {
@Override
public void define(Setup setup, Topologies topologies) {
setup.declareDepot("*incomingDepot", Depot.hashBy(Ops.FIRST));
setup.declareDepot("*outgoingDepot", Depot.disallow());
StreamTopology s = topologies.stream("s");
s.source("*incomingDepot").out("*tuple")
.each(Ops.EXPAND, "*tuple").out("*k", "*k2", "*v")
.hashPartition("*k2") // Direct data to the correct partition
.each(Ops.TUPLE, "*k2", new Expr(Ops.INC, "*v")).out("*newTuple")
.depotPartitionAppend("*outgoingDepot", "*newTuple");
}
}
----
== Migrations ==
Depot migrations transform or remove existing records during a module update. A migration is a function that takes a record and returns a new value, or `Depot.TOMBSTONE` to delete it.
[source, java]
----
setup.declareDepot("*depot", Depot.random())
.migration("myMigrationId", (Integer num) -> {
if(num.equals(10)) return Depot.TOMBSTONE;
else return "" + num; // Convert Integer to String
});
----
Migrations take effect instantly. They work by creating a new, migrated log in the background and atomically swapping it upon completion. Disk usage will temporarily increase during this process. The migration function is only applied to data that existed before the update; clients must begin appending data in the new format immediately after the update.
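
The "build a new log, then swap" idea can be modeled in plain Java. This is a simplified sketch, not Rama's implementation: `DepotMigrationSketch`, `migrate`, and the local `TOMBSTONE` object are hypothetical stand-ins, and the real process runs per partition in the background.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class DepotMigrationSketch {
    // Hypothetical stand-in for the Depot.TOMBSTONE marker.
    static final Object TOMBSTONE = new Object();

    // Builds a new log by applying the migration to each existing record and dropping tombstones.
    static List<Object> migrate(List<Object> oldLog, Function<Object, Object> migration) {
        List<Object> newLog = new ArrayList<>();
        for (Object record : oldLog) {
            Object migrated = migration.apply(record);
            if (migrated != TOMBSTONE) newLog.add(migrated);
        }
        return newLog;
    }

    public static void main(String[] args) {
        List<Object> oldLog = List.of(9, 10, 11);
        // Same logic as the migration above: drop 10, convert everything else to a String.
        List<Object> newLog = migrate(oldLog, r -> r.equals(10) ? TOMBSTONE : "" + r);
        System.out.println(newLog); // prints [9, 11]
    }
}
```

Until the new log replaces the old one, both copies exist, which matches the temporary increase in disk usage noted above.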
== Depot Trimming ==
Depot trimming automatically deletes old records to manage disk space, configured via dynamic options.
* `depot.max.entries.per.partition`: Sets the number of recent entries to keep per partition.
* `depot.excess.proportion`: A buffer of extra entries kept to prevent race conditions with new topologies starting from the beginning.
* `depot.trim.coordinate.local.topologies`: (Default: `true`) If true, prevents trimming data that is still needed by co-located topologies.
* `depot.trim.coordinate.remote.topologies`: (Default: `true`) If true, prevents trimming data still needed by topologies in other modules.
== Tuning Options ==
=== Dynamic Options ===
* `replication.depot.append.timeout.millis`: Timeout for replicating a depot append.
* `depot.ack.failure.on.any.streaming.failure`: (Default: `true`) If `false`, an `AckLevel.ACK` append will wait for a stream topology to succeed on a retry instead of failing immediately.
=== Foreign Depot Client Configs ===
* `foreign.depot.flush.delay.millis`: (Default: `0`) Adds a delay before flushing appends to increase batching. Optimal values are often 0-50ms.
* `foreign.depot.operation.timeout.millis`: Timeout for foreign depot operations.
== Summary ==
Depots are the entry point for all data into Rama. Effective use of depots hinges on three key design decisions:
1. How many depots to create.
2. Which data belongs in each depot.
3. How each depot should be partitioned.
Mastering these concepts is central to building efficient and well-structured Rama applications.

= PStates
Partitioned states (PStates) are durable, replicated data structures that index data for application queries. Rama programming centers on defining PState structures and how they are updated. Based on composable data structures, PStates offer far more flexibility than traditional databases.
PStates also introduce "fine-grained reactive queries", a novel feature that allows applications to be reactive by receiving minimal "diffs" when data changes, rather than the entire new value.
== PStates versus Databases
Databases offer "fixed indexing models" (key/value, document, relational), which are essentially specific combinations of data structures. If a model doesn't fit a new use case, you often need another database.
Rama provides a "flexible indexing model" where each PState can be any combination of data structures you need. This aligns backend development with standard programming practices of choosing the right data structures for the job.
Key differences from traditional databases:
* **Update Model**: A database is global mutable state. A PState is only updated by its owning ETL topology, which consumes data from an immutable depot log. This centralizes write logic, simplifies data flow, and provides a replayable history of all changes.
* **Reactivity**: Databases offer, at best, "coarse-grained reactivity" (e.g., triggers) that only notify you that a value changed. PStates provide "fine-grained reactivity", telling you precisely *how* a value changed, no matter how deeply nested it is.
== Declaring PStates
PStates are declared in stream or microbatch topologies with a var (e.g., `$$p`) and a schema.
[source, java]
----
public class BasicPStateDeclarationsModule implements RamaModule {
@Override
public void define(Setup setup, Topologies topologies) {
StreamTopology s = topologies.stream("s");
s.pstate("$$p1", Long.class);
s.pstate("$$p2", PState.mapSchema(String.class, Integer.class));
s.pstate(
"$$p3",
PState.mapSchema(String.class,
PState.fixedKeysSchema(
"count", Integer.class,
"someList", PState.listSchema(String.class))));
}
}
----
Available schema types:
* `mapSchema`: A map with a specified key class and value schema.
* `setSchema`: A set. Cannot contain other schemas.
* `listSchema`: A list.
* `fixedKeysSchema`: A map with a fixed set of keys, each with its own value schema.
Top-level PState schemas can only be a `mapSchema`, `fixedKeysSchema`, or a direct class reference.
=== Subindexing
For performance, Rama can index elements of a collection individually. This is called "subindexing". It is the default for top-level maps and can be enabled for nested structures.
[source, java]
----
s.pstate("$$p",
PState.mapSchema(String.class,
PState.setSchema(Long.class).subindexed()));
----
Without subindexing, reading a single element from a large collection requires reading the entire collection from disk. With subindexing, Rama can access individual elements efficiently. This is critical for performance when dealing with collections containing thousands or millions of elements.
Subindexed maps and sets are sorted, enabling efficient range queries using navigators like `sortedMapRange` and `sortedSetRange`.
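
A rough analogy for why sorted storage makes range queries cheap is `java.util.TreeMap`, which can return a key range without scanning the whole map. The sketch below is plain Java with no Rama dependency; `SortedRangeSketch` and `range` are hypothetical names, and the inclusive-start, exclusive-end bounds are an assumption about how the range navigators behave.

```java
import java.util.SortedMap;
import java.util.TreeMap;

class SortedRangeSketch {
    // Returns a view of the entries with keys in [start, end).
    static SortedMap<Long, String> range(TreeMap<Long, String> index, long start, long end) {
        return index.subMap(start, end);
    }
}
```

For a map with keys 1 through 5, `range(index, 2, 4)` yields only the entries for keys 2 and 3.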
==== Size Tracking
By default, subindexed structures track their size, making size queries an O(1) operation (e.g., `Path.key("a").view(Ops.SIZE)`). This adds a small overhead to writes. It can be disabled for maximum write performance if size queries are not needed.
[source, java]
----
PState.setSchema(Long.class).subindexed(SubindexOptions.withoutSizeTracking());
----
==== Deleting Subindexed Structures
Subindexed structures can be removed like any other value (e.g., `.localTransform("$$p", Path.key("a").termVoid())`).
*Caution*: Deleting a subindexed structure directly cleans up its elements from disk. Deleting its *parent* container will only remove the reference, orphaning the elements on disk. Always delete nested subindexed structures explicitly before deleting their parent.
=== PState Options
* `.global()`: Creates a single-partition PState on task 0. Useful for global aggregates like "top 10" lists or global counts.
* `.initialValue(val)`: Sets an initial value for each partition on module launch. Only for PStates with a class reference schema.
* `.makePrivate()`: Makes the PState readable only by its owning topology.
* `.keyPartitioner(fn)`: Specifies a custom function to route client queries to partitions, overriding the default hash-based partitioning.
== Using PStates in Topologies
PStates are queried and updated within topologies using the dataflow API.
=== Basic Querying
* `localSelect`: Queries the PState partition on the current task. It is synchronous.
* `select`: Partitions the computation based on a key in the path, then queries the target task.
[source, java]
----
// Queries the collection at "*k" on the local partition
.localSelect("$$p", Path.key("*k").all()).out("*v")
// Partitions by "*k", then queries the collection on the target partition
.select("$$p", Path.key("*k").all()).out("*v")
----
=== Yielding Select
For queries over large amounts of data, use `SelectOptions.allowYield()`. This prevents blocking the task thread by executing the query over multiple events, operating on a stable snapshot of the PState.
[source, java]
----
.localSelect("$$p", Path.key("cagney").all(), SelectOptions.allowYield()).out("*v")
----
=== Transforming
PStates are updated by their owning topology using:
* `localTransform`: Uses xref:paths.adoc[Paths] to specify fine-grained updates.
* `agg` / `compoundAgg`: Higher-level abstractions for aggregation.
== Using PState Clients
PState clients, obtained from a `RamaClusterManager`, perform ad-hoc point queries from outside a Rama module. They use yielding selects, so queries can be of any size.
The non-reactive API includes `select` (returns a list of results) and `selectOne` (returns a single result).
[source, java]
----
// In a client application
PState p = cluster.clusterPState(moduleName, "$$p");
// Returns a list with one element: [#{12 14 10}]
List results = p.select(Path.key("davis"));
// Returns the set itself: #{12 14 10}
Set result = p.selectOne(Path.key("davis"));
----
*Note*: Lambdas cannot be used in PState client paths, as the function object's class must exist on both the client and module classpaths.
=== How Client Queries Are Routed
For partitioned PStates, queries must either begin with `Path.key(someKey)` to use the configured key partitioner, or you must provide an explicit partitioning key as an argument to `select`/`selectOne`. The latter is useful when a PState is partitioned by data not present in its own structure.
=== Non-blocking API
Asynchronous versions like `selectAsync` and `selectOneAsync` are available, returning `CompletableFuture` objects.
== Reactive Queries
Reactive queries are created with `proxy`, which works like `selectOne` but returns a stateful `ProxyState` object. This object's value is kept in sync with the PState on the cluster by receiving fine-grained "diffs" in the background.
[source, java]
----
// Get a live, self-updating view of the map at key "a"
ProxyState<Map> proxy = p.proxy(Path.key("a"));
System.out.println("Initial value: " + proxy.get()); // e.g., {"b": 1, "c": 1}
// After an update on the server...
Thread.sleep(50);
System.out.println("New value: " + proxy.get()); // e.g., {"b": 1, "c": 2}
----
A callback can be registered to inspect the diffs as they arrive.
[source, java]
----
p.proxy(Path.key("a"), new ProxyState.Callback<Map>() {
@Override
public void change(Map newVal, Diff diff, Map oldVal) {
System.out.println("Received diff: " + diff);
}
});
----
Example output: `Received diff: KeyDiff[c | NewValueDiff[2]]`
This is far more efficient than re-sending the entire map on every change.
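
To make the saving concrete, the sketch below computes only the entries that changed between two versions of a map. It is a plain-Java illustration of the idea, not Rama's diff machinery: `MapDiffSketch` and `changedEntries` are hypothetical names, and removed keys are deliberately ignored to keep it short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class MapDiffSketch {
    // Returns only the entries of newVal that are new or whose value differs from oldVal.
    // Removed keys are ignored in this simplified sketch.
    static <K, V> Map<K, V> changedEntries(Map<K, V> oldVal, Map<K, V> newVal) {
        Map<K, V> changed = new HashMap<>();
        for (Map.Entry<K, V> e : newVal.entrySet()) {
            boolean isNew = !oldVal.containsKey(e.getKey());
            if (isNew || !Objects.equals(oldVal.get(e.getKey()), e.getValue())) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }
}
```

For the maps `{"b": 1, "c": 1}` and `{"b": 1, "c": 2}`, the result contains just `c -> 2`, however large the rest of the map is.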
=== Diffs
The type of diff generated by a PState update is determined by the transform path used. For example, `Path.key("k").term(...)` generates a `KeyDiff`. These diffs are only computed if there are active subscribers. Subscribers only receive the portion of a diff relevant to their proxied path.
=== Processing Diffs
Diffs can be processed using a "double dispatch" API. You create a processor that implements interfaces for the specific diff types you want to handle (e.g., `KeyDiff.Processor`). Rama will automatically convert or expand diffs to match what your processor can handle.
It is critical to handle `ResyncDiff`, which is sent on initial subscription or after a fault-recovery event. This indicates your client should re-evaluate the entire new value.
=== Fault Handling and Cleanup
`ProxyState` is robust, using CRCs and heartbeats to detect and recover from errors, automatically resyncing if needed.
An active `ProxyState` consumes resources on both the client and server. Call `proxy.close()` when it is no longer needed to release these resources promptly. The server will also garbage collect stale subscriptions after a timeout.
== Schema Validation
By default, Rama performs a full validation of data against the PState schema on every write. For maximum performance in production, two potentially expensive checks can be disabled via configuration:
* `pstate.validate.subindexed.structure.locations`: Prevents moving a subindexed structure.
* `pstate.maximal.schema.validations`: Prevents deep iteration of large non-subindexed structures.
== Migrations
PStates can be migrated to a new schema during a module update. The migration is specified with a transformation function. Rama applies the migration on-read immediately after the update, while migrating the data on disk in the background. This means a module update is not delayed by the migration.
[source, java]
----
// Original Schema: PState.mapSchema(Long.class, Long.class)
// Migrated Schema:
s.pstate("$$p",
PState.mapSchema(Long.class,
PState.migrated(
String.class,
"myMigrationId",
(Object o) -> { // Migration function must be idempotent
if(o instanceof String) return o;
else return o.toString() + "!";
})));
----
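
The idempotence requirement can be checked in isolation. The sketch below copies the logic of the migration function above into plain Java and applies it twice; `IdempotentMigrationSketch` and `MIGRATE` are hypothetical names. The likely reason for the requirement, stated here as an assumption, is that a value may pass through the function more than once because migration happens both on read and in the background.

```java
import java.util.function.Function;

class IdempotentMigrationSketch {
    // Same logic as the migration function in the example above.
    static final Function<Object, Object> MIGRATE =
        o -> (o instanceof String) ? o : o.toString() + "!";

    public static void main(String[] args) {
        Object once = MIGRATE.apply(5L);
        Object twice = MIGRATE.apply(once);
        // Applying the function to an already-migrated value leaves it unchanged.
        System.out.println(once + " " + twice); // prints 5! 5!
    }
}
```

The `instanceof String` guard is what makes the function idempotent: without it, a second application would produce `5!!`.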
Migrations can also convert structures to subindexed, or add/remove keys from a `fixedKeysSchema`. The migration function must not change data structure types in a way that breaks existing topology code (e.g., keep `clojure.lang.IPersistentMap` if paths operate on it).
* **Valid Locations**: Migrations can only be declared on "full values" (values of a top-level map or a subindexed structure), not on parts of a larger, non-subindexed value.
* **Updates**: If you update a module mid-migration, the migration continues if the migration ID is unchanged. Changing the ID restarts the migration.
* **Telemetry**: The Cluster UI provides detailed telemetry on migration progress.
* **Completion**: Once the UI shows a migration is complete, the `PState.migrated` wrapper can be removed in the next module update.
* **Rate Control**: Migration speed is controllable via dynamic options to manage task thread load.
* **Implicit Migrations**: Simple schema changes, like generalizing `Integer.class` to `Object.class`, do not require a formal migration.
== Tuning Options
=== Configurations (Fixed on Deploy)
* `pstate.rocksdb.options.builder`: Customize RocksDB settings for map-based PStates.
* `foreign.pstate.operation.timeout.millis`: Timeout for queries to PStates in other modules.
* `foreign.proxy.*`: Configure `ProxyState` thread pools and failure thresholds.
* `pstate.validate.*` / `pstate.maximal.*`: Disable expensive schema validation checks.
=== Dynamic Options (Editable in UI)
* `pstate.excessive.write.time.warning.millis`: Log `localTransform` calls that exceed a time threshold.
* `pstate.reactivity.queue.limit`: Limit for queued-up reactive subscribers.
* `pstate.yielding.select.page.size`: Default page size for yielding selects.

= Paths
Paths are the core mechanism in Rama for reading and writing PStates. They offer a flexible, concise, and composable way to specify fine-grained behavior on complex data structures.
== Navigation Model
A path is a sequence of _navigators_ that defines a traversal through a data structure. A path can navigate to zero, one, or many locations.
Consider this data structure:
[source, java]
----
Map data = new HashMap() {{
put("a0", new HashMap() {{
put("a1", Arrays.asList(9, 3, 6));
put("b1", Arrays.asList(0, 8));
}});
put("b0", new HashMap() {{
put("c1", Arrays.asList("x", "y"));
}});
}};
----
Paths navigate this structure by "hopping" between locations:
* **Single Navigation:** `Path.key("a0", "a1").nth(1)` navigates to the value `3`.
* **Multi-Navigation:** `Path.key("a0").mapVals().all()` navigates to each of the five numbers individually: `9, 3, 6, 0, 8`.
* **Filtered Navigation:** `Path.key("a0").mapVals().all().filterLessThan(7)` navigates to `3, 6, 0`.
The behavior of `select` depends on the context. In the dataflow API, it emits a separate record for each navigated value. In the PState client API, it returns a single list of all navigated values.
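
For readers more familiar with Java streams, the filtered navigation above can be approximated without Rama. This is only an analogy for the "hopping" model, not how paths are implemented; `NavigationSketch`, `sampleData`, and `lessThanUnderA0` are hypothetical names, and a `LinkedHashMap` is used so the iteration order is predictable.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class NavigationSketch {
    // Builds the "a0" branch of the example data structure.
    static Map<String, Map<String, List<Integer>>> sampleData() {
        Map<String, List<Integer>> a0 = new LinkedHashMap<>();
        a0.put("a1", Arrays.asList(9, 3, 6));
        a0.put("b1", Arrays.asList(0, 8));
        Map<String, Map<String, List<Integer>>> data = new LinkedHashMap<>();
        data.put("a0", a0);
        return data;
    }

    // Plain-Java analogue of Path.key("a0").mapVals().all().filterLessThan(bound).
    static List<Integer> lessThanUnderA0(Map<String, Map<String, List<Integer>>> data, int bound) {
        return data.get("a0").values().stream()  // key("a0") then mapVals()
            .flatMap(Collection::stream)         // all()
            .filter(n -> n < bound)              // filterLessThan(bound)
            .collect(Collectors.toList());
    }
}
```

Calling `lessThanUnderA0(sampleData(), 7)` returns `[3, 6, 0]`, the same values the path navigates to. Unlike a stream pipeline, a path can also be used for transforms.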
== Value Navigators
Value navigators navigate to zero or more _existing_ subvalues within a data structure.
* `key(k)`: Navigates to the value for key `k` in a map.
* `nth(i)`: Navigates to the value at index `i` in a list.
* `mapVals()`: Navigates to each value in a map.
* `all()`: Navigates to every element in a list, set, or map entry.
* `first()`: Navigates to the first element of a list.
* `mapKey(k)`: Navigates to the _key_ `k` itself, not its value. This is primarily useful in transforms for renaming a map key.
== Virtual Value Navigators
Virtual value navigators navigate to locations that do not yet exist, enabling insertion during transforms. They have no effect in `select` operations.
* `voidSetElem()`: Navigates to a "void" location in a set. Transforming this location adds a new element.
* `afterElem()`, `beforeElem()`: Navigate to void locations at the end or beginning of a list for appending/prepending.
* `beforeIndex(i)`: Navigates to a void location before index `i` for insertion.
== Filter Navigators
Filter navigators conditionally continue navigation from the current location. If the condition fails, the path stops for that branch.
* `filterPred(predicate)`: Continues if the `predicate` function returns true for the current value.
* `filterSelected(path)`: Continues if the given sub-`path` navigates to at least one value from the current location. It does not change the current location.
== Substructure Navigators
Substructure navigators navigate to a smaller version of a data structure (e.g., a sublist or submap). Transforms on the substructure affect the original data structure.
* `sublist(start, end)`: Navigates to a contiguous sublist.
* `filteredList(path)`: Navigates to a sublist containing only elements that match the filter `path`. The original positions are maintained for transforms.
* `sortedMapRange(start, end)`, `sortedMapRangeFrom(start, limit)`: Efficiently navigate ranges in sorted maps (e.g., in subindexed PStates). Essential for range queries.
== View Navigators
View navigators navigate to a transformation of the currently navigated value.
* `view(function)`: Navigates to the result of applying the `function` to the current value.
* `transformed(path)`: Navigates to a new value by applying a transform `path` to the current value.
* `nullToVal(defaultValue)`: If the current value is `null`, navigates to `defaultValue`. Otherwise, stays at the current value.
[NOTE]
====
When used in PState queries from a client, functions passed to `view` must exist on the module's classpath. Lambdas are not permitted.
====
== Control Navigators
Control navigators manipulate the flow of navigation, similar to control flow statements in programming.
* `ifPath(conditionPath, thenPath, [elsePath])`: If `conditionPath` selects any values, continue with `thenPath`, otherwise use `elsePath` (or `stop()` if not provided).
* `multiPath(path1, path2, ...)`: Navigates down each provided path sequentially from the current location. Like an `OR` in a filter condition.
* `subselect(path)`: Executes the sub-`path` from the current location and navigates to a single list containing all the results. In transforms, it retains the original locations of the selected items, allowing for complex, cross-substructure manipulations.
* `stay()`, `stop()`: Continues navigation at the current location, or stops it entirely.
== Transform Paths
Paths used in transforms must end with a "term" navigator. The navigation itself works identically to `select`.
Term navigators:
* `termVal(value)`: Replaces the navigated location with a static `value`.
* `term(function, [args...])`: Replaces the navigated location with the result of applying the `function` to it.
* `termVoid()`: Removes the element at the navigated location from its parent collection.
[NOTE]
====
Transform paths operate on PStates and Clojure-style immutable data structures, not standard mutable Java collections like `HashMap` or `ArrayList`.
====
== Custom Navigators
For advanced use cases, such as handling custom data types, you can implement the `Navigator` interface.
[source, java]
----
public interface Navigator<T> extends RamaSerializable {
interface Next { Object invokeNext(Object obj); }
Object select(T obj, Next next);
T transform(T obj, Next next);
}
----
* `select(obj, next)`: Implements read-only navigation. Calls `next.invokeNext(subvalue)` for each item to navigate to.
* `transform(obj, next)`: Implements transform logic. Calls `next.invokeNext(subvalue)` to get the replacement value and updates `obj`.
Use `Path.customNav(new MyNavigator())` to add a custom navigator to a path.
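
The sketch below shows the shape of a navigator that targets the last element of a list. To stay runnable without Rama, it declares local stand-in interfaces (`Next`, `Nav`) that mirror the interface shown above but omit `RamaSerializable`. `LastElemNavigatorSketch` and `LastElem` are hypothetical names, and returning `null` from `select` when there is nothing to navigate to is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class LastElemNavigatorSketch {
    // Local stand-ins mirroring the shape of the interface above (no Rama dependency).
    interface Next { Object invokeNext(Object obj); }
    interface Nav<T> {
        Object select(T obj, Next next);
        T transform(T obj, Next next);
    }

    // Navigates to the last element of a list; an empty list navigates nowhere.
    static class LastElem implements Nav<List<Object>> {
        @Override
        public Object select(List<Object> list, Next next) {
            if (list.isEmpty()) return null; // assumption: nothing to navigate to
            return next.invokeNext(list.get(list.size() - 1));
        }

        @Override
        public List<Object> transform(List<Object> list, Next next) {
            if (list.isEmpty()) return list;
            // Copy instead of mutating, then replace the last element with the continuation's result.
            List<Object> copy = new ArrayList<>(list);
            int last = copy.size() - 1;
            copy.set(last, next.invokeNext(copy.get(last)));
            return copy;
        }
    }
}
```

A real implementation would implement Rama's `Navigator` interface and operate on the immutable data structures that transform paths expect.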
== Value Collection
Value collection gathers values into a side list during navigation. If any values are collected, a `select` operation returns a list containing the collected values, with the final navigated value appended.
Key methods include `collectOne(path)`, `collect(path)`, and `putCollected(value)`.
== Reactivity
Reactive queries (`proxy`, `proxyAsync`) are implemented at the path level. Fine-grained reactivity is supported for built-in navigators. Custom navigators will result in coarse-grained diffs.
== Summary
Paths are a powerful, composable abstraction for querying and manipulating PStates. While there are many navigators, a small subset provides significant utility. Mastery comes from understanding the core navigation model and composing navigators to achieve the desired behavior.

= Query Topologies
Query topologies are a powerful Rama feature for executing real-time, on-demand, distributed queries across any PState or task. They are defined using the same expressive Java dataflow API as ETLs, allowing them to handle complex logic that goes far beyond simple point queries.
While PState clients are excellent for fetching data from a single partition, query topologies excel at querying multiple PStates simultaneously or aggregating data across the entire cluster.
All examples can be found in the link:https://github.com/redplanetlabs/rama-examples[rama-examples] project.
== Structure of Query Topologies
Query topologies behave like distributed functions: they accept input arguments and return a single output object. They are defined by calling `topologies.query()` within a `RamaModule`.
[source, java]
----
// A simple query topology that computes (*a + *b + 1)
topologies.query("q", "*a", "*b").out("*res")
.each(Ops.PLUS, "*a", "*b", 1).out("*res")
.originPartition();
----
Key characteristics:
* **Definition:** `topologies.query("name", "*input1", ...).out("*output")` defines the signature.
* **Implicit Batch Block:** All query topologies are implicitly batch blocks, enabling powerful aggregation patterns.
* **`originPartition()`:** The final partitioner in the computation must be this special partitioner, which routes the final result back to the query's origin. Aggregation can follow it, as in the URL reach example.
* **Single Emission:** The output variable must be emitted exactly once. Emitting it zero or multiple times will cause an error.
For example, this topology is problematic because `Ops.EXPLODE` emits the output var once per list element: zero times for an empty list and multiple times for a list with more than one element.
[source, java]
----
topologies.query("q", "*list").out("*res")
.originPartition()
.each(Ops.EXPLODE, "*list").out("*res");
----
=== Example: URL Reach
A more complex example is calculating "URL reach"—the number of unique users exposed to a URL. This involves fetching all users who shared the URL, fetching all of their followers, and then computing a distinct count of the followers.
This can be implemented efficiently with a query topology that parallelizes the work.
[source, java]
----
public class ReachModule implements RamaModule {
// Sub-batch to compute partial distinct counts in parallel
private SubBatch partialReachCounts(String urlVar) {
return new SubBatch(
Block.hashPartition(urlVar)
.localSelect("$$urlToUsers", Path.key(urlVar).all()).out("*userId")
.select("$$followers", Path.key("*userId").all()).out("*reachedUserId")
.hashPartition("*reachedUserId") // <-- Key to parallel distinct count
.agg(Agg.set("*reachedUserId")).out("*partialReachedSet")
.each(Ops.SIZE, "*partialReachedSet").out("*count"),
"*count");
}
@Override
public void define(Setup setup, Topologies topologies) {
// ... PState and ETL definitions for $$urlToUsers and $$followers ...
topologies.query("reach", "*url").out("*numUniqueUsers")
.subBatch(partialReachCounts("*url")).out("*partialCount")
.originPartition()
.agg(Agg.sum("*partialCount")).out("*numUniqueUsers");
}
public static void main(String[] args) throws Exception {
try(InProcessCluster cluster = InProcessCluster.create()) {
// ... Cluster setup, data loading, and waiting for ETLs ...
QueryTopologyClient<Integer> reach = cluster.clusterQuery(module.getClass().getName(), "reach");
System.out.println("grapefruit.com reach: " + reach.invoke("grapefruit.com"));
// Prints: grapefruit.com reach: 6
}
}
}
----
The query distributes the computation by hashing on `*reachedUserId`. This sends all instances of the same follower to the same task, where a set aggregation handles the distinct counting. The final result is a sum of the sizes of these partial sets.
image::./diagrams/query-diagrams/reach.png[]
This approach is highly efficient because the `select` and `localSelect` calls read from PStates that are colocated on the same task, minimizing network overhead.
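The partition-then-count idea can be illustrated outside of Rama. The following is a minimal plain-Java sketch, not Rama code: the class `ReachSketch`, the method `distinctCount`, and the use of `String.hashCode()` for routing are all assumptions made for illustration. It shows why routing equal IDs to the same bucket lets the per-bucket set sizes simply be summed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; no Rama APIs are used.
final class ReachSketch {
    // Counts distinct IDs by routing each ID to one of numTasks buckets,
    // de-duplicating inside each bucket, and summing the bucket sizes.
    static int distinctCount(List<String> reachedUserIds, int numTasks) {
        List<Set<String>> partialSets = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            partialSets.add(new HashSet<>());
        }
        for (String id : reachedUserIds) {
            // Same ID -> same bucket, so a duplicate can never be counted twice.
            int task = Math.floorMod(id.hashCode(), numTasks);
            partialSets.get(task).add(id);
        }
        int total = 0;
        for (Set<String> partial : partialSets) {
            total += partial.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> followers = List.of("a", "b", "a", "c", "b", "d");
        System.out.println(distinctCount(followers, 4)); // prints 4
    }
}
```

Because no ID can land in two buckets, the sum of the partial set sizes equals the global distinct count regardless of the number of buckets.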
== Query Topology Client
You get a `QueryTopologyClient` from a `RamaClusterManager` (or `InProcessCluster` for tests).
[source, java]
----
QueryTopologyClient<Integer> q = cluster.clusterQuery(moduleName, "q");
----
Clients have two ways to execute a query:
* **`invoke(...)`**: A blocking call that returns the result directly or throws an exception on failure.
+
[source, java]
----
Integer result = q.invoke(1, 2);
----
* **`invokeAsync(...)`**: A non-blocking call that immediately returns a `CompletableFuture`. The future is completed with the result or an exception.
+
[source, java]
----
CompletableFuture<Integer> futureResult = q.invokeAsync(1, 2);
----
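Since `invokeAsync` returns a standard `CompletableFuture`, follow-up work can be chained without blocking. The sketch below uses only the JDK; `fakeInvokeAsync` is a hypothetical stand-in for `q.invokeAsync(1, 2)` that computes the same `a + b + 1` as the query `"q"` shown earlier, and the fallback value `-1` is an arbitrary choice for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration only; no Rama APIs are used.
final class AsyncInvokeSketch {
    // Hypothetical stand-in for q.invokeAsync(a, b); computes a + b + 1.
    static CompletableFuture<Integer> fakeInvokeAsync(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a + b + 1);
    }

    // Chains follow-up work without blocking; falls back to -1 on failure.
    static CompletableFuture<String> describe(CompletableFuture<Integer> result) {
        return result
            .thenApply(v -> "result=" + v)
            .exceptionally(t -> "result=-1");
    }

    public static void main(String[] args) throws Exception {
        String s = describe(fakeInvokeAsync(1, 2)).get(5, TimeUnit.SECONDS);
        System.out.println(s); // prints "result=4"
    }
}
```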
== Leading Partitioner Optimization
To improve performance, a query can start execution directly on the task holding the initial data it needs. This "leading partitioner optimization" saves network hops.
The optimization is triggered if the *first operation* in a query topology is a suitable partitioner (e.g., `hashPartition`).
Restrictions for a partitioner to be "leading":
* Must be a built-in partitioner that targets a single task.
* Its inputs must be query topology input variables.
* Cannot be a `select` call, as its implicit partitioner does not count. This is why the "reach" example uses `.hashPartition` followed by `.localSelect`.
If no leading partitioner is present, the query starts on a random task.
== Invoking Colocated Query Topologies
Query topologies can be invoked from other topologies in the same module using `.invokeQuery()`. This is useful for decomposing complex logic.
[source, java]
----
// "q2" invokes "q1"
topologies.query("q1", "*a").out("*res")
.each(Ops.TIMES, "*a", 3).out("*res")
.originPartition();
topologies.query("q2", "*a").out("*res")
.invokeQuery("q1", "*a").out("*v")
.each(Ops.PLUS, "*v", 1).out("*res")
.originPartition();
----
== Recursive Query Topologies
Query topologies can call themselves, enabling recursion. Rama protects against infinite recursion by enforcing the query's timeout.
[source, java]
----
// Recursive Fibonacci implementation
topologies.query("fib", "*n").out("*res")
.ifTrue(new Expr(Ops.OR, new Expr(Ops.EQUAL, "*n", 0),
new Expr(Ops.EQUAL, "*n", 1)),
Block.each(Ops.IDENTITY, 1).out("*res"),
Block.invokeQuery("fib", new Expr(Ops.DEC, "*n")).out("*a")
.invokeQuery("fib", new Expr(Ops.MINUS, "*n", 2)).out("*b")
.each(Ops.PLUS, "*a", "*b").out("*res"))
.originPartition();
----
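For reference, the same recurrence in plain Java makes the base-case convention explicit. This is only a mirror of the logic above, not Rama code; the class name `FibSketch` is hypothetical. Note that both `fib(0)` and `fib(1)` evaluate to 1 here, matching the `ifTrue` branch of the topology.

```java
// Plain-Java illustration only; no Rama APIs are used.
final class FibSketch {
    // Mirrors the "fib" topology: fib(0) = fib(1) = 1, else fib(n-1) + fib(n-2).
    static long fib(int n) {
        if (n == 0 || n == 1) {
            return 1;
        }
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] args) {
        for (int n = 0; n <= 6; n++) {
            System.out.println("fib(" + n + ") = " + fib(n));
        }
    }
}
```

As in the topology, a negative input never reaches a base case. In plain Java that ends in a stack overflow; in Rama the query timeout ends the invocation.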
== Temporary In-Memory State
For complex queries needing temporary state, Rama provides an implicit, per-invocation PState.
* **Name:** `$$<queryName>$$` (e.g., `$$findPath$$` for a query named `"findPath"`).
* **Scope:** The state is unique to each invocation and is cleared afterward. Each partition starts as `null`.
[source, java]
----
topologies.query("foo", "*v", "*v2").out("*res")
.hashPartition("*v")
// Store *v2 in the temporary PState on the current partition
.localTransform("$$foo$$", Path.termVal("*v2"))
.shufflePartition()
.hashPartition("*v")
// Read the stored value back from the temporary PState
.localSelect("$$foo$$", Path.stay()).out("*v3")
.originPartition()
.each(Ops.PLUS, "*v", "*v3").out("*res");
----
== Tuning Options
Query topologies have one dynamic option, editable in the Cluster UI:
* `topology.query.timeout.millis`: The execution timeout for the query.
== Summary
Designing a Rama application involves balancing precomputation (ETLs and PStates) with on-demand computation. Query topologies provide a flexible and powerful tool for the on-demand side. By using the same dataflow API as ETLs, they make it easy to implement even sophisticated, distributed query logic with high efficiency.
= Aggregators
Aggregators provide a high-level, and often more performant, alternative to xref:paths.adoc[paths] for updating PStates. They can offer more concise code for simple transformations and huge performance gains for complex ones, especially for global aggregations.
This page covers:
* The two types of aggregators: accumulators and combiners.
* The built-in aggregators and how to define your own.
* How combiners enable massive performance optimizations in batch blocks.
* Special features like `captureNewValInto`.
* The `topMonotonic` and `limitAgg` special aggregators.
* The "group by" operator.
== Using Aggregators
Aggregators are used with the `agg` and `compoundAgg` methods. `agg` updates a PState's top-level value, while `compoundAgg` targets and updates sub-values within a PState, much like a path.
[source, java]
----
public class AggregateModule implements RamaModule {
@Override
public void define(Setup setup, Topologies topologies) {
setup.declareDepot("*depot", Depot.random());
StreamTopology s = topologies.stream("s");
s.pstate("$$count", Long.class).initialValue(0L); // Top-level value
s.pstate("$$countByKey", PState.mapSchema(String.class, Long.class)); // Nested values
s.source("*depot").out("*k")
.agg("$$count", Agg.count())
.compoundAgg("$$countByKey", CompoundAgg.map("*k", Agg.count()));
}
// A main method would show this code produces:
// Count: 4
// Count by key: [["bette davis" 1] ["james cagney" 2] ["spencer tracy" 1]]
}
----
All built-in aggregators are available as static methods on the `Agg` class.
A key advantage of aggregators is that they automatically initialize nested values that don't exist. The equivalent update written with a path must handle the missing value explicitly and is more verbose:
[source, java]
----
// Aggregator version
.compoundAgg("$$countByKey", CompoundAgg.map("*k", Agg.count()));
// Path version
.localTransform("$$countByKey", Path.key("*k").nullToVal(0L).term(Ops.PLUS, 1L));
----
NOTE: For top-level values like `$$count`, you are still responsible for setting an `initialValue` on the PState.
== Defining Aggregators: Accumulators vs. Combiners
Aggregators are defined by an `initVal` (or `zeroVal`) and a function to update a value. There are two types:
* **Accumulators**: Less flexible. The update function takes the current value and new data. All aggregation must happen sequentially.
* **Combiners**: More flexible. The update function combines two values of the same type. This property allows Rama to parallelize the aggregation, leading to significant performance gains.
=== Accumulators
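An accumulator implements one of the `RamaAccumulatorAgg` interfaces, which declare an `initVal()` method and an `accumulate` method. The example below uses `RamaAccumulatorAgg0`, whose `accumulate` receives only the current value.
[source, java]
----
// A simple accumulator to count inputs.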
public class AccumCount implements RamaAccumulatorAgg0<Integer> {
@Override
public Integer initVal() { return 0; }
@Override
public Integer accumulate(Integer currVal) {
return currVal + 1;
}
}
// Usage: .agg("$$p", Agg.accumulator(new AccumCount()))
----
=== Combiners
A combiner implements the `RamaCombinerAgg` interface, which has `zeroVal()` and `combine` methods. The `combine` function's associative property is what enables parallelization. Rama can compute partial aggregates and then combine those partial aggregates into a final result.
[source, java]
----
// A combiner to sum integers.
public class CombinerSum implements RamaCombinerAgg<Integer> {
@Override
public Integer zeroVal() { return 0; }
@Override
public Integer combine(Integer val1, Integer val2) {
return val1 + val2;
}
}
// Usage: .agg("$$p", Agg.combiner(new CombinerSum(), "*v"))
----
An optional `isFlushRequired()` method can be overridden to return `true` if the combined value can grow unboundedly (e.g., aggregating into a map). This tells Rama to flush partial results more frequently to conserve memory.
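To see why associativity matters, the following plain-Java sketch folds a list once sequentially and once as two independently folded halves. The nested `Combiner` interface is a hypothetical stand-in that only imitates the `zeroVal`/`combine` shape described above; it is not the Rama interface.

```java
import java.util.List;

// Plain-Java illustration only; no Rama APIs are used.
final class CombinerSketch {
    // Hypothetical stand-in for the shape of a combiner: a zero value plus
    // an associative two-argument combine function.
    interface Combiner<T> {
        T zeroVal();
        T combine(T a, T b);
    }

    static final Combiner<Integer> SUM = new Combiner<Integer>() {
        public Integer zeroVal() { return 0; }
        public Integer combine(Integer a, Integer b) { return a + b; }
    };

    // Folds a list left to right, starting from the zero value.
    static <T> T fold(Combiner<T> c, List<T> values) {
        T acc = c.zeroVal();
        for (T v : values) {
            acc = c.combine(acc, v);
        }
        return acc;
    }

    // Folds two halves independently, then combines the partial results.
    static <T> T foldInTwoParts(Combiner<T> c, List<T> values) {
        int mid = values.size() / 2;
        T left = fold(c, values.subList(0, mid));
        T right = fold(c, values.subList(mid, values.size()));
        return c.combine(left, right);
    }
}
```

For an associative `combine`, both folds agree, which is what allows partial aggregates to be computed separately and merged later.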
== Built-in Aggregators
Rama provides many built-in aggregators on the `Agg` class.
* `count()`: A combiner that increments by one.
* `sum()`: A combiner that adds inputs.
* `min()` / `max()`: Combiners to find the minimum/maximum value.
* `and()` / `or()`: Combiners for boolean logic.
* `last()`: An accumulator that stores the last seen value.
* `voided()`: Removes an element from a collection, like `Path.termVoid()`.
See the link:https://redplanetlabs.com/javadoc/com/rpl/rama/Agg.html[Agg Javadoc] for a complete list.
== High-Performance Two-Phase Aggregation with Combiners
In batch blocks (used in microbatch and query topologies), Rama can perform a powerful optimization called **two-phase aggregation** if and only if *all* aggregators in the agg phase are combiners.
Consider a global aggregation:
[source, java]
----
// Inefficient: sends all data to one task before aggregating.
mb.source("*depot").out("*mb")
.explodeMicrobatch("*mb").out("*v")
.globalPartition()
.agg("$$p", Agg.sum("*v"));
// Efficient: uses a batch block to enable two-phase aggregation.
mb.source("*depot").out("*mb")
.batchBlock(
Block.explodeMicrobatch("*mb").out("*v")
.globalPartition()
.agg("$$p", Agg.sum("*v")));
----
The second example is far more scalable. Instead of sending every record across the network to the single task targeted by `globalPartition`, Rama first aggregates on each task and transfers only the partial results. Conceptually, it executes like this:
1. **Phase 1 (Local Aggregation):** Each task partially aggregates its local data into a "combiner buffer". For `Agg.sum()`, each task sums its local numbers into a single partial sum.
2. **Phase 2 (Global Aggregation):** Only the partial results are sent over the `globalPartition`. The final task then combines these few partial results into the final total.
This drastically reduces network traffic and the load on the final aggregation task. If any aggregator is an accumulator, this optimization is disabled. For global aggregations, always use combiners inside a `batchBlock`.
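The two phases can be simulated in plain Java to compare network traffic. This is a conceptual sketch under simplifying assumptions, not how Rama is implemented: each inner list stands for the records held by one task, and "sent" simply counts values that would cross the network to the global task. The class and method names are hypothetical.

```java
import java.util.List;

// Plain-Java illustration only; no Rama APIs are used.
final class TwoPhaseSumSketch {
    // One-phase: every record crosses the network to the global task.
    // Returns {total, valuesSentOverNetwork}.
    static long[] onePhase(List<List<Long>> recordsPerTask) {
        long total = 0;
        long sent = 0;
        for (List<Long> taskRecords : recordsPerTask) {
            for (long v : taskRecords) {
                sent++;      // each record is transferred individually
                total += v;  // aggregation happens only on the global task
            }
        }
        return new long[] {total, sent};
    }

    // Two-phase: each task pre-aggregates locally and sends one partial sum.
    // Returns {total, valuesSentOverNetwork}.
    static long[] twoPhase(List<List<Long>> recordsPerTask) {
        long total = 0;
        long sent = 0;
        for (List<Long> taskRecords : recordsPerTask) {
            long partial = 0;  // phase 1: local partial aggregate
            for (long v : taskRecords) {
                partial += v;
            }
            sent++;            // phase 2: only the partial result is transferred
            total += partial;
        }
        return new long[] {total, sent};
    }
}
```

With three tasks holding six records in total, both variants produce the same sum, but the two-phase variant transfers three values instead of six. The gap grows with the number of records per task.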
== Advanced Batch Aggregation Features
=== Capturing Newly Updated Values
In a batch block, an aggregator's `captureNewValInto` method lets you capture the final aggregated value for each updated key and use it in the post-agg phase. This is very useful for "top N" computations.
[source, java]
----
// This sub-batch counts words and emits each updated word and its new total count.
private SubBatch wordCounts(String microbatchVar) {
Block b = Block.explodeMicrobatch(microbatchVar).out("*word")
.hashPartition("*word")
.compoundAgg("$$wordCounts",
CompoundAgg.map(
"*word",
Agg.count().captureNewValInto("*count"))); // <1>
return new SubBatch(b, "*word", "*count");
}
----
<1> For every `*word` processed in the microbatch, `captureNewValInto` makes the final `*count` available in the post-agg phase. Even if "apple" appears 10 times, only one `["apple", <new-total-count>]` tuple is emitted from this subbatch.
*Restriction*: `captureNewValInto` cannot be used if the `compoundAgg` call branches into multiple maps, as Rama cannot determine a single set of output variables.
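The "one tuple per updated key" behavior described above can be pictured with a plain-Java sketch. It is an analogy only and uses no Rama APIs; `applyBatch` and the map-based state are assumptions standing in for the PState update and the post-agg emission.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Rama APIs are used.
final class CaptureNewValSketch {
    // Applies one batch of words to the persistent counts and returns one
    // entry per distinct updated word, holding its new total count.
    static Map<String, Long> applyBatch(Map<String, Long> wordCounts, List<String> batch) {
        Map<String, Long> updated = new LinkedHashMap<>();
        for (String word : batch) {
            long newCount = wordCounts.merge(word, 1L, Long::sum);
            updated.put(word, newCount); // later occurrences overwrite earlier ones
        }
        return updated;
    }
}
```

If "apple" already has a count of 5 and appears twice in the batch, the returned map contains a single entry for "apple" with the value 7.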
=== Aggregating to a Value or Temporary PState
In batch blocks, `agg` can also output a result to a new variable instead of updating a persistent PState. This is fundamental to query topologies.
[source, java]
----
// Query that sums a list of numbers. The result is emitted into *res.
topologies.query("q", "*nums").out("*res")
.each(Ops.EXPLODE, "*nums").out("*num")
.originPartition()
.agg(Agg.sum("*num")).out("*res");
----
You can also aggregate into a temporary, in-memory PState that exists only for the duration of a microbatch. This allows you to share an intermediate result between sequential batch blocks.
[source, java]
----
.batchBlock(
Block.explodeMicrobatch("*mb").out("*k")
.hashPartition("*k")
.compoundAgg(CompoundAgg.map("*k", Agg.count())).out("$$keyCounts")) // <1>
.batchBlock( // <2>
Block.allPartition()
.localSelect("$$keyCounts", Path.all()).out(...)
...)
----
<1> A temporary PState `$$keyCounts` is created and populated. It is not declared in `setup`.
<2> A subsequent `batchBlock` can read from `$$keyCounts`.
== Special Aggregators
=== `topMonotonic`
`Agg.topMonotonic` efficiently computes a top-N list. It's a combiner, making it highly performant for global top-N computations in batch blocks.
[source, java]
----
.each(Ops.TUPLE, "*word", "*count").out("*tuple")
.globalPartition()
.agg("$$topWords",
Agg.topMonotonic(3, "*tuple") // <1>
.idFunction(Ops.FIRST) // <2>
.sortValFunction(Ops.LAST)); // <3>
----
<1> Keep the top 3 `*tuple` elements.
<2> The ID of each element is its first field (`*word`). Used to identify updates to existing elements.
<3> The sort value is the last field (`*count`).
The "monotonic" name implies a condition for correctness in incremental topologies: the sort value for a given ID must only ever increase (or only ever decrease if `.ascending()` is used). If a word's count could decrease, `topMonotonic` might not produce the correct global top-N list. In query topologies, where all data is processed at once, this is not an issue.
`topMonotonic` uses an efficient amortized `O(log N)` algorithm. In non-batched contexts, the stored list might be in an intermediate state (e.g., up to 2*N elements), requiring the client to perform a final sort-and-trim.
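The lazy-trim idea and the client-side sort-and-trim can be sketched in plain Java. This is a simplified illustration and should not be read as Rama's actual algorithm: the class `TopNSketch`, its map-based storage, and the trim threshold of `2 * n` are assumptions chosen to match the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Rama APIs are used.
final class TopNSketch {
    private final int n;
    private final Map<String, Long> sortValById = new HashMap<>();

    TopNSketch(int n) { this.n = n; }

    // Inserts or updates an element; trims lazily once more than 2*n are held.
    void add(String id, long sortVal) {
        sortValById.put(id, sortVal);
        if (sortValById.size() > 2 * n) {
            sortValById.keySet().retainAll(top());
        }
    }

    // Final sort-and-trim: IDs of the n largest sort values, largest first.
    List<String> top() {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(sortValById.entrySet());
        entries.sort((a, b) -> {
            int byVal = Long.compare(b.getValue(), a.getValue()); // descending
            return byVal != 0 ? byVal : a.getKey().compareTo(b.getKey());
        });
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            ids.add(entries.get(i).getKey());
        }
        return ids;
    }

    int storedSize() { return sortValById.size(); }
}
```

The sketch also hints at why monotonic sort values matter: an element dropped during a trim is gone for good, so if the surviving elements later shrank, the dropped element could not re-enter the list.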
=== `limitAgg`
`limitAgg` is a `Block` operator (not in `Agg`) that filters a batch down to a fixed number of elements. It can only be used in batch contexts.
[source, java]
----
.limitAgg(LimitAgg.create(3, "*v1", "*v3") // Keep at most 3 elements
.sort("*v2") // Sort by *v2 before limiting
.reverse() // Sort descending
.indexVar("*index")); // Bind the 0-based index to *index
----
This example keeps the three elements with the highest `*v2` values and makes their `*v1` and `*v3` fields available in the post-agg phase, along with their rank `*index`. `limitAgg` uses a combiner, so it is efficient for global limits.
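The result of that `limitAgg` call can be reproduced over an in-memory list for intuition. The sketch below is plain Java and only an analogy: rows are modeled as `{v1, v2, v3}` arrays of `long`, and `limitByV2Desc` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration only; no Rama APIs are used.
final class LimitSketch {
    // Each row is {v1, v2, v3}. Returns up to `limit` rows as {v1, v3, index},
    // ordered by v2 descending, where index is the 0-based rank.
    static List<long[]> limitByV2Desc(List<long[]> rows, int limit) {
        List<long[]> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingLong((long[] r) -> r[1]).reversed());
        List<long[]> out = new ArrayList<>();
        for (int i = 0; i < Math.min(limit, sorted.size()); i++) {
            long[] r = sorted.get(i);
            out.add(new long[] {r[0], r[2], i});
        }
        return out;
    }
}
```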
== "Group By" Operator
The `groupBy` operator partitions data by a set of keys and applies aggregations within each group, similar to SQL's `GROUP BY`.
[source, java]
----
.each(Ops.EXPLODE, "*tuplesVar").out("*tuple")
.each(Ops.EXPAND, "*tuple").out("*k", "*val")
.groupBy("*k", // <1>
Block.agg(Agg.count()).out("*count") // <2>
.agg(Agg.sum("*val")).out("*sum")); // <3>
----
<1> Groups data by `*k`. Automatically inserts a hash partition on `*k`.
<2> For each `*k`, `Agg.count()` is applied to produce `*count`.
<3> For each `*k`, `Agg.sum("*val")` is applied to produce `*sum`.
The post-agg phase emits tuples of `[*k, *count, *sum]` for each unique key. You can group by up to six variables.
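As a point of comparison, the same grouping over an in-memory list looks like this in plain Java. It is an analogy only, with hypothetical names; the output maps each key to a `{count, sum}` pair, corresponding to the `[*k, *count, *sum]` tuples described above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; no Rama APIs are used.
final class GroupBySketch {
    // Input tuples are (key, value) pairs. Output maps each key to {count, sum}.
    static Map<String, long[]> countAndSum(List<Map.Entry<String, Long>> tuples) {
        Map<String, long[]> groups = new TreeMap<>();
        for (Map.Entry<String, Long> t : tuples) {
            long[] agg = groups.computeIfAbsent(t.getKey(), k -> new long[] {0L, 0L});
            agg[0] += 1;             // count
            agg[1] += t.getValue();  // sum
        }
        return groups;
    }
}
```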
== Summary
Aggregators are a powerful abstraction in Rama. While they offer conciseness for simple PState updates, their true strength lies in batched contexts. Combiners enable massive performance gains through two-phase aggregation, and specialized tools like `topMonotonic`, `limitAgg`, and `groupBy` allow for expressive and efficient data processing.
= Stream Topologies =
Stream topologies reactively process data from depots with millisecond-level latency. They are ideal for use cases requiring fast updates to PStates and for coordinating work with front-end clients.
All examples can be found in the link:https://github.com/redplanetlabs/rama-examples[rama-examples] project.
== Usage ==
Stream topologies consume data from any number of depots to update any number of PStates. They have access to the full dataflow API, except for batch blocks. This allows for complex logic using conditionals, loops, and branching.
[source,java]
----
public class BasicStreamTopologyModule implements RamaModule {
@Override
public void define(Setup setup, Topologies topologies) {
setup.declareDepot("*depot", Depot.hashBy(Ops.FIRST));
setup.declareDepot("*depot2", Depot.random());
StreamTopology s = topologies.stream("s");
s.pstate("$$p1", PState.mapSchema(String.class, Long.class));
s.pstate(
"$$p2",
PState.mapSchema(
String.class,
PState.mapSchema(String.class, Long.class).subindexed()));
s.source("*depot").out("*tuple")
.each(Ops.EXPAND, "*tuple").out("*k1", "*k2")
.compoundAgg("$$p1", CompoundAgg.map("*k1", Agg.count()))
.compoundAgg("$$p2", CompoundAgg.map("*k1", CompoundAgg.map("*k2", Agg.count())))
.ifTrue(new Expr(Ops.NOT_EQUAL, "*k1", "*k2"),
Block.hashPartition("*k2")
.compoundAgg("$$p1", CompoundAgg.map("*k2", Agg.count()))
.compoundAgg("$$p2", CompoundAgg.map("*k2", CompoundAgg.map("*k1", Agg.count()))));
s.source("*depot2").out("*v")
.each(Ops.CURRENT_TASK_ID).out("*taskId")
.each(Ops.PRINTLN, "From *depot2:", "*taskId", "*v");
}
// main method omitted for brevity
}
----
Running this code with sample appends produces:
[source,text]
----
a count: 2
b count: 2
c count: 1
b subcounts: [["a" 1] ["c" 1]]
From *depot2: 3 X
From *depot2: 0 Y
----
.Key Concepts from Usage
* **No Batch Blocks**: Stream topologies cannot use batch blocks, which prevents them from using xref:aggregators.adoc#_high_performance_two_phase_aggregation_with_combiners[two-phase aggregation]. For efficient global aggregation, prefer xref:microbatch.adoc[microbatch topologies].
* **Depot Integration**: Depot appends can be configured to block until colocated stream topologies have finished processing the data. See xref:depots.adoc[depots] for different "ack levels".
== Operation ==
When a depot partition receives a record, it pushes it to all subscribed stream topologies. The processing of a single record can trigger a dynamic "event tree" of downstream work across many tasks. The depot record is considered processed only when the entire event tree completes.
image::./diagrams/stream/event-tree.png[]
.Key Operational Details
* **Event-level Visibility**: PState writes from an event are made visible externally only after the event completes and its writes are replicated. This differs from microbatch topologies, where all writes for a microbatch become visible at once.
* **Auto-Batching**: To optimize performance, Rama batches multiple pending stream topology events on a task. All PState writes within a batch are buffered and become visible externally at the same time, providing a balance of low latency and high throughput.
* **Progress Tracking**: Stream topologies track their progress per depot partition in an internal PState (e.g., `"\$$\__streaming-state-s"`). This state is checkpointed periodically, not after every record, which has implications for fault-tolerance.
== Fault-tolerance and Retry Modes ==
If an event fails for any reason (e.g., code exception, machine failure, timeout), the originating depot record is retried according to the source's configured "retry mode". This provides "at least once processing" semantics.
IMPORTANT: Because records can be processed more than once upon retry, non-idempotent operations like incrementing a counter can become inaccurate. For use cases requiring perfect accuracy with such operations, use a xref:microbatch.adoc[microbatch topology].
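The effect of a retry on idempotent and non-idempotent updates can be shown with a small plain-Java sketch. It does not use Rama; the list of record IDs stands for the sequence of deliveries a topology observes, with a repeated ID representing a retried record.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; no Rama APIs are used.
final class RetrySketch {
    // Non-idempotent: every delivery increments, so a redelivered record
    // is counted twice.
    static long countDeliveries(List<String> deliveries) {
        long count = 0;
        for (int i = 0; i < deliveries.size(); i++) {
            count++;
        }
        return count;
    }

    // Idempotent: adding the same record ID to a set again has no effect.
    static int countDistinct(List<String> deliveries) {
        Set<String> seen = new HashSet<>(deliveries);
        return seen.size();
    }
}
```

For the deliveries `r1, r2, r2, r3`, where `r2` was retried, the counter reports 4 while the set-based count reports the correct 3.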
Retry modes are configured per source:
[source,java]
----
s.source("*depot", StreamSourceOptions.retryNone());
s.source("*depot2", StreamSourceOptions.retryAllAfter());
----
.Available Retry Modes
* `individual` (Default): Only the failed depot record is retried. Order is not guaranteed on retry.
* `none`: Failed records are never retried ("at most once" semantics).
* `all after`: The failed record and all subsequent records from the same depot partition are retried, preserving order.
.Failure Scenarios
* **Timeouts**: A record fails if its event tree does not complete within a configured timeout.
* **Checkpoint Lag**: If a leader fails after processing records but before checkpointing, those records will be retried by the new leader.
* **Batch Failure**: If any event in a processing batch fails, the entire batch of events is discarded and retried. All PState writes from that batch are rolled back.
== Initial Processing Position ==
The first time a stream topology runs on a depot partition, you can configure where it starts processing. By default, it starts from the end (processing only new records).
[source,java]
----
s.source("*depot", StreamSourceOptions.startFromBeginning());
s.source("*depot2", StreamSourceOptions.startFromOffsetAfterTimestamp(107740800000L));
s.source("*depot3", StreamSourceOptions.startFromOffsetAgo(10000, OffsetAgo.RECORDS));
s.source("*depot4", StreamSourceOptions.startFromOffsetAgo(15, OffsetAgo.DAYS));
// Options can be chained
s.source("*depot", StreamSourceOptions.startFromBeginning().retryNone());
----
NOTE: "Start from" options only apply the very first time a topology processes a depot partition. On subsequent runs (after updates or leader switches), it resumes from its last checkpointed position.
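Timestamp arguments are easier to read when derived from a date rather than written as a raw number. The sketch below uses only `java.time` and assumes the timestamp is expressed in milliseconds since the Unix epoch, which the magnitude of the value in the example suggests but which should be confirmed against the Javadoc. The helper names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration only; no Rama APIs are used.
final class StartTimestampSketch {
    // Converts an ISO-8601 UTC instant to epoch milliseconds.
    static long epochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Epoch milliseconds for the point `days` days before `now`.
    static long daysBefore(Instant now, long days) {
        return now.minus(Duration.ofDays(days)).toEpochMilli();
    }

    public static void main(String[] args) {
        // Under the epoch-millis assumption, this is the value used above.
        System.out.println(epochMillis("1973-06-01T00:00:00Z")); // prints 107740800000
    }
}
```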
== Ack Return Aggregation ==
When using `AckLevel.ACK`, depot appends can receive a value back from colocated stream topologies. This is achieved with the `ackReturn()` method. By default, if multiple `ackReturn()` calls occur for a single depot record, the last value is returned.
[source,java]
----
// Returns the new count after each append
s.source("*depot").out("*k")
.compoundAgg("$$p", CompoundAgg.map("*k", Agg.count()))
.localSelect("$$p", Path.key("*k")).out("*v")
.ackReturn("*v");
----
The aggregation logic can be customized using xref:aggregators.adoc[aggregators].
[source,java]
----
// Aggregates all ackReturn values by summing them
s.source("*depot", StreamSourceOptions.ackReturnAgg(Agg::sum)).out("*v")
.each(Ops.RANGE, 0, "*v")
.shufflePartition()
.ackReturn("*v");
----
A custom aggregator can be provided via `StreamSourceOptions.ackReturnAgg(new MyAggregator())`.
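The difference between the default "last value" behavior and a sum aggregation can be worked through in plain Java. This sketch rests on an assumption about the topology above: that `Ops.RANGE` emits once per integer in `[0, v)`, so an appended value `v` leads to `v` calls of `ackReturn("*v")`. All names are hypothetical and no Rama APIs are used.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Rama APIs are used.
final class AckReturnSketch {
    // Assumed behavior for one appended record v: one ackReturn(v) call
    // per integer in [0, v).
    static List<Integer> ackReturnCalls(int v) {
        List<Integer> calls = new ArrayList<>();
        for (int i = 0; i < v; i++) {
            calls.add(v);
        }
        return calls;
    }

    // Default behavior: the last value wins (null if there were no calls).
    static Integer lastValue(List<Integer> calls) {
        return calls.isEmpty() ? null : calls.get(calls.size() - 1);
    }

    // Sum aggregation: all values are added together.
    static int summed(List<Integer> calls) {
        int total = 0;
        for (int c : calls) {
            total += c;
        }
        return total;
    }
}
```

Under that assumption, appending `3` would return `3` with the default behavior and `9` with the sum aggregation.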
== Throttling ==
To prevent overload, each task limits the number of depot records that a stream topology can process concurrently. When this per-task limit is hit:
* The depot partition pauses sending new records to that task.
* Depot appends with `AckLevel.ACK` will throw an exception.
* Records for sources with `retryNone` will be skipped entirely.
You should scale your modules so this limit is not normally hit. It is a safeguard against unexpected traffic bursts.
== Tuning Options ==
Stream topology behavior can be fine-tuned via configurations and dynamic options.
=== Configurations (Fixed on Deploy) ===
* `topology.stream.periodic.checkpoint.seconds`: How often to checkpoint progress, ensuring progress is saved for low-throughput depots.
=== Dynamic Options (Editable in UI) ===
* `topology.stream.checkpoint.progress.threshold`: Checkpoint after a certain number of records have been processed.
* `topology.stream.max.events.per.batch`: Max number of events to process in a single execution batch on a task.
* `topology.stream.max.executing.per.task`: The per-task throttling limit for concurrently processing depot records.
* `topology.stream.timeout.seconds`: Timeout for an event tree to complete.
* `depot.cache.cardinality`: Size of the depot cache used for retries.
* `depot.cache.catchup.chunk.size`: Number of records to fetch at a time when a topology is catching up.
* `depot.max.pending.streaming.per.partition`: How many acked depot appends can be tracked per depot partition.
* `depot.ack.failure.on.any.streaming.failure`: If `true` (default), an acked append fails immediately on the first stream processing failure. If `false`, it waits for retries to potentially succeed.
= Microbatch topologies
Microbatch topologies share similarities with stream topologies but have important differences. They offer greater computational power, different performance characteristics, and provide simple, exactly-once fault-tolerance for PState updates.
This page covers:
* Advanced computational capabilities of microbatch topologies.
* The operational phases of a microbatch.
* How exactly-once semantics are achieved despite failures.
* Available tuning options.
All examples are in the link:https://github.com/redplanetlabs/rama-examples[rama-examples] project.
== Usage
Microbatch topologies process accumulated depot data in coordinated batches, unlike stream topologies which process records individually as they arrive. A single microbatch can process thousands of records from each depot partition simultaneously.
[source, java]
----
public class ExampleMicrobatchTopologyModule implements RamaModule {
@Override
public void define(Setup setup, Topologies topologies) {
setup.declareDepot("*keyPairsDepot", Depot.hashBy(Ops.FIRST));
setup.declareDepot("*numbersDepot", Depot.random());
MicrobatchTopology mb = topologies.microbatch("mb");
mb.pstate(
"$$keyPairCounts",
PState.mapSchema(
String.class,
PState.mapSchema(String.class, Long.class).subindexed()));
mb.pstate("$$globalSum", Long.class).global().initialValue(0L);
mb.source("*keyPairsDepot").out("*microbatch")
.explodeMicrobatch("*microbatch").out("*tuple")
.each(Ops.EXPAND, "*tuple").out("*k", "*k2")
.compoundAgg("$$keyPairCounts", CompoundAgg.map("*k", CompoundAgg.map("*k2", Agg.count())));
mb.source("*numbersDepot").out("*microbatch")
.batchBlock(
Block.explodeMicrobatch("*microbatch").out("*v")
.globalPartition()
.agg("$$globalSum", Agg.sum("*v")));
}
// main method...
}
----
A microbatch `source` emits a single object representing the entire batch of data. This object must be passed to `explodeMicrobatch`, which then emits each individual record from the batch for processing.
The full dataflow API is available, including powerful batch computation capabilities via `batchBlock`. Batch blocks enable two-phase aggregation, joins, and temporary PStates. For details, see xref:intermediate-dataflow.adoc#_batch_blocks[batch blocks].
All dataflow constructs are composable. The only restriction is that `batchBlock` must be initiated from task 0, which is always the case at the start of a microbatch source block.
In tests, use `waitForMicrobatchProcessedCount` to wait for a microbatch to process a specific number of appended records, as processing is asynchronous to appends.
== Operation and fault-tolerance
Microbatch topologies achieve high throughput and exactly-once fault-tolerance through a phased execution model controlled by a "runner" on the task 0 leader. This runner operates as a series of asynchronous events.
A single execution cycle is a "microbatch attempt", identified by a `microbatch ID` and a `version`. If an attempt fails (due to an exception, leadership change, or timeout), it is retried with the same `microbatch ID` and an incremented `version`.
Crucially, each attempt for a given `microbatch ID`:
1. Processes the *exact same set of depot data*.
2. Starts with PStates in the *exact same state* as they were after the previous successful microbatch.
This guarantees exactly-once semantics for PState updates.
A microbatch executes in three sequential phases coordinated by the runner:
image::./diagrams/microbatch/microbatch-loop.png[]
. *Priming Phase:* Prepares all tasks for the microbatch. This involves clearing internal buffers and resetting the _internal view_ of user PStates to their state after the last successful microbatch ID. Topology code always operates on this internal view.
. *Processing Phase:* Executes the user-defined topology code. The range of depot data to be processed is fixed for the current microbatch ID.
. *Commit Phase:* After processing completes, all PState updates are checkpointed and xref:replication.adoc[replicated]. Once a task's PStates are checkpointed and replicated, their _external view_ is updated, making the changes visible to clients and other topologies. All PState changes on a given task become visible atomically.
A microbatch topology's output is not strictly deterministic if the code uses non-deterministic operations like `shufflePartition` or random numbers. However, the resulting PStates still correctly represent the processing of each depot record exactly once.
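The prime, process, and commit cycle can be simulated in a few lines of plain Java to show why retries do not double-count. This is a conceptual model only, not Rama's implementation: the maps stand for the internal and external PState views, and `failFirstAttempts` is a hypothetical knob that injects failures halfway through processing.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Rama APIs are used.
final class MicrobatchRetrySketch {
    // Externally visible state after the last successful microbatch.
    private Map<String, Long> committed = new HashMap<>();
    private int attemptsMade = 0;

    // Runs one microbatch ID to completion, retrying failed attempts.
    void runMicrobatch(List<String> batch, int failFirstAttempts) {
        for (int version = 0; ; version++) {
            attemptsMade++;
            // Priming: reset the internal view to the last committed state.
            Map<String, Long> internalView = new HashMap<>(committed);
            boolean failed = false;
            // Processing: the same batch is replayed on every attempt.
            for (int i = 0; i < batch.size(); i++) {
                if (version < failFirstAttempts && i == batch.size() / 2) {
                    failed = true; // partial writes in internalView are discarded
                    break;
                }
                internalView.merge(batch.get(i), 1L, Long::sum);
            }
            if (!failed) {
                committed = internalView; // Commit: publish all updates at once
                return;
            }
        }
    }

    long count(String key) { return committed.getOrDefault(key, 0L); }

    int attemptsMade() { return attemptsMade; }
}
```

Even when the first two attempts fail midway, the committed counts reflect each record exactly once, because every attempt starts from the same committed state and replays the same data.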
[NOTE]
====
Depot appends performed within a microbatch topology do not currently have exactly-once semantics. This is on the roadmap.
====
== "Start from" options
You can configure a microbatch topology to start processing from a specific point in a depot's history. These options apply only the first time a topology encounters a depot.
[source, java]
----
mb.source("*depot", MicrobatchSourceOptions.startFromBeginning());
mb.source("*depot2", MicrobatchSourceOptions.startFromOffsetAfterTimestamp(107740800000L));
mb.source("*depot3", MicrobatchSourceOptions.startFromOffsetAgo(10000, OffsetAgo.RECORDS));
mb.source("*depot4", MicrobatchSourceOptions.startFromOffsetAgo(15, OffsetAgo.DAYS));
----
For descriptions, see xref:stream.adoc#_start_from_options[this section]. Unlike stream topologies, no additional fault-tolerance options are needed, as microbatch topologies are always exactly-once.
== Tuning options
Microbatch topologies have several xref:operating-rama.adoc#_worker_configurations_and_dynamic_options[dynamic options] that can be changed live from the Cluster UI.
* `depot.microbatch.max.records`: Max records to read per depot partition in a microbatch. Higher values increase batching but also memory usage.
* `depot.max.fetch`: Max depot records to fetch from a partition in a single network request. Must be <= `depot.microbatch.max.records`.
* `topology.microbatch.phase.timeout.seconds`: Timeout for each microbatch phase. A timeout triggers a retry.
* `topology.microbatch.empty.sleep.time.millis`: Time the runner sleeps if a microbatch processes zero records, preventing CPU spinning on idle depots.
* `topology.microbatch.pstate.flush.path.count`: Rate at which PState writes are flushed to disk. Higher values improve write batching but can increase task thread utilization.
* `worker.combined.transfer.limit`: Batches outgoing events into a single network message. Higher values improve throughput but can cause network-level delays.
* `topology.combiner.limit`: Flush frequency for combiners that require it (most do not).
* `topology.microbatch.ack.branching.factor`: Branching factor for the hierarchical acking tree used to detect processing phase completion.
* `topology.microbatch.ack.delay.base.millis`: Base wait time for new completion info in the acking algorithm.
* `topology.microbatch.ack.delay.step.millis`: Additional wait time per level in the acking tree, increasing towards the root (task 0).
== Summary
Unless you require millisecond-level update latency for PStates, you should prefer microbatch topologies over stream topologies. They offer higher throughput, simpler fault-tolerance semantics, and significantly more powerful computational capabilities through batch blocks.
= Building your first module: Hello, World
Rama is a distributed-first, scalable programming platform for building an application's entire data layer. It integrates data ingestion, processing, indexing, and querying into a single, unified system.
This tutorial introduces the basics of writing and running a Rama application by building a simple "Hello, World" example.
== Setting up your sandbox
First, clone the `rama-examples` project.
[source,shell]
----
git clone https://github.com/redplanetlabs/rama-examples.git
cd rama-examples
----
The project uses Maven. To run the examples, launch an interactive Groovy shell. You must compile the project first.
[source,shell]
----
mvn compile
mvn gplus:shell
----
Starting the shell may take 30-60 seconds as it loads Rama classes.
== Hello, World Example
The following `HelloWorldModule` defines a minimal Rama application.
[source,java]
----
package rama.examples.tutorial;

import com.rpl.rama.*;
import com.rpl.rama.module.*;
import com.rpl.rama.ops.Ops;
import com.rpl.rama.test.*;

public class HelloWorldModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
        setup.declareDepot("*depot", Depot.random());
        StreamTopology s = topologies.stream("s");
        s.source("*depot").out("*data")
         .each(Ops.PRINTLN, "*data");
    }

    public static void main(String[] args) throws Exception {
        try (InProcessCluster cluster = InProcessCluster.create()) {
            cluster.launchModule(new HelloWorldModule(), new LaunchConfig(1, 1));
            String moduleName = HelloWorldModule.class.getName();
            Depot depot = cluster.clusterDepot(moduleName, "*depot");
            depot.append("Hello, world!!");
        }
    }
}
----
Run this code from the Groovy shell:
[source,shell]
----
groovy:000> rama.examples.tutorial.HelloWorldModule.main(null)
----
After a brief pause on the first run, you will see `Hello, world!!` printed to the console.
== The Big Picture: Event Sourcing
Rama's programming model is based on _event sourcing_ and _materialized views_. Instead of mutating data in place (like a traditional database), you append immutable facts to a log. From this log, you compute indexed "views" of your data to serve queries.
This separates the _source of truth_ (the log) from the _query layer_ (the views).
* **Source of Truth**: An append-only, normalized log ensures data consistency and provides a natural audit trail. If a bug corrupts a view, it can be recomputed from the log.
* **Materialized Views**: Views can be denormalized and structured specifically for your application's query patterns, ensuring high performance.
Rama integrates this process, allowing client appends to be coordinated with view updates. This makes it suitable for both synchronous, interactive use cases and asynchronous ones.
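The split between log and view can be sketched in plain Java, independent of Rama's API. This is only an illustration of the idea: the class `EventSourcingSketch` and its methods are hypothetical names, not part of Rama. The log is an append-only list of amounts, and the view (a running balance) can always be rebuilt by replaying the log.
[source,java]
----
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of event sourcing; none of these names are Rama APIs.
class EventSourcingSketch {
    // Source of truth: an append-only log of immutable facts (signed amounts).
    private final List<Long> log = new ArrayList<>();
    // Materialized view: a balance derived entirely from the log.
    private long balanceView = 0L;

    // Appending records the fact and incrementally updates the view.
    void append(long amount) {
        log.add(amount);
        balanceView += amount;
    }

    // Simulates a bug that corrupts the view; the log is untouched.
    void corruptView() {
        balanceView = -999L;
    }

    // The view can be recomputed at any time by replaying the log.
    void rebuildView() {
        long total = 0L;
        for (long amount : log) {
            total += amount;
        }
        balanceView = total;
    }

    long balance() {
        return balanceView;
    }

    int logSize() {
        return log.size();
    }
}
----
Because the view is derived data, corrupting it loses nothing: calling `rebuildView()` restores the balance from the log.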
== Core Concepts
=== Clusters
A _cluster_ is Rama's execution environment. It is a group of networked machines (_nodes_) running processes with three distinct roles:
* **Conductor**: Orchestrates launching and updating modules.
* **Supervisor**: Manages worker processes on a single machine.
* **Worker**: Executes the data processing logic defined in your modules.
image::./diagrams/module-diagrams/cluster-overview.svg[]
For development and testing, Rama provides an **in-process cluster (IPC)**. An IPC is a lightweight, virtual cluster that runs within a single JVM process, enabling a much faster feedback loop. The example uses `InProcessCluster.create()`.
=== Modules
A _module_ is Rama's executable unit. It's where you define your data schemas, processing logic, and queries. To create a module, you implement the `RamaModule` interface and its `define` method.
A module contains several key components:
* **Depots**: Entry points for data into the module.
* **ETLs**: Logic for extracting, transforming, and loading (processing) data.
* **PStates**: Indexed, partitioned data structures (materialized views).
* **Query Topologies**: Pre-defined, distributed queries.
=== Anatomy of the Example
Let's break down the `HelloWorldModule` code.
**1. The `main` method (Client-side logic)**
The `main` method simulates a client interacting with the Rama cluster. In a real application, this logic would live in your application's backend services, not inside the module definition.
[source,java]
----
// 1. Create a lightweight cluster for development.
try (InProcessCluster cluster = InProcessCluster.create()) {
    // 2. Deploy the module definition to the cluster with a resource configuration.
    cluster.launchModule(new HelloWorldModule(), new LaunchConfig(1, 1));
    String moduleName = HelloWorldModule.class.getName();
    // 3. Get a handle to the module's depot.
    Depot depot = cluster.clusterDepot(moduleName, "*depot");
    // 4. Append data to the depot, triggering the module's ETL.
    depot.append("Hello, world!!");
}
----
[NOTE]
====
Launching modules on a real cluster is done via Rama's xref:operating-rama.adoc[command-line client], which offers more configuration options. The concepts remain the same.
====
**2. The `define` method (Module Definition)**
The `define` method describes the module's static topology. This code runs when the module is launched on the cluster.
[source,java]
----
@Override
public void define(Setup setup, Topologies topologies) {
    // 1. Declare a depot named "*depot". Data will enter the module here.
    //    Strings starting with * are variables in Rama's APIs.
    setup.declareDepot("*depot", Depot.random());
    // 2. Define a stream topology, which processes data as it arrives.
    StreamTopology s = topologies.stream("s");
    s.source("*depot").out("*data")   // 3. Source data from "*depot" and bind it to the *data variable.
     .each(Ops.PRINTLN, "*data");     // 4. For each incoming datum, execute the PRINTLN operation.
}
----
When `depot.append()` is called in `main`, the string `"Hello, world!!"` enters `*depot`. The stream topology, which is sourcing that depot, immediately receives the data and executes `Ops.PRINTLN` on it.
=== PStates and Queries
Typically, an ETL's purpose is not just to print data, but to build and maintain indexes to answer application queries. In Rama, these indexes are called xref:pstates.adoc[PStates] (partitioned states).
You can query PStates directly for simple lookups or define complex, distributed xref:query.adoc[query topologies] for more advanced needs. You will learn about PStates and queries in the next tutorial.
== Summary
* Rama applications run on **clusters**. For development, you use an **in-process cluster**.
* **Modules** are Rama's executables, containing your data processing logic.
* You **launch** a module on a cluster to run it.
* Data enters a module through **depots** when a client **appends** to them.
* **ETLs** (like stream topologies) **source** data from depots and perform operations, often to build and update **PStates** (indexes).
= Doing work in Rama: Depots, ETLs, and PStates
This document introduces the core Rama constructs for building applications: depots for ingesting data, PStates for storing it, and ETLs for processing it. The fundamental workflow involves gathering data in depots, transforming it with ETLs, and storing the results in PStates for efficient retrieval.
== Storing results in PStates
Rama's construct for data storage is the xref:pstates.adoc[partitioned state] (PState). While PStates are named data containers like tables in an RDBMS, they are far more flexible. A PState is an _arbitrarily compound data structure of arbitrary size_. This allows you to store data in the exact shape you need to use it, avoiding the impedance mismatches common with other databases.
The following example implements a simple word count to demonstrate the basic concepts.
[source,java]
----
package rama.examples.tutorial;

import com.rpl.rama.*;
import com.rpl.rama.module.*;
import com.rpl.rama.test.*;

public class SimpleWordCountModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
        setup.declareDepot("*wordDepot", Depot.random());
        StreamTopology s = topologies.stream("wordCountStream");
        s.pstate("$$wordCounts", PState.mapSchema(String.class, Long.class));
        s.source("*wordDepot").out("*token")
         .hashPartition("*token")
         .compoundAgg("$$wordCounts", CompoundAgg.map("*token", Agg.count()));
    }

    public static void main(String[] args) throws Exception {
        try (InProcessCluster cluster = InProcessCluster.create()) {
            cluster.launchModule(new SimpleWordCountModule(), new LaunchConfig(1, 1));
            String moduleName = SimpleWordCountModule.class.getName();
            Depot depot = cluster.clusterDepot(moduleName, "*wordDepot");
            depot.append("one");
            depot.append("two");
            depot.append("two");
            depot.append("three");
            depot.append("three");
            depot.append("three");
            PState wc = cluster.clusterPState(moduleName, "$$wordCounts");
            System.out.println("one: " + wc.selectOne(Path.key("one")));
            System.out.println("two: " + wc.selectOne(Path.key("two")));
            System.out.println("three: " + wc.selectOne(Path.key("three")));
        }
    }
}
----
=== Creating a partitioned state
You create a PState on a topology instance with `.pstate()`:
[source,java]
----
s.pstate("$$wordCounts", PState.mapSchema(String.class, Long.class));
----
A PState requires a name (e.g., `$$wordCounts`) and a schema. The schema defines the PState's data structure, like `PState.mapSchema(String.class, Long.class)` for a key-value store. PStates belong to a topology but must have names unique within the module. They are materialized views of depots, and only the owning topology can write to them.
=== Updating PStates
PStates are updated via ETLs that create a data pipeline from a depot. As new data is appended to the depot, the ETL processes it and updates the PState.
[width="80%",options="header"]
|====
| Time| Event| `$$wordCounts`
| 1| `append("one")`| `{"one": 1}`
| 2| `append("two")`| `{"one": 1, "two": 1}`
| 3| `append("two")`| `{"one": 1, "two": 2}`
|====
This is achieved with the following code:
[source,java]
----
s.source("*wordDepot").out("*token")
.hashPartition("*token")
.compoundAgg("$$wordCounts", CompoundAgg.map("*token", Agg.count()));
----
. `.source("*wordDepot").out("*token")`: Reads from `*wordDepot` and binds each entry to the `*token` variable.
. `.hashPartition("*token")`: Relocates the subsequent computation to a specific partition based on the value of `*token`, enabling distribution and scalability.
. `.compoundAgg("$$wordCounts", ...)`: Updates the PState. xref:aggregators.adoc[Aggregators] like `CompoundAgg.map("*token", Agg.count())` specify how to navigate the PState's structure and what update to perform (e.g., increment a counter at the key specified by `*token`).
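As a rough mental model, the effect of `CompoundAgg.map("*token", Agg.count())` on a single partition resembles incrementing a counter in an ordinary Java map. The sketch below is plain Java, not Rama code; `WordCountTraceSketch` and `trace` are hypothetical names used only to reproduce the rows of the table above.
[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java analogue of the word-count aggregation; not a Rama API.
class WordCountTraceSketch {
    // Returns a snapshot of the counts after each appended token.
    static List<String> trace(String... tokens) {
        // TreeMap keeps keys sorted so the snapshots print deterministically.
        Map<String, Long> wordCounts = new TreeMap<>();
        List<String> snapshots = new ArrayList<>();
        for (String token : tokens) {
            // Navigate to the key and increment its count, starting from 1 if absent.
            wordCounts.merge(token, 1L, Long::sum);
            snapshots.add(wordCounts.toString());
        }
        return snapshots;
    }
}
----
Calling `WordCountTraceSketch.trace("one", "two", "two")` yields three snapshots that correspond to the three rows of the table.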
=== Querying PStates
To query a PState from a client, get a reference to it and use methods like `selectOne()`. Queries use xref:paths.adoc[Path] objects to navigate the PState's data structure. Paths are powerful selectors, similar to XPath or CSS, that can traverse nested structures, filter, and apply transformations.
[source,java]
----
PState wc = cluster.clusterPState(moduleName, "$$wordCounts");
// Returns the count for the word "one"
wc.selectOne(Path.key("one"));
----
If a PState contained nested data like `{"Frankenstein": {"rejoiced": 3}}`, a path could retrieve the nested value:
[source,java]
----
wc.selectOne(Path.key("Frankenstein", "rejoiced")) // returns 3
----
=== More PState capabilities
PStates also support advanced features like "subindexing" for huge inner data structures and "fine-grained reactive queries" for continuous updates. These are described on the xref:pstates.adoc[PStates page].
=== Rama programming
The word count example demonstrates the fundamental Rama workflow:
. Append data to depots.
. ETLs read from depots and process the data.
. The results are stored in PStates for later retrieval.
.The basic Rama workflow
image::./diagrams/module-diagrams/single-depot-etl.svg[]
A module can have multiple depots and ETLs, and ETLs can read from multiple depots to populate multiple PStates.
.A more complex module
image::./diagrams/module-diagrams/multiple-depots-etls.svg[]
Rama is designed to be a single, unified system for all your data needs. It integrates storage and computation, eliminating the need to glue together disparate systems like Kafka, Cassandra, and Redis.
== Rama as a Unified Data System
Traditional architectures often combine multiple systems (e.g., RDBMS, NoSQL databases, caches, message queues), leading to significant complexity in development, operation, and data consistency.
Rama's innovation is integrating all aspects of a data system into a single, cohesive platform:
*PStates*:: Serve as any data model (key/value, document, graph) because they are arbitrary data structures. You can create as many as you need, each with its own optimal shape.
*Topologies*:: Provide scalable, Turing-complete distributed computation to build and query PStates.
*Depots*:: Act as the unified entry point for all incoming data.
*Queries*:: A flexible API supports everything from simple lookups to complex, on-demand distributed computations.
*Simplified Operations*:: Scalability, deployment, and monitoring are built-in.
== Theory in practice: PStates are views
A core part of Rama development is identifying useful views of your data and creating PStates to materialize them. You store data in the structure that is optimal for how it will be consumed.
For example, given raw web analytics events, you might create two PStates: one for page hit counts and another for user session histories.
.Raw Event Data
[source, json]
----
{"path": "/product/1", "duration": 3500, "sessionId": "abc"}
----
.Desired PState Views
[source, json]
----
// Page Hit Count PState
{"/product/1": 1}
// Session History PState
{"abc": [{"path": "/product/1", "duration": 3500}]}
----
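To make the two target shapes concrete before looking at Rama code, here is a plain-Java sketch that derives both views from raw events using ordinary collections. `AnalyticsViewsSketch` and `onPageVisit` are hypothetical names, not Rama APIs, and the sketch ignores partitioning and persistence entirely.
[source,java]
----
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration of the two views; not a Rama API.
class AnalyticsViewsSketch {
    // View 1: path -> number of hits.
    final Map<String, Long> pageViewCount = new HashMap<>();
    // View 2: sessionId -> ordered list of visits in that session.
    final Map<String, List<Map<String, Object>>> sessionHistory = new HashMap<>();

    // Updates both views from one raw event.
    void onPageVisit(Map<String, Object> visit) {
        String path = (String) visit.get("path");
        String sessionId = (String) visit.get("sessionId");

        pageViewCount.merge(path, 1L, Long::sum);

        // Store the visit without its sessionId, since the session is already the key.
        Map<String, Object> entry = new LinkedHashMap<>(visit);
        entry.remove("sessionId");
        sessionHistory.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(entry);
    }
}
----
The same raw event feeds two differently shaped structures, each matched to one query pattern.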
=== Analytics app example
The following module implements this analytics application.
[source,java]
----
import com.rpl.rama.*;
import com.rpl.rama.module.*;
import java.util.Map;

public class PageAnalyticsModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
        setup.declareDepot("*depot", Depot.random());
        StreamTopology s = topologies.stream("s");
        // PState for page hit counts
        s.pstate("$$pageViewCount", PState.mapSchema(String.class, Long.class));
        // PState for session histories (map of string to list of maps)
        s.pstate("$$sessionHistory",
                 PState.mapSchema(
                     String.class,
                     PState.listSchema(PState.mapSchema(String.class, Object.class))));
        s.source("*depot").out("*pageVisit")
         // Extract the fields used as keys in the two PStates.
         .each((Map<String, Object> visit) -> visit.get("sessionId"), "*pageVisit").out("*sessionId")
         .each((Map<String, Object> visit) -> visit.get("path"), "*pageVisit").out("*path")
         .compoundAgg("$$pageViewCount", CompoundAgg.map("*path", Agg.count()))
         .compoundAgg("$$sessionHistory", CompoundAgg.map("*sessionId", Agg.list("*pageVisit")));
    }
    // main method omitted for brevity
}
----
=== Schemas define PState structure
A PState's structure is defined by its schema. Schemas can be simple types (e.g., `Long.class`) or collections like `mapSchema`, `listSchema`, and `setSchema`. These can be nested to create complex structures, allowing you to model your data naturally.
Examples of schemas:
`PState.mapSchema(String.class, Long.class)`:: A map from Strings to Longs.
`PState.listSchema(Object.class)`:: A list of any type.
`PState.mapSchema(String.class, PState.setSchema(Long.class))`:: A map from a String to a set of Longs.
=== ETLs update PStates
ETLs define a distributed data pipeline using a chain of methods to process data.
The key concepts of the ETL programming model are:
*Vars*:: String identifiers starting with `*` (e.g., `"*pageVisit"`) act as temporary variables within the topology.
`.source(...).out("*var")`:: Binds data from a depot to a var.
`.each(javaLambda, "*inputVar").out("*outputVar")`:: Applies a Java function to an input var and binds the result to an output var. This is how you use arbitrary Java logic.
`.compoundAgg("$$pstate", ...)`:: Uses vars to update a PState.
The ETL API is an abstraction for distributed computation. It is implemented in Java, allowing you to use the full power of the language (like lambdas) to transform data within the pipeline.
=== Queries can be distributed computations, too
Queries are not limited to simple key lookups. Rama provides *query topologies* (`topologies.query(...)`) for defining complex, on-demand distributed computations over PStates.
The ETLs shown in this document are *stream topologies* (`topologies.stream(...)`). Stream and query topologies share the same fundamental API for defining computation graphs, unifying how you work with data whether it's streaming in or being queried on-demand.
== Summary
This document covered the core workflow for building applications in Rama.
* **Data Flow**: Data enters via depots, is processed by topologies (ETLs) into PStates, and clients query those PStates.
* **PStates**: Flexible, arbitrarily structured materialized views, shaped for optimal consumption.
* **Topologies**: Stream and query topologies are distributed computations defined with a unified API that allows arbitrary Java logic for transformations.
The result is a single, powerful system that integrates storage and computation, simplifying backend development by letting you use standard Java to work with data in its natural shape.
= Distributed Programming in Rama
Rama's distributed programming model enables you to spread work across a cluster of machines. This guide covers the model's core concepts and its design implications. While distributed programming is a complex field, Rama provides abstractions that simplify building high-performance, scalable applications without requiring deep expertise.
== Why Distributed?
Distributed systems are built for several reasons, including separation of concerns, performance, and resilience. This guide focuses on *performance*: how Rama's design allows you to distribute work across multiple machines to handle greater load.
[#task-model]
== The Task Model for Performance
In Rama, a *module* contains all the storage (PStates, depots) and logic (topologies) for your backend. When you deploy a module, you define its parallelism using three parameters:
. *Tasks:* A task is a partition of a module. It contains one partition of every depot and PState, plus an event queue. The number of tasks must be a power of two.
. *Threads:* The number of OS threads that will execute the tasks.
. *Workers:* The number of JVM processes (workers) running across the cluster.
For example, a module with 64 tasks, 32 threads, and 8 workers will spawn 8 worker processes. Each worker will have 4 threads, and each thread will run 2 tasks. You scale a module by adjusting the number of threads and workers.
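The arithmetic in this example can be checked with a small plain-Java sketch. `TaskLayoutSketch` is a hypothetical helper, not a Rama class, and it assumes tasks divide evenly across threads and threads divide evenly across workers; how Rama assigns them when the split is uneven is not covered here.
[source,java]
----
// Hypothetical helper for reasoning about a module's parallelism; not a Rama API.
class TaskLayoutSketch {
    final int tasks;
    final int threads;
    final int workers;

    TaskLayoutSketch(int tasks, int threads, int workers) {
        // The number of tasks must be a power of two (exactly one bit set).
        if (tasks <= 0 || Integer.bitCount(tasks) != 1) {
            throw new IllegalArgumentException("tasks must be a power of two");
        }
        // Assumption of this sketch only: everything divides evenly.
        if (workers <= 0 || threads <= 0 || threads % workers != 0 || tasks % threads != 0) {
            throw new IllegalArgumentException("this sketch assumes an even split");
        }
        this.tasks = tasks;
        this.threads = threads;
        this.workers = workers;
    }

    int threadsPerWorker() {
        return threads / workers;
    }

    int tasksPerThread() {
        return tasks / threads;
    }

    int tasksPerWorker() {
        return tasks / workers;
    }
}
----
For 64 tasks, 32 threads, and 8 workers this gives 4 threads per worker and 2 tasks per thread, matching the example, and therefore 8 tasks per worker.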
Tasks are like lightweight threads, identified by an index (e.g., "task 0"). Rama's dataflow API makes it seamless to move between tasks to access different data partitions.
== PStates and Depots are Partitioned
A key concept is that PStates and depots are *partitioned* across all tasks. A logical entity, like a `$$wordCounts` PState, is physically split into separate partitions, one for each task.
When an ETL runs on a specific task, any reference to `$$wordCounts` accesses the partition local to that task. This has a critical implication: if two tasks independently increment a count for the same word, each task's partition records a count of 1, so no single partition holds the correct total of 2.
To achieve correct results, you must ensure that all operations for a given piece of data (like the word "ostentation") are always routed to the same task.
== Controlling Distribution with Partitioners
You control where data is sent and where work is performed using *partitioners*. You can specify a partitioner in two places:
. *Depot Definition:* A depot partitioner determines which task receives a record when it's appended.
. *ETL Logic:* An ETL partitioner relocates the current ETL execution to a different task, transferring its current scope (e.g., variables bound with `.out()`).
Consider this word count example:
[source,java]
----
public class SimpleWordCountModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
        // Appended records are sent to a random task's depot partition.
        setup.declareDepot("*wordDepot", Depot.random());
        StreamTopology s = topologies.stream("wordCountStream");
        s.pstate("$$wordCounts", PState.mapSchema(String.class, Long.class));
        s.source("*wordDepot").out("*token")
         // Relocate the ETL to a task determined by the hash of *token.
         .hashPartition("*token")
         .compoundAgg("$$wordCounts", CompoundAgg.map("*token", Agg.count()));
    }
}
----
In this flow:
1. A word is appended to a *random* depot partition (e.g., on task 0).
2. The ETL on task 0 reads the word.
3. `.hashPartition("*token")` calculates a target task based on the word's value and relocates the ETL to that task (e.g., task 1).
4. The `.compoundAgg` operation then updates the `$$wordCounts` partition on task 1.
Because `.hashPartition` is deterministic, all occurrences of the same word will always be processed on the same task, ensuring its count is aggregated correctly in a single PState partition.
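The difference between counting wherever a record happens to land and counting after hash routing can be simulated in plain Java. In this sketch each task's PState partition is modeled as a separate map; `PartitionRoutingSketch` is a hypothetical name, round-robin placement stands in for random placement so the result is reproducible, and `String.hashCode()` is an assumed stand-in for whatever hash function Rama actually uses.
[source,java]
----
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of per-task partitions; not a Rama API.
class PartitionRoutingSketch {
    // One map per task stands in for that task's partition of a word-count PState.
    static List<Map<String, Long>> newPartitions(int numTasks) {
        List<Map<String, Long>> partitions = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            partitions.add(new HashMap<>());
        }
        return partitions;
    }

    // Deterministic: the same word always maps to the same task index.
    // floorMod keeps the index non-negative even for negative hash codes.
    static int hashTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Counts each word on the task chosen by its hash.
    static List<Map<String, Long>> countWithHashRouting(List<String> words, int numTasks) {
        List<Map<String, Long>> partitions = newPartitions(numTasks);
        for (String word : words) {
            partitions.get(hashTask(word, numTasks)).merge(word, 1L, Long::sum);
        }
        return partitions;
    }

    // Counts each word on whichever task it landed on (round-robin placement).
    static List<Map<String, Long>> countWhereItLands(List<String> words, int numTasks) {
        List<Map<String, Long>> partitions = newPartitions(numTasks);
        for (int i = 0; i < words.size(); i++) {
            partitions.get(i % numTasks).merge(words.get(i), 1L, Long::sum);
        }
        return partitions;
    }
}
----
With two appends of "ostentation" and four tasks, `countWhereItLands` leaves a count of 1 on two different partitions, while `countWithHashRouting` accumulates a count of 2 on a single partition.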
== Designing Applications for Distribution
In Rama, partitioning is a primary design concern that directly impacts your application's performance and data consistency.
=== Performance
Your partitioning strategy affects efficiency. In the example above, using `Depot.random()` is inefficient because an append might go to one task, only to require a network hop to the correct task for processing.
A more performant design sends data directly to the correct task from the start:
[source,java]
----
public class SimpleWordCountModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
        // Depot partitioner hashes the record itself to pick a partition.
        setup.declareDepot("*wordDepot", Depot.hashBy(Ops.IDENTITY)); // <1>
        StreamTopology s = topologies.stream("wordCountStream");
        s.pstate("$$wordCounts", PState.mapSchema(Object.class, Object.class));
        s.source("*wordDepot").out("*token")
         // No .hashPartition needed; the data is already on the correct task.
         .compoundAgg("$$wordCounts", CompoundAgg.map("*token", Agg.count()));
    }
}
----
<1> The depot now deterministically routes words to the correct task partition.
The general principle is to *co-locate related data and work on the same partition* to minimize network hops.
=== Consistency
Partitioning also affects data consistency. Rama does not guarantee a global processing order for records appended to different depot partitions. This can create race conditions.
Consider a user profile module where edits are appended to a depot with a random partitioner.
1. User `bananaman` submits edit A, which lands on depot partition 0.
2. User `bananaman` submits edit B, which lands on depot partition 1.
3. The ETL for edit A starts on task 0 and partitions to task 5 (the correct task for `bananaman`).
4. The ETL for edit B starts on task 1, partitions to task 5, and updates the profile.
5. The ETL for edit A arrives on task 5 and processes its stale data, overwriting the newer update from edit B.
The solution is to use a depot partitioner (e.g., `Depot.hashBy(username)`). Rama *does* guarantee that records appended to the *same* depot partition are processed in the order they were received. This ensures that edits for a single user are processed sequentially, preventing race conditions.
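The ordering argument can be illustrated with a deliberately simplified plain-Java model. `EditOrderingSketch` is a hypothetical name; the model treats a profile update as "last write wins" and a depot partition as a FIFO queue, and it says nothing about how Rama implements either.
[source,java]
----
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of edit ordering; not a Rama API.
class EditOrderingSketch {
    // Applies edits in arrival order; the last edit applied wins.
    static String applyInOrder(List<String> arrivals) {
        String profile = null;
        for (String edit : arrivals) {
            profile = edit;
        }
        return profile;
    }

    // Models a single depot partition as a FIFO queue:
    // records are processed in exactly the order they were appended.
    static List<String> viaSinglePartition(List<String> appended) {
        Deque<String> partition = new ArrayDeque<>(appended);
        List<String> processed = new ArrayList<>();
        while (!partition.isEmpty()) {
            processed.add(partition.pollFirst());
        }
        return processed;
    }
}
----
If edits A and B travel through different partitions and arrive as B then A, `applyInOrder` ends with the stale edit A. When both go through one partition, the processing order equals the append order, so the final profile reflects edit B.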
== Summary
* Rama's distributed model is built on *tasks*, which are concurrent, partitioned instances of a module.
* PStates and depots are physically *partitioned* across all tasks. An ETL on a given task interacts with its local partitions.
* *Partitioners* are used to control work distribution. Depot partitioners route incoming data, and ETL partitioners relocate execution between tasks.
* Designing an effective partitioning scheme is crucial for both *performance* (by co-locating data) and *consistency* (by ensuring correct processing order).
= Dataflow Programming
In Rama, you write ETL and query topologies using a _dataflow API_. This chapter explains the dataflow programming paradigm and the API's core elements.
== The Dataflow Programming Paradigm
Dataflow programming can be understood in contrast to the more common imperative paradigm.
* **Imperative Programming:** A sequence of instructions for a worker to execute (e.g., call this method, assign this variable). Think of a robot following commands.
* **Dataflow Programming:** An assembly line where each worker waits for an input, performs an operation, and sends outputs to other workers. This forms a directed graph of operations, which Rama calls a _topology_.
image::./diagrams/dataflow-diagrams/basic-dataflow-graph.png[]
While this model may feel different, it is highly expressive for distributed programming, which is why Rama uses it.
== Operations, Input, and Output
In an imperative program, methods coordinate through a callstack, passing arguments and waiting for a single return value.
image::./diagrams/dataflow-diagrams/basic-function-call.png[]
In a dataflow program, _operations_ wait for input, perform work, and emit output to other operations.
image::./diagrams/dataflow-diagrams/operations.png[]
Key differences from imperative programming include:
. **Decoupled Execution:** The mapping of operations to threads is an implementation detail, not dictated by a main execution thread.
. **Reactivity:** Operations execute only when they receive input.
. **Flexible Output:** An operation can _emit_ multiple values to multiple downstream operations, unlike a method which _returns_ a single value to its caller.
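The contrast between returning and emitting can be shown without any Rama classes. In this plain-Java sketch, `EmitVsReturnSketch` and its methods are hypothetical names; a `Consumer` plays the role of the downstream operation.
[source,java]
----
import java.util.function.Consumer;

// Hypothetical illustration of "return" versus "emit"; not a Rama API.
class EmitVsReturnSketch {
    // Imperative style: exactly one value goes back to the caller.
    static int addTen(int x) {
        return x + 10;
    }

    // Dataflow style: the operation pushes zero or more values downstream.
    static void splitWords(String sentence, Consumer<String> emit) {
        for (String word : sentence.split(" ")) {
            if (!word.isEmpty()) {
                emit.accept(word); // each call hands one value to the next operation
            }
        }
    }
}
----
For the input `"a b c"`, `splitWords` invokes the downstream consumer three times; for an empty string it never invokes it, which a method with a single return value cannot express directly.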
== Dataflow in Rama
In Rama, you define a dataflow graph's structure and its operations simultaneously using a builder-style API. The following examples can be run in a Java process or a Groovy shell.
[source, shell]
----
groovy:000> import com.rpl.rama.Block;
groovy:000> import com.rpl.rama.ops.Ops;
----
Here is a simple dataflow graph with a single node:
[source,java]
----
public static void singleNode() {
    Block.each(Ops.PRINTLN, 10).execute();
}
----
`Block.each()` appends a node to the dataflow graph. It means, "For each input, execute this operation with these arguments." The `.execute()` method sends a single, empty input to the graph, causing `Ops.PRINTLN` to run and print `10`.
image::./diagrams/dataflow-diagrams/one-node-example.png[]
=== Variable Scope
As you chain methods, you build a linear dataflow graph where the output of one node becomes the input for the next. Variables are managed using a _scope_.
[source,java]
----
Block.each(Ops.IDENTITY, "a").out("*x")
.each(Ops.PRINTLN, "*x")
.execute();
----
image::./diagrams/dataflow-diagrams/println-example.png[]
Here's how this works:
. `each(Ops.IDENTITY, "a")` emits the string `"a"`.
. `out("*x")` receives `"a"` and binds it to the var `*x` in a new scope. This scope is passed downstream.
. `each(Ops.PRINTLN, "*x")` receives the scope, looks up the value for `*x`, and prints `"a"`.
Strings prefixed with `*` or `$$` are treated as vars. To use a literal string with these prefixes, wrap it in a `Constant`, e.g., `new Constant("*x")`.
You can incrementally build up the scope:
[source,java]
----
Block.each(Ops.IDENTITY, "a").out("*x")
.each(Ops.IDENTITY, "b").out("*y")
.each(Ops.TO_STRING, "*x", "*y", 1, "!", 2).out("*z")
.each(Ops.PRINTLN, "*z")
.execute(); // Prints "ab1!2"
----
=== Performance
Rama compiles dataflow graphs into efficient bytecode. Vars and control flow constructs become optimized, low-level Java constructs.
== `execute()` vs. `source()`
The `execute()` method is used in these examples to synthetically trigger a dataflow graph. In production code, you will instead _source_ data from depots, which continuously read and emit records into the topology.
[source,java]
----
// In a real module
s.source("*wordDepot").out("*token")
.hashPartition("$$wordCounts", "*token")
.compoundAgg("$$wordCounts", CompoundAgg.map("*token", Agg.count()));
----
== Branching Dataflow Graphs
You can create graphs where one node emits to multiple branches, and multiple branches merge into one. This is achieved using a 1D sequence of API calls to build a 2D graph structure.
[source,java]
----
public static class Brancher implements RamaOperation0 {
    @Override
    public void invoke(OutputCollector collector) {
        collector.emitStream("pizzaOrders", 1);
        collector.emitStream("saladOrders", 10);
        collector.emitStream("saladOrders", 11);
    }
}

public static void unifyExample() {
    Block.each(new Brancher()).outStream("pizzaOrders", "pizzaAnchor1", "*pizzaOrderSize")
                              .outStream("saladOrders", "saladAnchor1", "*saladOrderSize")
         .each(Ops.DEC, "*saladOrderSize").out("*orderSize")
         .anchor("saladAnchor2")
         .hook("pizzaAnchor1")
         .each(Ops.INC, "*pizzaOrderSize").out("*orderSize")
         .anchor("pizzaAnchor2")
         .unify("pizzaAnchor2", "saladAnchor2")
         .each(Ops.PRINTLN, "*orderSize")
         .execute();
}
----
This constructs the following graph:
image::./diagrams/dataflow-diagrams/unify-example.png[]
=== Output Streams, Anchors, Hooks, and Unify
* **Output Streams:** Operations emit values to named streams. By default, they emit to the _default output stream_. `emitStream()` in a custom operation allows emitting to specific streams.
* **`outStream`:** Creates a node that consumes from a named stream and binds its output to vars. It also lets you name the node by creating an _anchor_.
* **`anchor`:** Attaches a name (an anchor) to the preceding node.
* **`hook`:** Switches the context to a previously defined anchor, allowing you to add subsequent nodes to a different branch.
* **`unify`:** Merges multiple branches into a single stream. A var is in scope after a `unify` if it was:
** In scope before the branch point, OR
** Defined on *all* unified branches.
In the example, `*orderSize` is defined on both branches, so it is available after `unify`.
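The scoping rule for `unify` amounts to a small set computation, sketched here in plain Java. `UnifyScopeSketch` and `varsAfterUnify` are hypothetical names used to restate the rule; they are not how Rama performs its checks.
[source,java]
----
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical restatement of the unify scoping rule; not a Rama API.
class UnifyScopeSketch {
    // Vars in scope after unify = vars bound before the branch point
    // plus the vars that every unified branch defines.
    static Set<String> varsAfterUnify(Set<String> beforeBranch,
                                      List<Set<String>> definedOnBranches) {
        Set<String> result = new TreeSet<>(beforeBranch);
        if (!definedOnBranches.isEmpty()) {
            // Start from the first branch and keep only vars present on all others.
            Set<String> common = new TreeSet<>(definedOnBranches.get(0));
            for (Set<String> branch : definedOnBranches) {
                common.retainAll(branch);
            }
            result.addAll(common);
        }
        return result;
    }
}
----
Applied to the example, the pizza branch defines `*pizzaOrderSize` and `*orderSize`, and the salad branch defines `*saladOrderSize` and `*orderSize`. Only `*orderSize` appears on both, so it is the only var from the branches that survives the `unify`.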
== Conditionals
You can add conditional branching to a dataflow graph.
[source,java]
----
Block.ifTrue(new Expr(Ops.EQUAL, 1, 2),
             Block.each(Ops.PRINTLN, "math is dead!"),
             Block.each(Ops.PRINTLN, "math is alive!"))
     .each(Ops.PRINTLN, "conditional complete")
     .execute();
----
image::./diagrams/dataflow-diagrams/full-conditional.png[]
`ifTrue` creates a conditional node. It takes a condition, a block for the `true` case, and an optional block for the `false` case. `Expr` is used here for a more concise inline expression.
The branches are implicitly unified. For a var to be available after the conditional, it must be defined in *all* branches. This code is valid because `*a` is defined in both the `true` and `false` blocks:
[source,java]
----
Block.ifTrue(new Expr(Ops.EQUAL, 1, 2),
             Block.each(Ops.IDENTITY, 1).out("*a"),
             Block.each(Ops.IDENTITY, 2).out("*a"))
     .each(Ops.PRINTLN, "*a")
     .execute();
----
Rama performs compile-time checks to prevent invalid var references.
== Writing Custom Operations
You can define custom operations by implementing `RamaFunction` or `RamaOperation`, or by using lambdas and method references.
* **`RamaFunction`:** Simpler and more efficient. The `invoke` method takes arguments and returns a single value, which is emitted to the default output stream.
[source,java]
----
public static class AddTen implements RamaFunction1<Integer, Integer> {
    @Override
    public Integer invoke(Integer l) {
        return l + 10;
    }
}

// Usage
Block.each(new AddTen(), 1).out("*numberPlusTen").execute();
----
* **`RamaOperation`:** More general. The `invoke` method receives an `OutputCollector` to emit multiple values to multiple streams. When an operation emits, downstream code executes immediately, and control returns to the operation only after it finishes.
[source,java]
----
public static class MyOperation implements RamaOperation0 {
    @Override
    public void invoke(OutputCollector collector) {
        collector.emitStream("streamA", 1);
        collector.emit(2, 3, "s"); // Emits to the default stream
    }
}
----
* **Lambdas and Method References:** A more concise way to define operations, and generally the preferred one.
[source,java]
----
// Lambda
Block.each((Integer l) -> l + 10, 1).out("*numberPlusTen").execute();
// Method Reference
Block.each(Math::abs, -1.2).out("*absoluteValue").execute();
----
== Immutable Scope and Shadowing
Scope bindings in Rama are *immutable*. When a node emits, its scope cannot be changed by other nodes.
In the `shadowExample` below, the `"stream2"` branch rebinds `*person`. This creates a new, distinct scope and does not affect the `*person` binding in the `"stream1"` branch. The first branch will always print `"Megan"`.
[source,java]
----
// Defines a Person class and a MultiOut operation
public static void shadowExample() {
Block.each(Ops.IDENTITY, new Person("Megan")).out("*person")
.each(new MultiOut()).outStream("stream1", "stream1Anchor")
.outStream("stream2", "stream2Anchor")
.each(Ops.IDENTITY, new Person("John")).out("*person") // Shadowing
.hook("stream1Anchor")
.each((Person person) -> person.name).out("*personName")
.each(Ops.PRINTLN, "*personName") // Prints "Megan"
.execute();
}
----
However, if you *mutate* a shared object within a scope, the change is visible across branches, which can lead to race conditions. In a `shadowMutationExample` variant (not shown here), the `Person` object itself is modified instead of `*person` being rebound, so the output of other branches can depend on execution order.
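The distinction between rebinding a var and mutating the object it refers to can be demonstrated in plain Java. In this sketch a scope is modeled as an unmodifiable map; `ScopeSketch`, `bind`, and the `Person` class are hypothetical stand-ins, not Rama's internal representation.
[source,java]
----
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of immutable scopes; not a Rama API.
class ScopeSketch {
    static class Person {
        String name;

        Person(String name) {
            this.name = name;
        }
    }

    // Binding a var produces a new scope; the original scope is left untouched.
    static Map<String, Object> bind(Map<String, Object> scope, String var, Object value) {
        Map<String, Object> next = new HashMap<>(scope);
        next.put(var, value);
        return Collections.unmodifiableMap(next);
    }
}
----
Rebinding `*person` in one branch via `bind` leaves the other branch's scope pointing at the original `Person`. Both scopes still share that object, however, so assigning to its `name` field in one branch changes what the other branch sees.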
== Looping
Rama's dataflow API is Turing-complete, featuring a looping construct.
[source,java]
----
public static void loopExample() {
Block.loopWithVars(LoopVars.var("*i", 0),
Block.ifTrue(new Expr(Ops.NOT_EQUAL, "*i", 5),
Block.emitLoop("*i")
.each(Ops.PRINTLN, "Variable *i is not 5 yet")
.continueLoop(new Expr(Ops.INC, "*i")))).out("*loopValue")
.each(Ops.PRINTLN, "Emitted:", "*loopValue")
.execute();
}
----
image::./diagrams/dataflow-diagrams/looping.png[]
* `loopWithVars`: Initializes loop variables (e.g., `*i` to `0`).
* `emitLoop`: Emits a value *out* of the loop to the code that follows. This does not terminate the loop. Control flow immediately passes to the code after the loop, executes it, and then returns to the node after `emitLoop`.
* `continueLoop`: Triggers the next iteration of the loop with new values for the loop variables. If `continueLoop` is not called, the loop terminates.
Unlike standard loops, code following a Rama loop can execute multiple times via `emitLoop` *before* the loop has finished.
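To make the interleaving concrete, here is a plain-Java model of `loopExample`. It is a sketch under the semantics stated above, not Rama code: `LoopSketch` and the `afterLoop` callback (standing in for the code that follows the loop) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class LoopSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Stand-in for the code after the loop: .each(Ops.PRINTLN, "Emitted:", "*loopValue")
        Consumer<Integer> afterLoop = v -> log.add("Emitted: " + v);
        int i = 0;                                  // loopWithVars: *i starts at 0
        while (i != 5) {                            // ifTrue(NOT_EQUAL *i 5)
            afterLoop.accept(i);                    // emitLoop: code after the loop runs now
            log.add("Variable *i is not 5 yet");    // control returns to the node after emitLoop
            i = i + 1;                              // continueLoop(INC *i)
        }
        return log;                                 // no continueLoop call: the loop has ended
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model each "Emitted" entry appears before the "not 5 yet" entry of the same iteration, because the code after the loop runs as part of `emitLoop`.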
=== Looping without Vars
The `loop` method works like `loopWithVars` but without explicit loop variables. State is typically managed by a mutable object bound before the loop.
[source,java]
----
Block.each((RamaFunction0) ArrayList::new).out("*list")
.loop(
Block.each(Ops.SIZE, "*list").out("*size")
.ifTrue(new Expr(Ops.EQUAL, "*size", 3),
Block.emitLoop(), // Emits when list size is 3
Block.each((List l) -> l.add("iter"), "*list")
.continueLoop()))
.each(Ops.PRINTLN, "Result:", "*list")
.execute();
----
== Summary
Rama's dataflow API provides precise control over data processing, branching, and merging logic. More advanced features like batch computation and code reuse are covered in xref:intermediate-dataflow.adoc[Intermediate dataflow programming].
=========We will now explain how to use Rama using the Clojure API.=========
= Rama Dataflow Language: A Distilled Guide
Rama's dataflow API is a Turing-complete language for defining ETL and query topologies, enabling the expression of arbitrary distributed computation.
// To run the examples, use the rama-clojure-starter project:
// https://github.com/redplanetlabs/rama-clojure-starter
[source, clojure]
----
(use 'com.rpl.rama)
(use 'com.rpl.rama.path)
(require '[com.rpl.rama.ops :as ops])
(require '[com.rpl.rama.aggs :as aggs])
----
== Basic Structure
Rama is based on a "call and emit" paradigm, a generalization of the "call and response" model used by most languages. An operation can be called with arguments and emit zero, one, or many values to multiple independent "output streams".
[source,clojure]
----
(deframaop foo [*arg]
  (:> (inc *arg))   ; Emit to the default stream
  (:> (dec *arg)))
(?<-
  (foo 5 :> *v)     ; Capture emits from the default stream into *v
  (println "Emitted:" *v))
;; Emitted: 6
;; Emitted: 4
----
* `deframaop` defines a custom dataflow operation.
* `?<-` compiles and executes a dataflow block for testing and exploration.
Dataflow code consists of a sequence of *segments*. A segment includes an operation, input fields, and output declarations.
[source,clojure]
----
(operation input1 input2 :output-stream> <anchor> *var1 *var2)
----
Key components of a segment are:
* *Output Stream*:: A keyword ending in `>`. The default is `:>`.
* *Anchor*:: A symbol wrapped in `< >`, like `<my-anchor>`, used for branching and merging.
* *Variable*:: A symbol prefixed with `*` (value), `%` (anonymous operation), or `$$` (PState). Regular Clojure symbols are treated as constants.
Clojure macros and nested expressions are expanded and evaluated before being passed as arguments to Rama operations.
[source,clojure]
----
(?<-
(println "Res:" (-> 10 (+ 3) (* 4))))
;; Res: 52
----
== Branching and Unification
Dataflow code can form a graph. `anchor>` labels a point, and `hook>` attaches subsequent code to a labeled anchor, creating branches.
[source,clojure]
----
(deframaop multi-stream-op [*arg]
(:> (inc *arg))
(:other> (dec *arg)))
(?<-
(multi-stream-op 5 :> <default> *v :other> <other> *v)
(println "Default stream:" *v)
(hook> <other>)
(println "Other stream:" *v))
----
Branches can be merged using `unify>`. A variable is only in scope after `unify>` if it is bound on *all* incoming branches.
[source,clojure]
----
(deframaop multi-out [] (:a> 1) (:b> 2))
(?<-
(multi-out :a> <a> *v :b> <b> *v)
(unify> <a> <b>)
(println "Val:" *v))
;; Val: 1
;; Val: 2
----
This creates an "abstract syntax graph" (ASG), which Rama reifies during compilation.
== Conditionals
Rama provides several ways to express conditional logic.
* `<<if`:: Similar to Clojure's `if`. The `then` and optional `else` branches are automatically unified.
+
[source,clojure]
----
(?<-
(<<if (= 1 2)
(println "True")
(else>)
(println "False")))
;; False
----
* `if>`:: A primitive operation with `:then>` and `:else>` output streams. It is not a special form and can be passed as an argument.
* `ifexpr`:: A nested expression that returns a value, like Clojure's `if`.
* `<<cond`:: An analogue to Clojure's `cond`. All branches are unified. If a case emits multiple times, the continuation executes multiple times.
+
[source,clojure]
----
(?<-
(<<cond
(case> (> 2 1))
(identity 2 :> *v)
(default>)
(identity -1 :> *v))
(println "Val:" *v))
;; Val: 2
----
+
Use `(default> :unify false)` to prevent unification on the default branch, which is useful when throwing exceptions.
== Loops
`loop<-` provides dataflow-aware looping. It emits values via `:>` and continues with `continue>`.
[source,clojure]
----
(?<-
(loop<- [*i 0 :> *v]
(:> *i) ; Emit the current value
(<<if (< *i 2)
(continue> (inc *i))))
(println "Emit:" *v))
;; Emit: 0
;; Emit: 1
;; Emit: 2
----
The loop's continuation runs immediately after an emit. `continue>` can be called multiple times in an iteration if the loop body is asynchronous (e.g., after a partitioner).
== Custom Operations
Decompose logic using several types of operations.
* *Clojure Functions*:: Standard functions whose return value is treated as a single emit to `:>`.
* *`defoperation`*:: Defines a `ramaop` in Clojure. Output streams are bound to symbols that are called to emit values.
* *`deframaop`*:: Defines a `ramaop` using dataflow code. Emits are made by calling output stream keywords as operations.
* *`deframafn`*:: Like `deframaop`, but its body must emit to `:>` exactly once as its final action. It can be called directly from Clojure.
* *`<<ramaop` / `<<ramafn`*:: Define anonymous operations, which are bound to `%` variables and capture their lexical closure.
+
[source,clojure]
----
(?<-
(identity 10 :> *v)
(<<ramafn %f [*a]
(:> (+ *a *v)))
(println "Res:" (%f 1) (%f 2)))
;; Res: 11 12
----
*NOTE:* Do not store anonymous operations in depots or PStates, as their underlying class names are not stable across module updates.
== Key Differences from Clojure
* No direct Java interop (`.`). Use Clojure wrapper functions.
* No `fn`. Use `<<ramaop` or `<<ramafn`.
* No support for primitives.
* No special forms (`if`, `let`, etc.). Rama provides dataflow equivalents like `<<if` and `<<cond`.
* No var indirection for constants. Symbols used as arguments are embedded as constants at compile time. This affects `with-redefs`.
* Constants must be serializable for modules (e.g., basic types, records, data structures, `RamaSerializable` implementations).
== Distributed Execution
=== Partitioners
Partitioners are operations that route computation, potentially to a different task on a different machine. They have a `|` prefix.
* `|hash`:: Partitions to a single task based on the hash of a key.
* `|all`:: Broadcasts to all tasks.
Rama automatically analyzes variable scope to serialize only the necessary data across partitioner boundaries. The `$$` variants (`|hash$$`) partition based on the partitions of a depot or PState.
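The routing idea behind `|hash` and `|all` can be sketched in plain Java. This is an illustration only: the text above does not say which hash function Rama uses, so the `hashCode`-modulo scheme here is an assumption, and `PartitionerSketch`, `hashTask`, and `allTasks` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

final class PartitionerSketch {
    // Hypothetical stand-in for |hash: one task chosen from the key's hash.
    // Assumption: hashCode modulo task count; Rama's real hashing may differ.
    static int hashTask(Object key, int numTasks) {
        return Math.floorMod(Objects.hashCode(key), numTasks);
    }

    // Hypothetical stand-in for |all: the computation continues on every task.
    static List<Integer> allTasks(int numTasks) {
        List<Integer> tasks = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            tasks.add(t);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // The same key always routes to the same task.
        System.out.println(hashTask("alice", 4) == hashTask("alice", 4));
        System.out.println(allTasks(4));
    }
}
```

The property that matters is determinism: every event for a given key lands on the same task, so that task's PState partition sees all updates for that key.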
=== Asynchronous Operations
To avoid blocking task threads, use these operations for asynchronous work.
* `yield-if-overtime`:: Pauses execution and reschedules it as a new event if the current event exceeds a time limit (default 5ms).
* `completable-future>`:: Integrates a Java `CompletableFuture`. The dataflow continues when the future completes, and failures are handled by Rama's retry mechanism.
== Interacting with PStates
PStates are accessed using operations and Specter-style paths from `com.rpl.rama.path`.
* `local-select>`:: Reads from the PState partition on the current task. Emits once per navigated value.
* `select>`:: Partitions based on the path's key before reading.
* `local-transform>`:: Writes to the local PState partition. The path must end with `term`, `termval`, or `NONE>`.
* `local-clear>`:: Resets a top-level PState to its initial value.
[source,clojure]
----
; Read all values from a list at *user-id
(local-select> [(keypath *user-id) ALL] $$p :> *v)
; Set the value at *user-id to *profile
(local-transform> [(keypath *user-id) (termval *profile)] $$profiles)
; Add 10 to the value at :some-field
(local-transform> [(keypath *user-id) :some-field (term (partial + 10))] $$p2)
----
== Aggregators
Aggregators provide a declarative way to update PStates and perform computations in batch blocks. They are prefixed with `+`.
* `+compound`:: Aggregates nested values, automatically handling initialization.
+
[source,clojure]
----
; For each *k, add *v to the value at :a and increment the count
(+compound $$p {*k {:a [(aggs/+sum *v) (aggs/+count)]}})
----
+
In batch blocks, it can capture updated values with the `:new-val>` stream.
* `+group-by`:: Groups data by a key and applies aggregators, similar to SQL's `GROUP BY`.
Custom aggregators can be defined with:
* `accumulator`:: Defines an aggregation like a `reduce` function.
* `combiner`:: Defines a parallelizable aggregation by merging two values. Rama automatically uses two-phase aggregation for combiners, enabling massive scalability.
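The idea of two-phase aggregation with a combiner can be sketched in plain Java: each task first merges its own values into a partial result, and the partials are then merged with the same function. This is a conceptual model, not Rama's implementation, and `TwoPhaseSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class TwoPhaseSketch {
    // Merges per-task values locally (phase 1), then merges the partials (phase 2).
    static long aggregate(List<List<Long>> valuesPerTask, BinaryOperator<Long> combine, long zero) {
        List<Long> partials = new ArrayList<>();
        for (List<Long> values : valuesPerTask) {
            long partial = zero;
            for (long v : values) {
                partial = combine.apply(partial, v);   // phase 1: local to one task
            }
            partials.add(partial);
        }
        long total = zero;
        for (long p : partials) {
            total = combine.apply(total, p);           // phase 2: only partials are transferred
        }
        return total;
    }

    static long run() {
        List<List<Long>> valuesPerTask = Arrays.asList(
            Arrays.asList(1L, 2L, 3L),
            Arrays.asList(4L, 5L),
            Arrays.asList(6L));
        return aggregate(valuesPerTask, Long::sum, 0L);
    }

    public static void main(String[] args) {
        System.out.println(run()); // 21
    }
}
```

This works because the combining function takes two values of the same kind and returns another, so it can merge raw values and partial results alike. A `reduce`-style accumulator does not generally have that property.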
== Batch Blocks
Batch blocks (`<<batch`) provide a partially declarative execution mode with SQL-like capabilities, including joins and scalable aggregation.
[source,clojure]
----
(?<-
(<<batch
; Source 1
(ops/explode [[:a 1] [:b 2]] :> [*k *v1])
; Source 2 (joined on *k)
(gen>)
(ops/explode [[:a 10] [:c 4]] :> [*k **v2]) ; **v2 for outer join
; Post-aggregation
(println "Res:" *k *v1 **v2)))
;; Res: :a 1 10
;; Res: :b 2 nil
----
* `gen>` creates a new processing branch. Joins are inferred automatically based on variable usage.
* `**` variables indicate an outer join.
* Subbatches can be defined with `defgenerator` and `batch<-` for multi-stage aggregation.
* `materialize>` saves the results of a batch block to a temporary PState for reuse.
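The outer-join result of the batch block example above can be reproduced with a plain-Java sketch. It models only the join semantics (every row from the first source is kept, and a missing match on the second source yields a null), not how Rama plans or executes batch blocks. `OuterJoinSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OuterJoinSketch {
    // Keeps every key from `left`; the right-hand value is null when there is no match.
    static List<String> join(Map<String, Integer> left, Map<String, Integer> right) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            Integer v2 = right.get(e.getKey());
            rows.add(e.getKey() + " " + e.getValue() + " " + v2);
        }
        return rows;
    }

    // Same data as the batch block example: source 1 has :a and :b, source 2 has :a and :c.
    static List<String> run() {
        Map<String, Integer> source1 = new LinkedHashMap<>();
        source1.put(":a", 1);
        source1.put(":b", 2);
        Map<String, Integer> source2 = new LinkedHashMap<>();
        source2.put(":a", 10);
        source2.put(":c", 4);
        return join(source1, source2);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The rows correspond to the two `Res:` lines of the Clojure example, with Java's `null` in place of `nil`. The `:c` key never appears because it has no row in the first source.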
== Segmacros
Segmacros are Rama's macro system for generating dataflow segments. They operate after Clojure macros and return segments as vector data.
* `defbasicblocksegmacro`:: Defines a segmacro that expands to a block of segments.
* `defblock`:: A convenient way to define a segmacro that takes a block of dataflow code as an argument. These are typically prefixed with `<<`.
[source,clojure]
----
(defblock <<time [label block]
[[`System/currentTimeMillis :> '*t1]
[block> block]
[`System/currentTimeMillis :> '*t2]
[println label "elapsed:" (seg# - '*t2 '*t1) "ms"]])
(?<-
(<<time "myblock"
(identity 1)))
----
Notable built-in segmacros include `<<atomic`, `<<branch`, and `<<switch`.
= Defining and Using Modules
This document is a reference for Rama's Clojure API, which mirrors the functionality of the Java API. For conceptual documentation, see the main xref:tutorial1.adoc[tutorial] (which uses Java) and the xref:terminology.adoc[Terminology page].
Useful resources:
* **Introductory Tutorial:** link:https://blog.redplanetlabs.com/2023/10/11/introducing-ramas-clojure-api/[Clojure API Blog Post]
* **API Documentation:** link:https://redplanetlabs.com/clojuredoc/index.html[ClojureDoc]
* **Examples:** link:https://github.com/redplanetlabs/rama-demo-gallery[rama-demo-gallery] project
== Defining Modules
Modules are defined with `defmodule`, which creates a Clojure function that receives `setup` and `topologies` arguments.
[source, clojure]
----
(defmodule MyModule [setup topologies]
(declare-depot setup *my-depot :random))
----
* `setup`: Declares depots, PStates, and dependencies on other modules.
* `topologies`: Declares stream, microbatch, and query topologies.
A module's name is derived from its namespace and symbol (e.g., `"com.mycompany/MyModule"`). You can override the symbol part of the name:
[source, clojure]
----
(defmodule MyModule {:module-name "OtherName"} [setup topologies]
; Module name is now "com.mycompany/OtherName"
)
----
Use `get-module-name` in tests to retrieve a module's name, e.g. `(get-module-name MyModule)`. For anonymous modules in tests, use `module`.
== Declaring Depots
xref:depots.adoc[Depots] are distributed, durable logs of data that act as sources for topologies. They are declared with `declare-depot`.
[source, clojure]
----
(defdepotpartitioner partition-by-value [data num-partitions]
  (mod (:some-value data) num-partitions))
(defmodule Foo [setup topologies]
(declare-depot setup *depot1 :random)
(declare-depot setup *depot2 (hash-by first))
(declare-depot setup *depot3 :disallow)
(declare-depot setup *depot4 partition-by-value)
(declare-depot setup *depot5 :random {:global? true}))
----
The depot partitioner determines how client appends are distributed across partitions.
* `:random`: Appends go to a random partition.
* `(hash-by <fn>)`: Partitions based on the hash of a value extracted by `<fn>`.
* `:disallow`: Disables client appends; data can only be appended from within topologies.
* *Custom Partitioner*: A function defined with `defdepotpartitioner`.
Depot options include `:global? true`, which creates a single-partition depot, and `:migration`, described in the next section.
=== Depot Migrations
Depot records can be migrated during a xref:operating-rama.adoc#_updating_modules[module update]. A migration function transforms a record or removes it by returning `DEPOT-TOMBSTONE`. Migrations are applied on-read while data is updated in the background. See xref:depots.adoc#_migrations[the full documentation].
[source, clojure]
----
(declare-depot
setup *depot :random
{:migration (depot-migration
"my-migration-id"
(fn [record]
(if (= record 10) DEPOT-TOMBSTONE (str record))))})
----
The migration ID tracks progress. If the ID changes on a module update, the migration restarts.
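The on-read behavior of a depot migration can be modeled in plain Java: stored records stay as they are, and each read passes through the migration function, which either transforms the record or marks it for removal. This is a conceptual sketch mirroring the Clojure example above, not Rama code. `DepotMigrationSketch`, `TOMBSTONE`, and `readAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DepotMigrationSketch {
    // Hypothetical sentinel playing the role of DEPOT-TOMBSTONE.
    static final Object TOMBSTONE = new Object();

    // Same logic as the Clojure migration: drop the record 10, stringify the rest.
    static Object migrate(Object record) {
        return Integer.valueOf(10).equals(record) ? TOMBSTONE : String.valueOf(record);
    }

    // Applies the migration while reading; tombstoned records are skipped.
    static List<Object> readAll(List<Object> stored) {
        List<Object> out = new ArrayList<>();
        for (Object record : stored) {
            Object migrated = migrate(record);
            if (migrated != TOMBSTONE) {
                out.add(migrated);
            }
        }
        return out;
    }

    static List<Object> run() {
        return readAll(Arrays.<Object>asList(1, 10, 42));
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 42] (as strings)
    }
}
```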
== Declaring Tick Depots
Tick depots emit events at a specified frequency, allowing for time-based processing. The frequency is in milliseconds.
[source, clojure]
----
(defmodule Foo [setup topologies]
(declare-tick-depot setup *my-tick 60000)) ; Emits once per minute
----
See xref:depots.adoc#_tick_depots[this section] for behavior differences in stream vs. microbatch topologies.
== Declaring Mirrors
Mirrors are references to depots, PStates, or query topologies in other modules. See xref:module-dependencies.adoc[Module dependencies] for more details.
[source, clojure]
----
(defmodule Foo [setup topologies]
(mirror-depot setup *other-depot "com.mycompany.OtherModule" "*depot")
(mirror-pstate setup $$p "com.mycompany.OtherModule" "$$p")
(mirror-query setup *mirror-query "com.mycompany.OtherModule2" "some-query"))
----
Mirrored query topologies are invoked with `invoke-query`, just like local ones.
== Declaring Task Globals
link:https://redplanetlabs.com/docs/~/integrating.html#_task_globals[Task globals] provide global state to all tasks in a module, useful for large constants (like ML models) or external service clients. Declare them with `declare-object`.
[source, clojure]
----
(defmodule Foo [setup topologies]
(declare-object setup *my-global "global-value"))
----
If the object implements `TaskGlobalObject`, it can be specialized for each task.
== Declaring ETL Topologies
ETL (Extract-Transform-Load) topologies process data from depots. They can be streaming (see xref:stream.adoc[Stream topologies]) or microbatching (see xref:microbatch.adoc[Microbatch topologies]).
* `stream-topology`: Processes data as it arrives.
* `microbatch-topology`: Processes data in batches.
[source, clojure]
----
;; Stream Topology Example
(defmodule StreamExample [setup topologies]
(declare-depot setup *depot :random)
(let [s (stream-topology topologies "counts")]
(declare-pstate s $$counts {String Long})
(<<sources s
(source> *depot :> *data)
(|hash *data)
(+compound $$counts {*data (aggs/+count)}))))
;; Microbatch Topology Example
(defmodule MicrobatchExample [setup topologies]
(declare-depot setup *depot :random)
(let [mb (microbatch-topology topologies "counts")]
(declare-pstate mb $$counts {String Long})
(<<sources mb
(source> *depot :> %microbatch)
(%microbatch :> *data) ; Explode the batch into individual records
(|hash *data)
(+compound $$counts {*data (aggs/+count)}))))
----
`<<sources` defines the dataflow logic for processing depot data. In microbatching, `source>` emits a batch object, which you can then iterate over.
== Declaring PStates
xref:pstates.adoc[PStates] are partitioned, durable, and replicated indexes declared with `declare-pstate`. They are defined by a schema, which can include arbitrarily large nested data structures via xref:pstates.adoc#_subindexing["subindexing"].
[source, clojure]
----
(defmodule PStateExamples [setup topologies]
(let [s (stream-topology topologies "s")]
(declare-pstate s $$p1 {String Long})
; Subindexed set
(declare-pstate s $$p2 {Long (set-schema String {:subindex? true})})
; Top-level value PState
(declare-pstate s $$p3 Long)
; Nested subindexing with size tracking disabled
(declare-pstate s $$p4 {Long (map-schema Long
(set-schema Long {:subindex-options {:track-size? false}})
{:subindex? true})})
))
----
PState options include:
* `:global? true`: Creates a single-partition PState.
* `:initial-value <val>`: Sets the initial value for each partition.
* `:private? true`: Restricts PState access to within the module.
* `:key-partitioner <fn>`: Customizes routing for foreign PState queries.
=== PState Migrations
PStates can be migrated to new schemas during a xref:operating-rama.adoc#_updating_modules[module update]. Migrations are defined with the `migrated` function and are applied on-read. See the xref:pstates.adoc#_migrations[full documentation] for details.
[source, clojure]
----
;; Change a value's schema from Long to String
;; (`s` is the topology that owns the PState, as in the examples above)
(declare-pstate s $$p {Long (migrated String "v1-to-v2" str)})
;; Update a fixed-keys map
(declare-pstate
  s
  $$p
  {Long (migrated (fixed-keys-schema {:b String, :c Long})
                  "update-keys"
                  (fn [m] (-> m (dissoc :a) (assoc :c 10) (update :b str)))
                  [(fixed-key-additions #{:c})
                   (fixed-key-removals #{:a})])})
----
== Depot Subscriptions and Dataflow
The `<<sources` macro defines ETL logic using the `source>` function to subscribe to depots.
Subscription options for `source>`:
* `:start-from`: Determines where to start processing on a depot for the first time.
** `:end` (default), `:beginning`
** `(offset-ago <amt> <unit>)` (e.g., `(offset-ago 10 :records)`)
** `(offset-after-timestamp-millis <ts>)`
* `:retry-mode`: (Stream topologies only) Defines fault-tolerance behavior.
** `:individual` (default), `:all-after`, `:none`. See xref:stream.adoc#_fault_tolerance_and_retry_modes[docs] for semantics.
[source, clojure]
----
(<<sources s
(source> *depot {:start-from :beginning, :retry-mode :all-after} :> *data)
;; ... dataflow logic ...
)
----
Rama's dataflow API is based on "call and emit", where operations can emit zero, one, or many values to downstream code, including asynchronously to other partitions. This is detailed on the xref:clj-dataflow-lang.adoc[next page].
== Declaring Query Topologies
A xref:query.adoc[query topology] is an on-demand, realtime, distributed computation over PStates.
[source, clojure]
----
(defmodule MyQueries [setup topologies]
(<<query-topology topologies "multiply" [*arg :> *res]
(* 10 (inc *arg) :> *res)
(|origin))) ; Partition back to the query's origin
----
A query definition includes a name, parameters, and dataflow code. It must end with the `|origin` partitioner. Query topologies can be invoked from other topologies using `invoke-query`.
== Foreign Module Clients
"Foreign" clients interact with a Rama cluster from the outside. First, get a cluster manager.
[source, clojure]
----
;; For a real cluster
(def manager (open-cluster-manager {"conductor.host" "1.2.3.4"}))
;; In tests, the InProcessCluster object is also a manager
(def ipc (create-ipc))
----
Then, fetch clients for depots, PStates, or query topologies.
[source, clojure]
----
(def depot (foreign-depot manager "com.mycompany/MyModule" "*depot"))
(def pstate (foreign-pstate manager "com.mycompany/MyModule" "$$p"))
(def query (foreign-query manager "com.mycompany/MyModule" "my-query"))
----
=== Foreign Depot Appends
Append data to a depot with `foreign-append!`.
[source, clojure]
----
(foreign-append! depot "some data") ; Uses default :ack level
(foreign-append! depot "other data" :append-ack)
----
*Ack Levels*:
* `nil`: Fire-and-forget. No guarantees.
* `:append-ack`: Blocks until data is persisted and replicated.
* `:ack` (default): Blocks until colocated stream topologies have also processed the record.
An async version, `foreign-append-async!`, returns a `CompletableFuture`.
=== Foreign Depot Queries
Read ranges of data directly from a depot partition.
* `(foreign-object-info depot)`: Returns info like `:num-partitions`.
* `(foreign-depot-partition-info depot <partition-idx>)`: Returns `:start-offset` and `:end-offset`.
* `(foreign-depot-read depot <partition-idx> <start> <end>)`: Reads records.
[source, clojure]
----
(foreign-depot-read depot 3 100 105)
;; => ["record100" "record101" "record102" "record103" "record104"]
----
Async versions `foreign-depot-partition-info-async` and `foreign-depot-read-async` are available.
=== Foreign PState Queries
PState queries use link:https://github.com/redplanetlabs/specter[Specter]-style paths via the `com.rpl.rama.path` namespace.
* `foreign-select`: Returns a sequence of results.
* `foreign-select-one`: Returns a single result.
[source, clojure]
----
(foreign-select [:a :b ALL even?] pstate)
(foreign-select-one [:k (nthpath 3)] pstate {:pkey "explicit-key"})
----
Use the `:pkey` option to specify an explicit partitioning key, overriding the default of using the first element in the path. Async versions `foreign-select-async` and `foreign-select-one-async` are available.
==== Range Queries
Rama provides special path navigators for efficient range queries on sorted structures (including all durable PState structures).
* `sorted-map-range`, `sorted-map-range-from`, `sorted-map-range-to`
* `sorted-set-range`, `sorted-set-range-from`, `sorted-set-range-to`
[source, clojure]
----
; Get a submap from key :a to :b
(foreign-select-one (sorted-map-range :a :b) pstate)
; Get up to 10 elements starting from key :k
(foreign-select-one (sorted-map-range-from :k 10) pstate)
----
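Range navigation on a sorted structure behaves much like `subMap` and `tailMap` on a Java `TreeMap`. The sketch below is an analogy in plain Java, not the Rama API. It assumes the start bound is inclusive and the end bound exclusive, which is how the time-series test later in this document describes `(sorted-map-range 3 11)`. `RangeQuerySketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

final class RangeQuerySketch {
    // Analogue of sorted-map-range: keys in [from, to).
    static <V> SortedMap<Integer, V> range(TreeMap<Integer, V> m, int from, int to) {
        return m.subMap(from, to);
    }

    // Analogue of sorted-map-range-from with a maximum element count.
    static <V> List<Integer> keysFrom(TreeMap<Integer, V> m, int from, int max) {
        List<Integer> keys = new ArrayList<>();
        for (Integer k : m.tailMap(from, true).keySet()) {
            if (keys.size() == max) {
                break;
            }
            keys.add(k);
        }
        return keys;
    }

    // Sample data: minute buckets 3, 10, and 65.
    static TreeMap<Integer, String> sample() {
        TreeMap<Integer, String> m = new TreeMap<>();
        m.put(3, "stats3");
        m.put(10, "stats10");
        m.put(65, "stats65");
        return m;
    }

    public static void main(String[] args) {
        System.out.println(range(sample(), 3, 11));    // {3=stats3, 10=stats10}
        System.out.println(keysFrom(sample(), 10, 1)); // [10]
    }
}
```

Because the keys are kept in order, a range lookup only touches the entries inside the range rather than scanning the whole map.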
==== Reactive Queries
Reactive queries provide fine-grained updates to clients. Use `foreign-proxy` to get a `ProxyState` object that represents the result of a query and is updated automatically.
[source, clojure]
----
(def proxy-val
(foreign-proxy [:a :b] pstate
{:callback-fn (fn [newval diff oldval]
(println "Value changed:" oldval "->" newval))}))
;; Get the current cached value without a remote call
(deref proxy-val)
----
The `ProxyState` receives minimal diffs from the server, which can be inspected in the `:callback-fn`. See xref:pstates.adoc#_reactive_queries[this section] for details. An async version, `foreign-proxy-async`, is also available.
=== Foreign Query Topology Invokes
Invoke a query topology like a regular function using `foreign-invoke-query`.
[source, clojure]
----
(def result (foreign-invoke-query query "arg1" :arg2 3))
----
The non-blocking `foreign-invoke-query-async` returns a `CompletableFuture`.
== Summary
This page covered how to define modules and their components (depots, PStates, topologies) and how to interact with them from external clients. The xref:clj-dataflow-lang.adoc[next page] provides a detailed guide to Rama's dataflow API.
= Testing
Test Rama modules with link:https://redplanetlabs.com/javadoc/com/rpl/rama/test/InProcessCluster.html[InProcessCluster]. The link:https://redplanetlabs.com/clojuredoc/com.rpl.rama.test.html[com.rpl.rama.test] namespace provides a Clojure API for this and other testing utilities. For complete details and examples, see the xref:testing.adoc[main testing documentation], the link:https://blog.redplanetlabs.com/2023/10/11/introducing-ramas-clojure-api/[intro blog post], and the link:https://github.com/redplanetlabs/rama-demo-gallery[rama-demo-gallery].
To unit test a `deframafn` or `deframaop` that operates on a PState, use the `link:https://redplanetlabs.com/clojuredoc/com.rpl.rama.test.html#var-create-test-pstate[create-test-pstate]` function.
.Example
[source, clojure]
----
(use 'com.rpl.rama)
(use 'com.rpl.rama.path)
(require '[com.rpl.rama.test :as rtest])
(deframafn foo-op [$$p]
(local-transform> [:a "b" (term inc)] $$p)
(:>))
(with-open [tp (rtest/create-test-pstate
{clojure.lang.Keyword (map-schema String
Long
{:subindex? true})})]
(rtest/test-pstate-transform [:a "b" (termval 10)] tp)
(println "Initial:" (rtest/test-pstate-select-one [:a "b"] tp))
(foo-op tp)
(println "After one call:" (rtest/test-pstate-select-one [:a "b"] tp))
(foo-op tp)
(println "After two calls:" (rtest/test-pstate-select-one [:a "b"] tp)))
----
.Output
[source, text]
----
Initial: 10
After one call: 11
After two calls: 12
----
=========We will now paste in some Clojure code examples.=========
(defproject com.rpl/rama-demo-gallery "1.0.0-SNAPSHOT"
:source-paths ["src/main/clj"]
:test-paths ["src/test/clj"]
:dependencies [[com.rpl/rama-helpers "0.10.0"]
[org.apache.logging.log4j/log4j-slf4j18-impl "2.16.0"]
[org.asynchttpclient/async-http-client "2.12.3"]]
:repositories [["releases" {:id "maven-releases"
:url "https://nexus.redplanetlabs.com/repository/maven-public-releases"}]]
:profiles {:dev {:resource-paths ["src/test/resources/"]}
:provided {:dependencies [[com.rpl/rama "1.0.0"]]}}
)
(ns rama.gallery.profile-module-test
(:use [com.rpl rama]
[com.rpl.rama path])
(:require
[clojure.test :refer [deftest is testing]]
[com.rpl.rama.test :as rtest]
[rama.gallery.profile-module :as pm]))
;; This function implements username registration, throwing an exception if the username is already registered.
;; This uses the ack return of the "profiles" topology to know if the registration request succeeded or not.
(defn register! [registration-depot username->registration username pwd-hash]
(let [{user-id "profiles"} (foreign-append! registration-depot
(pm/->Registration (str (java.util.UUID/randomUUID))
username
pwd-hash))]
(if (some? user-id)
user-id
(throw (ex-info "Username already registered" {})))))
(deftest profile-module-test
;; create-ipc creates an InProcessCluster which simulates a full Rama cluster in-process and is an ideal environment for
;; experimentation and unit-testing.
(with-open [ipc (rtest/create-ipc)]
(rtest/launch-module! ipc pm/ProfileModule {:tasks 4 :threads 2})
(let [module-name (get-module-name pm/ProfileModule)
;; Client usage of IPC is identical to using a real cluster. Depot and PState clients are fetched by
;; referencing the module name along with the variable used to identify the depot/PState within the module.
registration-depot (foreign-depot ipc module-name "*registration-depot")
profile-edits-depot (foreign-depot ipc module-name "*profile-edits-depot")
username->registration (foreign-pstate ipc module-name "$$username->registration")
profiles (foreign-pstate ipc module-name "$$profiles")
alice-id (register! registration-depot username->registration "alice" "hash1")
bob-id (register! registration-depot username->registration "bob" "hash2")]
;; verify registering alice again fails
(is (thrown? Exception (register! registration-depot username->registration "alice", "hash3")))
;; verify that profiles are initialized correctly
(is (= "alice" (foreign-select-one (keypath alice-id :username) profiles)))
(is (= "bob" (foreign-select-one (keypath bob-id :username) profiles)))
;; Do many profile edits at once and verify they all go through.
(foreign-append! profile-edits-depot
(pm/->ProfileEdits alice-id
[(pm/display-name-edit "Alice Smith")
(pm/height-inches-edit 65)
(pm/pwd-hash-edit "hash4")]))
(is (= {:username "alice"
:display-name "Alice Smith"
:height-inches 65
:pwd-hash "hash4"}
(foreign-select-one (keypath alice-id) profiles)))
;; Verify that profile editing only replaces specified fields
(foreign-append! profile-edits-depot
(pm/->ProfileEdits alice-id
[(pm/display-name-edit "Alicia Smith")]))
(is (= {:username "alice"
:display-name "Alicia Smith"
:height-inches 65
:pwd-hash "hash4"}
(foreign-select-one (keypath alice-id) profiles)))
;; Do a single profile edit on a different user.
(foreign-append! profile-edits-depot
(pm/->ProfileEdits bob-id
[(pm/display-name-edit "Bobby")]))
(is (= {:username "bob"
:display-name "Bobby"
:pwd-hash "hash2"}
(foreign-select-one (keypath bob-id) profiles)))
)))
(ns rama.gallery.time-series-module-test
(:use [com.rpl rama]
[com.rpl.rama path])
(:require
[clojure.test :refer [deftest is testing]]
[com.rpl.rama.test :as rtest]
[rama.gallery.time-series-module :as tsm]))
;; gets a random timestamp in the specified minute bucket
(defn minute [bucket]
(+ (* bucket 60 1000) (rand-int 60000)))
(deftest time-series-module-test
;; create-ipc creates an InProcessCluster which simulates a full Rama cluster in-process and is an ideal environment for
;; experimentation and unit-testing.
(with-open [ipc (rtest/create-ipc)]
(rtest/launch-module! ipc tsm/TimeSeriesModule {:tasks 4 :threads 2})
(let [module-name (get-module-name tsm/TimeSeriesModule)
;; Client usage of IPC is identical to using a real cluster. Depot and PState clients are fetched by
;; referencing the module name along with the variable used to identify the depot/PState within the module.
render-latency-depot (foreign-depot ipc module-name "*render-latency-depot")
window-stats (foreign-pstate ipc module-name "$$window-stats")
get-stats-for-minute-range (foreign-query ipc module-name "get-stats-for-minute-range")
expected-stats3 (tsm/->WindowStats 2 30 20 10 20)
expected-stats10 (tsm/->WindowStats 3 66 33 15 33)]
;; add some test data across many time buckets
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 10 (minute 3)))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 20 (minute 3)))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 15 (minute 10)))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 18 (minute 10)))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 33 (minute 10)))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 20 (minute 65)))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 30 (minute 65)))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 100 (minute (* 60 24))))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 100 (minute (+ (* 60 24) 8))))
(foreign-append! render-latency-depot (tsm/->RenderLatency "foo.com" 50 (minute (+ (* 60 48) 122))))
;; Microbatching runs asynchronously to depot appends, so this code waits for microbatching to finish
;; processing all the depot appends so we can see those appends reflected in PState queries.
(rtest/wait-for-microbatch-processed-count ipc module-name "timeseries" 10)
;; Verify that a single bucket is correct.
(is (= expected-stats3
(foreign-select-one (keypath "foo.com" :m 3) window-stats)))
;; This is an example of doing a range query. This fetches a submap from the PState from buckets 3 (inclusive) to
;; 11 (exclusive) for :m granularity. That inner map is subindexed, and subindexed structures are sorted. Range
;; queries on subindexed structures are very efficient.
(is (= (sorted-map 3 expected-stats3
10 expected-stats10)
(foreign-select-one [(keypath "foo.com" :m)
(sorted-map-range 3 11)]
window-stats)))
;; This is an example of doing an aggregation within a PState query. This fetches the number of buckets between
;; minute 0 and minute 60*72. Critically, this entire path executes server-side. So the only thing transferred
;; back from the server is the count.
(is (= 6
(foreign-select-one [(keypath "foo.com" :m)
(sorted-map-range 0 (* 60 72))
(view count)]
window-stats)))
;; This is an example of invoking a query topology, which is just like invoking any regular function. You pass
;; it some arguments, and it returns a result back. The difference is it runs as a distributed computation on
;; a Rama cluster. This query topology efficiently fetches the aggregate WindowStats for an arbitrary range of
;; minute buckets, utilizing coarser granularities if possible to minimize the number of buckets that need to
;; be fetched to perform the computation.
(is (= (tsm/->WindowStats 10 396 50 10 100)
(foreign-invoke-query get-stats-for-minute-range "foo.com" 0 (* 60 72))))
;; Verify that buckets at coarser granularity are aggregated correctly.
(is (= (tsm/->WindowStats 7 146 30 10 33)
(foreign-select-one (keypath "foo.com" :d 0) window-stats)))
)))
(deftest query-granularities-test
(let [res (tsm/query-granularities :m 63 10033)]
(is (= 5 (count res)))
(is (= #{[:m 63 120] [:h 2 24] [:d 1 6] [:h 144 167] [:m 10020 10033]}
(set res))))
(let [res (tsm/query-granularities :d 3 122)]
(is (= 3 (count res)))
(is (= #{[:d 3 30] [:td 1 4] [:d 120 122]}
(set res)))))
(ns rama.gallery.top-users-module-test
(:use [com.rpl rama]
[com.rpl.rama path])
(:require
[clojure.test :refer [deftest is testing]]
[com.rpl.rama.test :as rtest]
[rama.gallery.top-users-module :as tum]))
(deftest top-users-module-test
;; A redef of this constant is used to simplify testing this module by reducing the number of users to store in the
;; "top users" PState.
(with-redefs [tum/TOP-AMOUNT 3]
;; create-ipc creates an InProcessCluster which simulates a full Rama cluster in-process and is an ideal environment for
;; experimentation and unit-testing.
(with-open [ipc (rtest/create-ipc)]
(rtest/launch-module! ipc tum/TopUsersModule {:tasks 4 :threads 2})
(let [module-name (get-module-name tum/TopUsersModule)
;; Client usage of IPC is identical to using a real cluster. Depot and PState clients are fetched by
;; referencing the module name along with the variable used to identify the depot/PState within the module.
purchase-depot (foreign-depot ipc module-name "*purchase-depot")
top-spending-users (foreign-pstate ipc module-name "$$top-spending-users")
;; declare some constants to make the test code easier to read
alice-id 0
bob-id 1
charlie-id 2
david-id 3
emily-id 4]
;; add some data to the depot
(foreign-append! purchase-depot (tum/->Purchase alice-id 300))
(foreign-append! purchase-depot (tum/->Purchase bob-id 200))
(foreign-append! purchase-depot (tum/->Purchase charlie-id 100))
(foreign-append! purchase-depot (tum/->Purchase david-id 100))
(foreign-append! purchase-depot (tum/->Purchase emily-id 400))
;; Microbatching runs asynchronously to depot appends, so this code waits for microbatching to finish
;; processing all the depot appends so we can see those appends reflected in PState queries.
(rtest/wait-for-microbatch-processed-count ipc module-name "topusers" 5)
;; To fetch the entire top users list, the navigator STAY is used.
(is (= [[emily-id 400]
[alice-id 300]
[bob-id 200]]
(foreign-select-one STAY top-spending-users)))
;; Add another record which will cause david-id to supplant bob-id on the top users list.
(foreign-append! purchase-depot (tum/->Purchase david-id 250))
(rtest/wait-for-microbatch-processed-count ipc module-name "topusers" 6)
;; Verify the new list of top users.
(is (= [[emily-id 400]
[david-id 350]
[alice-id 300]]
(foreign-select-one STAY top-spending-users)))
;; Fetch just the user IDs from the top users list by iterating over each tuple and selecting
;; just the first field. This path is executed completely server side and only the user IDs are
;; returned.
(is (= [emily-id
david-id
alice-id]
(foreign-select [ALL FIRST] top-spending-users)))
))))
(ns rama.gallery.rest-api-integration-module-test
(:use [com.rpl rama]
[com.rpl.rama path])
(:require
[clojure.test :refer [deftest is testing]]
[com.rpl.rama.test :as rtest]
[rama.gallery.rest-api-integration-module :as raim]))
(deftest rest-api-integration-module-test
;; create-ipc creates an InProcessCluster which simulates a full Rama cluster in-process and is an ideal environment for
;; experimentation and unit-testing.
(with-open [ipc (rtest/create-ipc)]
(rtest/launch-module! ipc raim/RestAPIIntegrationModule {:tasks 4 :threads 2})
(let [module-name (get-module-name raim/RestAPIIntegrationModule)
;; Client usage of IPC is identical to using a real cluster. Depot and PState clients are fetched by
;; referencing the module name along with the variable used to identify the depot/PState within the module.
get-depot (foreign-depot ipc module-name "*get-depot")
responses (foreign-pstate ipc module-name "$$responses")
url "https://official-joke-api.appspot.com/random_joke"]
;; This checks the behavior of the module by appending a few URLs and printing the responses recorded in the
;; PState. To write a real test with actual assertions, it's best to test the behavior of the module with
;; the external REST API calls mocked out, such as by using with-redefs.
(foreign-append! get-depot url)
(println "Response 1:" (foreign-select-one (keypath url) responses))
(foreign-append! get-depot url)
(println "Response 2:" (foreign-select-one (keypath url) responses))
)))
(ns rama.gallery.migrations-music-catalog-modules-test
(:use [com.rpl rama]
[com.rpl.rama path])
(:require
[clojure.test :refer [deftest is]]
[com.rpl.rama.test :as rtest]
[rama.gallery.migrations-music-catalog-modules :as mmcm]))
(deftest migrations-music-catalog-modules-test
;; InProcessCluster simulates a full Rama cluster in-process and is an ideal
;; environment for experimentation and unit-testing.
(with-open [ipc (rtest/create-ipc)]
;; First we deploy our initial module instance.
(rtest/launch-module! ipc mmcm/ModuleInstanceA {:tasks 4 :threads 2})
(let [;; We use our module's name to fetch handles on its depots/PStates.
module-name (get-module-name mmcm/ModuleInstanceA)
;; Client usage of IPC is identical to using a real cluster. Depot,
;; PState, and query topology clients are fetched by referencing the
;; module name along with the variable used to identify the
;; depot/PState/query within the module.
albums-depot (foreign-depot ipc module-name "*albums-depot")
albums (foreign-pstate ipc module-name "$$albums")]
;; Now we construct an Album and append it to the depot.
(foreign-append! albums-depot
(mmcm/->Album "Post Malone"
"F-1 Trillion"
["Have The Heart ft. Dolly Parton"]))
;; We wait for our microbatch to process the album.
(rtest/wait-for-microbatch-processed-count ipc module-name "albums" 1)
;; Once processed, we expect that our album will be indexed in the PState.
(is (= 1
(foreign-select-one [(keypath "Post Malone") (view count)]
albums)))
(is (= "F-1 Trillion"
(foreign-select-one [(keypath "Post Malone" "F-1 Trillion" :name)]
albums)))
(is (= "Have The Heart ft. Dolly Parton"
(foreign-select-one [(keypath "Post Malone" "F-1 Trillion" :songs)
FIRST]
albums)))
;; Now we deploy the next iteration of our module, which has a migration,
;; via module update.
(rtest/update-module! ipc mmcm/ModuleInstanceB)
;; We expect that albums previously appended will now have the new Song
;; structure.
(is (= "Have The Heart"
(foreign-select-one [(keypath "Post Malone" "F-1 Trillion" :songs)
FIRST (keypath :name)]
albums)))
(is (= ["Dolly Parton"]
(foreign-select-one [(keypath "Post Malone" "F-1 Trillion" :songs)
FIRST (keypath :featured-artists)]
albums)))
;; Albums newly appended will go through the new topology code, and have
;; their Songs parsed before indexing.
(foreign-append! albums-depot
(mmcm/->Album "Frank Ocean"
"Channel Orange"
["White feat. John Mayer"]))
;; We wait for our microbatch to process the album.
(rtest/wait-for-microbatch-processed-count ipc module-name "albums" 2)
(is
(= "White"
(foreign-select-one [(keypath "Frank Ocean" "Channel Orange" :songs)
FIRST (keypath :name)]
albums)))
(is
(= ["John Mayer"]
(foreign-select-one [(keypath "Frank Ocean" "Channel Orange" :songs)
FIRST (keypath :featured-artists)]
albums))))))
(ns rama.gallery.bank-transfer-module-test
(:use [com.rpl rama]
[com.rpl.rama path])
(:require
[clojure.test :refer [deftest is testing]]
[com.rpl.rama.test :as rtest]
[rama.gallery.bank-transfer-module :as btm]))
(deftest bank-transfer-module-test
;; create-ipc creates an InProcessCluster which simulates a full Rama cluster in-process and is an ideal environment for
;; experimentation and unit-testing.
(with-open [ipc (rtest/create-ipc)]
(rtest/launch-module! ipc btm/BankTransferModule {:tasks 4 :threads 2})
(let [module-name (get-module-name btm/BankTransferModule)
;; Client usage of IPC is identical to using a real cluster. Depot and PState clients are fetched by
;; referencing the module name along with the variable used to identify the depot/PState within the module.
transfer-depot (foreign-depot ipc module-name "*transfer-depot")
deposit-depot (foreign-depot ipc module-name "*deposit-depot")
funds (foreign-pstate ipc module-name "$$funds")
outgoing-transfers (foreign-pstate ipc module-name "$$outgoing-transfers")
incoming-transfers (foreign-pstate ipc module-name "$$incoming-transfers")
;; Declare some constants to make the test code easier to read
alice-id 0
bob-id 1
charlie-id 2]
(foreign-append! deposit-depot (btm/->Deposit alice-id 200))
(foreign-append! deposit-depot (btm/->Deposit bob-id 100))
(foreign-append! deposit-depot (btm/->Deposit charlie-id 100))
;; Microbatching runs asynchronously to depot appends, so this code waits for microbatching to finish
;; processing all the depot appends so we can see those appends reflected in PState queries.
(rtest/wait-for-microbatch-processed-count ipc module-name "banking" 3)
;; This transfer will succeed.
(foreign-append! transfer-depot (btm/->Transfer "alice->bob1" alice-id bob-id 50))
;; This transfer will fail because alice has only 150 funds after the first transfer.
(foreign-append! transfer-depot (btm/->Transfer "alice->charlie1" alice-id charlie-id 160))
;; This transfer will succeed.
(foreign-append! transfer-depot (btm/->Transfer "alice->charlie2" alice-id charlie-id 25))
;; This transfer will succeed.
(foreign-append! transfer-depot (btm/->Transfer "charlie->bob1" charlie-id bob-id 10))
(rtest/wait-for-microbatch-processed-count ipc module-name "banking" 7)
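;; Assert on the final funds for each user:
;; - alice: 200 - 50 - 25 = 125 (the failed 160 transfer moves no funds)
;; - bob: 100 + 50 + 10 = 160
;; - charlie: 100 + 25 - 10 = 115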
(is (= 125 (foreign-select-one (keypath alice-id) funds)))
(is (= 160 (foreign-select-one (keypath bob-id) funds)))
(is (= 115 (foreign-select-one (keypath charlie-id) funds)))
;; Verify the outgoing transfers of alice
(let [transfers (foreign-select [(keypath alice-id) ALL] outgoing-transfers)]
(is (= 3 (count transfers)))
(is (= #{["alice->bob1" {:to-user-id bob-id :amt 50 :success? true}]
["alice->charlie1" {:to-user-id charlie-id :amt 160 :success? false}]
["alice->charlie2" {:to-user-id charlie-id :amt 25 :success? true}]}
(set transfers))))
;; Verify the outgoing transfers of charlie
(let [transfers (foreign-select [(keypath charlie-id) ALL] outgoing-transfers)]
(is (= 1 (count transfers)))
(is (= [["charlie->bob1" {:to-user-id bob-id :amt 10 :success? true}]]
transfers)))
;; Verify the incoming transfers of bob
(let [transfers (foreign-select [(keypath bob-id) ALL] incoming-transfers)]
(is (= 2 (count transfers)))
(is (= #{["alice->bob1" {:from-user-id alice-id :amt 50 :success? true}]
["charlie->bob1" {:from-user-id charlie-id :amt 10 :success? true}]}
(set transfers))))
;; Verify the incoming transfers of charlie
(let [transfers (foreign-select [(keypath charlie-id) ALL] incoming-transfers)]
(is (= 2 (count transfers)))
(is (= #{["alice->charlie1" {:from-user-id alice-id :amt 160 :success? false}]
["alice->charlie2" {:from-user-id alice-id :amt 25 :success? true}]}
(set transfers))))
)))
(ns rama.gallery.time-series-module
(:use [com.rpl.rama]
[com.rpl.rama.path])
(:require [com.rpl.rama.aggs :as aggs]
[com.rpl.rama.ops :as ops]))
;; This module demonstrates time-series analytics for the example use case of the latency of rendering different URLs
;; on a website. The module receives a stream of latencies for individual renders and accumulates min/max/average
;; stats for windows at minute, hour, day, and thirty-day granularities. Accumulating at multiple granularities speeds
;; up queries over larger ranges.
;;
;; See the test file time_series_module_test.clj for how a client interacts with this module to perform various kinds
;; of range queries, including aggregating a range of data server-side.
;;
;; The stats computed for each window are min, max, average, and latest. To capture full distributions, you can add
;; a data structure such as T-Digest to the WindowStats object.
;; As with all the demos, data is represented using plain Clojure records. You can represent
;; data however you want, and we generally recommend using a library with compact serialization,
;; strong schemas, and support for evolving types (like Thrift or Protocol Buffers). We use plain
;; Clojure records in these demos to keep them as simple as possible by not having additional
;; dependencies. Rama uses Nippy for serialization, and you can extend that directly or define a custom
;; serialization through Rama to support your own representations. In all cases you work with
;; first-class objects when using Rama, whether appending to depots, processing in ETLs,
;; or querying from PStates.
(defrecord RenderLatency
[url render-millis timestamp-millis])
(defrecord WindowStats
[cardinality total last-millis min-latency-millis max-latency-millis])
;; This defines an aggregator that combines two WindowStats objects into one aggregated WindowStats object.
;; It is used both in the ETL to update the time-series PState as well as the query topology fetching the
;; WindowStats for a specific range.
(def +combine-measurements
(combiner
(fn [window-stats1 window-stats2]
(->WindowStats
(+ (:cardinality window-stats1) (:cardinality window-stats2))
(+ (:total window-stats1) (:total window-stats2))
(or (:last-millis window-stats2) (:last-millis window-stats1))
(cond
(nil? (:min-latency-millis window-stats1))
(:min-latency-millis window-stats2)
(nil? (:min-latency-millis window-stats2))
(:min-latency-millis window-stats1)
:else
(min (:min-latency-millis window-stats1)
(:min-latency-millis window-stats2)))
(cond
(nil? (:max-latency-millis window-stats1))
(:max-latency-millis window-stats2)
(nil? (:max-latency-millis window-stats2))
(:max-latency-millis window-stats1)
:else
(max (:max-latency-millis window-stats1)
(:max-latency-millis window-stats2)))))
:init-fn (fn [] (->WindowStats 0 0 nil nil nil))))
(defn- single-window-stat [render-millis]
(->WindowStats 1 render-millis render-millis render-millis render-millis))
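;; Worked example (an illustrative sketch, not part of the module): how two single-measurement
;; WindowStats merge. The inline ->WindowStats call below is a hand-written copy of the non-nil
;; branch of the function wrapped by +combine-measurements; it is an assumption made for
;; illustration, not a way to invoke the aggregator itself. The average is presumably derived
;; from total and cardinality at read time, since it is not stored as a field.
(comment
  (let [a (single-window-stat 10)
        b (single-window-stat 20)
        combined (->WindowStats
                   (+ (:cardinality a) (:cardinality b))
                   (+ (:total a) (:total b))
                   (:last-millis b)
                   (min (:min-latency-millis a) (:min-latency-millis b))
                   (max (:max-latency-millis a) (:max-latency-millis b)))]
    [(into {} combined)
     (/ (:total combined) (:cardinality combined))])
  ;; => [{:cardinality 2, :total 30, :last-millis 20, :min-latency-millis 10, :max-latency-millis 20} 15]
  )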
;; This is a custom operation used in the ETL to emit the time bucket to index each RenderLatency record
;; for each minute, day, hour, and thirty-day granularity.
(deframaop emit-index-granularities [*timestamp-millis]
(long (/ *timestamp-millis (* 1000 60)) :> *minute-bucket)
(long (/ *minute-bucket 60) :> *hour-bucket)
(long (/ *hour-bucket 24) :> *day-bucket)
(long (/ *day-bucket 30) :> *thirty-day-bucket)
(:> :m *minute-bucket)
(:> :d *day-bucket)
(:> :h *hour-bucket)
(:> :td *thirty-day-bucket))
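;; Worked example (illustrative only, not part of the module): the same bucket arithmetic in plain
;; Clojure for a timestamp that falls in minute 1448, i.e. 8 minutes into the second day. The local
;; names are hypothetical stand-ins for the dataflow variables above.
(comment
  (let [timestamp-millis  (* 1448 60 1000)
        minute-bucket     (long (/ timestamp-millis (* 1000 60)))
        hour-bucket       (long (/ minute-bucket 60))
        day-bucket        (long (/ hour-bucket 24))
        thirty-day-bucket (long (/ day-bucket 30))]
    [[:m minute-bucket] [:h hour-bucket] [:d day-bucket] [:td thirty-day-bucket]])
  ;; => [[:m 1448] [:h 24] [:d 1] [:td 0]]
  )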
;; These constants are used in the helper function below "query-granularities".
(def NEXT-GRANULARITY
{:m :h
:h :d
:d :td})
(def NEXT-GRANULARITY-DIVISOR
{:m 60
:h 24
:d 30})
;; This helper function is used in the query topology below to compute the minimal number of buckets that need to
;; be queried across all granularities to satisfy a query over an arbitrary range of minute buckets. For example, if querying
;; from minute 58 of hour 6 through minute 3 of hour 20 (both in the same day), then the queries that need to be done are:
;; - :m granularity, minute 58 of hour 6 through minute 0 of hour 7
;; - :h granularity, hour 7 through hour 20
;; - :m granularity, minute 0 of hour 20 through minute 3 of hour 20
(defn query-granularities [granularity start-bucket end-bucket]
(let [next-granularity (get NEXT-GRANULARITY granularity)]
(if (nil? next-granularity)
[[granularity start-bucket end-bucket]]
(let [divisor (get NEXT-GRANULARITY-DIVISOR granularity)
next-start-bucket (cond->
(long (/ start-bucket divisor))
(not= 0 (mod start-bucket divisor))
inc)
next-end-bucket (long (/ end-bucket divisor))
next-aligned-start-bucket (* next-start-bucket divisor)
next-aligned-end-bucket (* next-end-bucket divisor)
more (if (> next-end-bucket next-start-bucket)
(query-granularities next-granularity
next-start-bucket
next-end-bucket))]
(concat more
(if (>= next-aligned-start-bucket next-aligned-end-bucket)
[[granularity start-bucket end-bucket]]
(cond-> []
(> next-aligned-start-bucket start-bucket)
(conj [granularity start-bucket next-aligned-start-bucket])
(> end-bucket next-aligned-end-bucket)
(conj [granularity next-aligned-end-bucket end-bucket])
)))))))
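;; Worked example (illustrative only): the scenario described in the comment above, evaluated at a
;; REPL in this namespace. Minute 58 of hour 6 is minute bucket 418 and minute 3 of hour 20 is
;; minute bucket 1203. Each emitted triple is [granularity start end]; the query topology below
;; passes start and end to sorted-map-range, which treats the end as exclusive.
(comment
  (query-granularities :m (+ (* 6 60) 58) (+ (* 20 60) 3))
  ;; => ([:h 7 20] [:m 418 420] [:m 1200 1203])
  )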
;; This defines the module, whose body is a regular Clojure function implementation. All depots, ETLs,
;; PStates, and query topologies are defined via this entry point.
(defmodule TimeSeriesModule
[setup topologies]
;; This depot takes in RenderLatency objects. The second argument is a "depot partitioner" that controls
;; how appended data is partitioned across the depot, affecting on which task each piece of data begins
;; processing in ETLs.
(declare-depot setup *render-latency-depot (hash-by :url))
;; Defines the ETL as a microbatch topology. Microbatch topologies have higher throughput than stream topologies
;; with the tradeoff of update latency being in the hundreds of milliseconds range rather than single-digit milliseconds
;; range. They are generally preferable for analytics-oriented use cases like this one where the extra latency
;; doesn't matter.
(let [mb (microbatch-topology topologies "timeseries")]
;; PStates are durable and replicated datastores and are represented as an arbitrary combination of data structures. Reads
;; and writes to PStates go to disk and are not purely in-memory operations.
;; This PState stores bucketed stats for all granularities for each URL. Minute/hour/day/thirty-day granularities use the
;; keywords :m, :h, :d, and :td as keys in that position of the data structure. The final map in the data structure
;; is subindexed because it can contain millions of elements. Subindexing stores each value of those maps individually
;; and enables them to be written and queried efficiently even when they're huge. Subindexed maps are always sorted, and
;; it's also easy to do range queries on them. This is demonstrated in the query topology below.
;; This PState is structured so that all granularities for a given URL are stored on the same partition. This allows queries
;; for large time ranges that need to fetch data from multiple granularities to be efficient by fetching all data from one
;; partition (as opposed to needing to fetch different granularities from different partitions). The query topology below
;; demonstrates this.
;; Note that the coarser time granularities take up very little additional space because they have so many fewer buckets.
;; The hour granularity has 60x fewer buckets than the minute granularity, and the daily granularity has 24x fewer buckets
;; than the hour granularity. Space usage for time-series indexes like this is dominated by the finest granularity.
(declare-pstate
mb
$$window-stats
{String ; url
{clojure.lang.Keyword ; granularity
(map-schema Long ; bucket
WindowStats
{:subindex? true})}})
;; <<sources defines the ETL logic as Rama dataflow code. Rama's dataflow API works differently than Clojure, but it has
;; the same expressiveness as any general purpose language while also being able to seamlessly distribute computation.
(<<sources mb
;; This subscribes the ETL to *render-latency-depot, binding the batch of all data in this microbatch to %microbatch.
;; %microbatch is an anonymous operation which when invoked emits all data for the microbatch across all partitions.
(source> *render-latency-depot :> %microbatch)
;; Because of the depot partitioner on *render-latency-depot, computation for each piece of data
;; starts on the same task where stats are stored for that URL in the $$window-stats PState.
(%microbatch :> {:keys [*url *render-millis *timestamp-millis]})
;; The code for updating the stats in the PState is defined with a combiner aggregator, so it needs as input a WindowStats
;; object with just *render-millis in it. This helper function constructs that object and binds it to the variable
;; *single-stat.
(single-window-stat *render-millis :> *single-stat)
;; This invokes the custom operation defined above to emit the bucket to index the new data for each granularity. Note how this
;; operation emits two fields, *granularity and *bucket.
(emit-index-granularities *timestamp-millis :> *granularity *bucket)
;; The writes to the $$window-stats PState are done with a compound aggregator, which specifies the write in the shape
;; of the data structure being written to. At the leaf of this aggregator is the +combine-measurements aggregator, defined
;; at the beginning of this file. It takes as input whatever WindowStats object is already stored in the PState at
;; that position as well as the WindowStats object in *single-stat.
(+compound $$window-stats
{*url
{*granularity
{*bucket (+combine-measurements *single-stat)}}}))
;; This defines a query topology for getting the aggregated stats for a range of minute buckets. Rather than fetch all
;; the minute buckets between the start and end buckets, it uses coarser-granularity buckets if possible to minimize the
;; amount of data that needs to be fetched from the PState.
;; Unlike the ETL code above, query topologies are batched computations. Batched computations in Rama can do the same things
;; you can do with relational languages like SQL: inner joins, outer joins, subqueries, and aggregations. This particular
;; query topology is straightforward and simply aggregates all fetched WindowStats into a single returned WindowStats object.
;; Since query topologies are colocated with the PStates in their module, they are very efficient. This query topology does
;; potentially many queries to the $$window-stats PState and aggregates them together without any network transfer in between.
;; This query topology definition specifies it takes as input three arguments – *url, *start-bucket, and *end-bucket –
;; and will bind a variable called *stats for the return value.
(<<query-topology topologies "get-stats-for-minute-range"
[*url *start-bucket *end-bucket :> *stats]
;; First, the query topology switches to the task containing data for this URL. Query topologies are optimized when
;; there's a leading partitioner like this: the routing is performed client-side rather than on a module task. This
;; means clients of this query topology send their requests directly to the task containing the needed data.
(|hash *url)
;; This uses a helper function to emit all ranges of data for each granularity that need to be fetched.
(ops/explode (query-granularities :m *start-bucket *end-bucket)
:> [*granularity *gstart *gend])
;; This fetches each individual WindowStats object that needs to be aggregated. sorted-map-range selects a submap from
;; the subindexed map at that position, and MAP-VALS navigates to every value of that map individually. Since the function
;; call right before this emits many times, this local-select> is executed for each of those emits. Individual WindowStats
;; objects are bound to the variable *bucket-stat.
(local-select> [(keypath *url *granularity)
(sorted-map-range *gstart *gend)
MAP-VALS]
$$window-stats
:> *bucket-stat)
;; Every query topology must have an |origin call, which indicates to move the computation back to where the query started.
(|origin)
;; This aggregates all *bucket-stat objects emitted into a single object bound to the variable *stats. Note that this
;; is the variable specified at the start of the query topology to name the return value.
;; The +combine-measurements aggregator here is the same one as used in the ETL above.
(+combine-measurements *bucket-stat :> *stats))
))
(ns rama.gallery.migrations-music-catalog-modules
(:use [com.rpl.rama]
[com.rpl.rama.path])
(:require
[clojure.string :as str]))
;; This toy module demonstrates Rama's PState migration functionality.
;;
;; Two module instances are used in order to demonstrate the evolution of a
;; PState schema. In a real application codebase there would be one module
;; instance defined for a module, and its preceding historical instances would
;; live in version control.
;;
;; This module indexes musical albums as they're added to a catalog.
;;
;; See the test file migrations_music_catalog_modules_test.clj for how a migration is
;; initiated with a module update.
;; As with all the demos, data is represented using plain Clojure records. You can represent
;; data however you want, and we generally recommend using a library with compact serialization,
;; strong schemas, and support for evolving types (like Thrift or Protocol Buffers). We use plain
;; Clojure records in these demos to keep them as simple as possible by not having additional
;; dependencies. Rama uses Nippy for serialization, and you can extend that directly or define a custom
;; serialization through Rama to support your own representations. In all cases you work with
;; first-class objects when using Rama, whether appending to depots, processing in ETLs,
;; or querying from PStates.
(defrecord Album [artist name songs])
(defrecord Song [name featured-artists])
;; The module instances represent the same module as evolved over time, so they
;; share a name.
(def module-name "MusicCatalogModule")
;; This is the first module instance.
(defmodule ModuleInstanceA
;; We explicitly set the option for a module's name; otherwise, the default is
;; to return the class name, which would make it difficult for us to define
;; two module instances at once in the same codebase.
{:module-name module-name}
[setup topologies]
;; This depot takes in Album objects. Each Album lives on a partition
;; according to its artist field.
(declare-depot setup *albums-depot (hash-by :artist))
;; Declare a topology for processing Albums as they're appended. A
;; microbatch topology provides exactly-once processing guarantees
;; at the cost of higher latency; in the context of a hypothetical
;; music catalog service, higher latency would be tolerable as albums
;; would presumably be indexed in advance of release.
(let [mb (microbatch-topology topologies "albums")]
;; Declare a PState called $$albums.
(declare-pstate
mb
$$albums
;; The top level of our $$albums schema is a map keyed by artist. All of an
;; artist's music will live on one partition.
(map-schema
String ; artist
;; The values of our top-level map are subindexed maps keyed by
;; album name. Subindexing this map allows us to store any
;; number of albums for a given artist, and to paginate them in
;; alphabetical sort order at query time.
(map-schema
String ; album name
;; The values of our inner map are albums, each with a name
;; and songs.
(fixed-keys-schema
{:name String
:songs (vector-schema String)})
{:subindex? true})))
;; Begin defining our microbatch topology.
(<<sources mb
;; Subscribe our microbatch ETL topology to the albums depot. Each time
;; a microbatch runs, %microbatch is bound to an operation that emits
;; all Album records appended since the last microbatch.
(source> *albums-depot :> %microbatch)
;; For each microbatch, emit each album individually, and destructure it
;; into its fields.
(%microbatch :> {:keys [*artist *name *songs]})
;; Construct our album map to be indexed.
(hash-map :name *name :songs *songs :> *album)
;; Add the album to the index.
(local-transform> [(keypath *artist *name) (termval *album)]
$$albums))))
;; Now we begin defining our second module instance. We'll start with some
;; functions that will enable our PState migration.
;; Here is a naive function for parsing a song's name and possibly its featured
;; artists from a String. We'll use it in our migration function.
(defn- parse-song
[song-str]
(let [[name features] (str/split song-str #"\s*(ft|feat)\.*")
features (->> (str/split (or features "") #",")
(mapv str/trim)
(filterv (complement empty?)))]
(->Song name features)))
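;; Worked example (illustrative only, evaluated at a REPL in this namespace): what parse-song
;; returns for a few inputs. The last call shows why the function is described as naive: the
;; regex also matches a bare "ft" inside an ordinary word. The song titles other than the first
;; are made up for illustration.
(comment
  (into {} (parse-song "Have The Heart ft. Dolly Parton"))
  ;; => {:name "Have The Heart", :featured-artists ["Dolly Parton"]}
  (into {} (parse-song "Track feat. A, B"))
  ;; => {:name "Track", :featured-artists ["A" "B"]}
  (into {} (parse-song "Solo Track"))
  ;; => {:name "Solo Track", :featured-artists []}
  (into {} (parse-song "Left Behind"))
  ;; => {:name "Le", :featured-artists ["Behind"]}
  )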
;; This is our migration function. If the input object is already of the new
;; schema, i.e., its songs are Songs and not Strings, it returns it unchanged;
;; otherwise it converts each String song to a proper Song.
(defn- migrate-songs
[album]
(if (some-> album
:songs
first
string?)
(update album :songs (partial mapv parse-song))
album))
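;; Worked example (illustrative only, evaluated at a REPL in this namespace): migrate-songs
;; converts an old-schema album once and leaves an already-migrated album untouched, which is
;; the idempotence property the migration relies on. The album map is a hypothetical stand-in
;; for a value stored in the PState.
(comment
  (let [old   {:name "F-1 Trillion" :songs ["Have The Heart ft. Dolly Parton"]}
        once  (migrate-songs old)
        twice (migrate-songs once)]
    [(-> once :songs first :name) (= once twice)])
  ;; => ["Have The Heart" true]
  )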
;; This is the second of our two module instances. Only the differences from
;; the previous module instance are called out in comments.
(defmodule ModuleInstanceB
{:module-name module-name}
[setup topologies]
(declare-depot setup *albums-depot (hash-by :artist))
(let [mb (microbatch-topology topologies "albums")]
(declare-pstate
mb
$$albums
(map-schema
String ; artist
(map-schema
String ; album name
;; Here is where we demonstrate Rama's migration functionality. We
;; wrap the sub-schema we want to modify with the `migrated` function.
(migrated
;; The first argument is the new schema. We are changing the songs
;; field from a vector of String to a vector of Song. Note that we cannot
;; directly migrate the vector-schema below because it is not indexed
;; as a whole - it is one component of the entire in-memory album.
(fixed-keys-schema
{:name String
:songs (vector-schema Song)})
;; The second argument is the migration's ID. Rama will use this to
;; determine whether or not a subsequent migration is a new one (and
;; so requires restarting).
"parse-song-data"
;; Finally we provide our migration function (defined above). This
;; function may be run on both yet-to-be-migrated and already-
;; migrated values, so it must be idempotent.
migrate-songs)
{:subindex? true})))
(<<sources mb
(source> *albums-depot :> %microbatch)
(%microbatch :> {:keys [*artist *name *songs]})
;; Our topology code is the same except that we parse Songs from our list
;; of String songs, and shadow the *songs var with the output.
(mapv parse-song *songs :> *songs)
(hash-map :name *name :songs *songs :> *album)
(local-transform> [(keypath *artist *name) (termval *album)]
$$albums))))
(ns rama.gallery.rest-api-integration-module
(:use [com.rpl.rama]
[com.rpl.rama.path])
(:require [com.rpl.rama.aggs :as aggs]
[com.rpl.rama.ops :as ops]
[taoensso.nippy :as nippy])
(:import [com.rpl.rama.integration TaskGlobalObject]
[org.asynchttpclient AsyncHttpClient Dsl]
[org.asynchttpclient.netty NettyResponse]))
;; This module demonstrates integrating Rama with an external service, in this case a REST API.
;;
;; See the test file rest_api_integration_module_test.clj for examples of interacting with this module.
(defprotocol FetchTaskGlobalClient
(task-global-client [this]))
;; This defines a "task global" object, which when used with declare-object (as shown below), creates a value that
;; can be referenced on all tasks in both ETLs and query topologies. This interface specializes the object on each
;; task with lifecycle methods "prepareForTask" and "close". This interface can be used for anything from creating
;; task-specific caches to clients to external systems. The latter use case is demonstrated here by creating an HTTP
;; client and managing its lifecycle through this interface.
;; Many external client interfaces can be shared on the same thread, or if thread-safe can be shared among all
;; threads in the same worker. The documentation for this API explores how to manage resources like that, and the
;; rama-kafka project is a real-world example of doing so. Links:
;; - https://redplanetlabs.com/docs/~/integrating.html
;; - https://github.com/redplanetlabs/rama-kafka
;; Note that in Clojure you're likely better off using a native Clojure HTTP library such as http-kit. This
;; example is using AsyncHttpClient as a demonstration of integrating with any Java API. From this example you can
;; see how you'd interact with external databases, monitoring systems, or other tools as well.
(deftype AsyncHttpClientTaskGlobal
[^{:unsynchronized-mutable true
:tag AsyncHttpClient}
client]
TaskGlobalObject
(prepareForTask [this task-id task-global-context]
(set! client (Dsl/asyncHttpClient)))
(close [this]
(.close client))
FetchTaskGlobalClient
(task-global-client [this] client))
;; Rama uses Nippy for serialization, and it doesn't work properly for deftypes. So these
;; calls tell Nippy how to serialize/deserialize the task global type.
(nippy/extend-freeze AsyncHttpClientTaskGlobal ::async-http-client
[o data-output])
(nippy/extend-thaw ::async-http-client [data-input]
(AsyncHttpClientTaskGlobal. nil))
(defn http-get-future [^AsyncHttpClient client url]
(-> client (.prepareGet url) .execute .toCompletableFuture))
(defn get-body [^NettyResponse response]
(.getResponseBody response))
;; This defines the module, whose body is a regular Clojure function implementation. All depots, ETLs,
;; PStates, and query topologies are defined via this entry point.
(defmodule RestAPIIntegrationModule
[setup topologies]
;; This depot takes in URL strings. The second argument is a "depot partitioner" that controls
;; how appended data is partitioned across the depot, affecting on which task each piece of data begins
;; processing in ETLs.
(declare-depot setup *get-depot (hash-by identity))
;; This declares a task global with the given value. Since AsyncHttpClientTaskGlobal implements the TaskGlobalObject
;; interface, the value is specialized per task. Accessing the variable *http-client in topologies always accesses the
;; value local to the task where the topology event is running.
(declare-object setup *http-client (AsyncHttpClientTaskGlobal. nil))
;; Stream topologies process appended data within a few milliseconds and guarantee all data will be fully processed.
(let [s (stream-topology topologies "get-http")]
;; PStates are durable and replicated datastores and are represented as an arbitrary combination of data structures. Reads
;; and writes to PStates go to disk and are not purely in-memory operations.
;; This PState stores the latest response for each URL, a map from a URL to the body of the HTTP response.
(declare-pstate s $$responses {String String})
;; <<sources defines the ETL logic as Rama dataflow code. Rama's dataflow API works differently than Clojure, but it has
;; the same expressiveness as any general purpose language while also being able to seamlessly distribute computation.
(<<sources s
;; This subscribes the ETL to *get-depot. The :> keyword separates the inputs and outputs of the form.
;; Because of the depot partitioner on *get-depot, computation starts on the same task where responses are
;; stored for that URL in the $$responses PState.
(source> *get-depot :> *url)
;; completable-future> integrates arbitrary asynchronous work within a topology. It ties the success/failure of
;; the asynchronous task with the success/failure of the topology. So if the asynchronous work fails or times out,
;; the topology will fail as well and the depot record will be retried. completable-future> is a non-blocking operation.
(completable-future>
(http-get-future (task-global-client *http-client) *url)
:> *netty-response)
(get-body *netty-response :> *body)
;; This records the latest response in the PState.
(local-transform> [(keypath *url) (termval *body)] $$responses)
)))
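;; completable-future> consumes a java.util.concurrent.CompletableFuture. The sketch below
;; shows, in plain Clojure with no HTTP client and no Rama dependency, how such a future
;; can be produced and chained. The helper names async-value and then-apply are
;; hypothetical and exist only for this illustration.

```clojure
(import '[java.util.concurrent CompletableFuture]
        '[java.util.function Function Supplier])

;; Hypothetical helper: runs f on the common ForkJoinPool and exposes the
;; result as a CompletableFuture.
(defn async-value [f]
  (CompletableFuture/supplyAsync
    (reify Supplier
      (get [_] (f)))))

;; Hypothetical helper: applies f to the future's value without blocking the caller.
(defn then-apply [^CompletableFuture cf f]
  (.thenApply cf (reify Function
                   (apply [_ v] (f v)))))

;; Stand-in for an HTTP response body; count plays the role of get-body.
(def body-length
  (-> (async-value (fn [] "body"))
      (then-apply count)))

(.get ^CompletableFuture body-length) ;=> 4
```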
(ns rama.gallery.bank-transfer-module
(:use [com.rpl.rama]
[com.rpl.rama.path])
(:require [com.rpl.rama.aggs :as aggs]
[com.rpl.rama.ops :as ops]))
;; This module demonstrates transferring funds from one account to another. The implementation guarantees:
;; - All transfers are processed exactly once.
;; - A transfer only goes through if there are sufficient funds available.
;; - No race conditions with concurrent transfers.
;;
;; See the test file bank_transfer_module_test.clj for how a client interacts with this module to initiate
;; transfers and query for funds and transfer information.
;; As with all the demos, data is represented using plain Clojure records. You can represent
;; data however you want, and we generally recommend using a library with compact serialization,
;; strong schemas, and support for evolving types (like Thrift or Protocol Buffers). We use plain
;; Clojure records in these demos to keep them as simple as possible by not having additional
;; dependencies. Rama uses Nippy for serialization, and you can extend that directly or define a custom
;; serialization through Rama to support your own representations. In all cases you work with
;; first-class objects when using Rama, whether appending to depots, processing in ETLs,
;; or querying from PStates.
(defrecord Transfer [transfer-id from-user-id to-user-id amt])
(defrecord Deposit [user-id amt])
;; This defines the module, whose body is a regular Clojure function implementation. All depots, ETLs,
;; PStates, and query topologies are defined via this entry point.
(defmodule BankTransferModule
[setup topologies]
;; This depot takes in Transfer objects. The second argument is a "depot partitioner" that controls
;; how appended data is partitioned across the depot, affecting on which task each piece of data begins
;; processing in ETLs.
(declare-depot setup *transfer-depot (hash-by :from-user-id))
;; This depot takes in Deposit objects.
(declare-depot setup *deposit-depot (hash-by :user-id))
;; Defines the ETL as a microbatch topology. Microbatch topologies have exactly-once processing semantics, meaning
;; that even if there are failures and the work needs to be retried, the updates into PStates will be as if there
;; were no failures and every depot record was processed exactly once. The exactly-once semantics are critical
;; for this use case.
;; Microbatch topologies also have higher throughput than stream topologies with the tradeoff of update latency
;; being in the hundreds of milliseconds range rather than single-digit milliseconds range. This is a suitable latency
;; for this task.
(let [mb (microbatch-topology topologies "banking")]
;; PStates are durable and replicated datastores and are represented as an arbitrary combination of data structures. Reads
;; and writes to PStates go to disk and are not purely in-memory operations.
;; This PState stores the total funds for each user, a map from user ID to funds.
(declare-pstate mb $$funds {Long Long})
;; These two PStates store outgoing and incoming transfer information. The inner map in the data structure
;; is subindexed because it can contain an unbounded number of elements. Subindexing stores each value of
;; those maps individually and enables them to be written and queried efficiently even when they're huge.
;; Subindexed maps are always sorted, and it's also easy to do range queries on them.
(declare-pstate mb $$outgoing-transfers
{Long ; user-id
(map-schema String ; transfer-id
(fixed-keys-schema {:to-user-id Long
:amt Long
:success? Boolean})
{:subindex? true})})
(declare-pstate mb $$incoming-transfers
{Long ; user-id
(map-schema String ; transfer-id
(fixed-keys-schema {:from-user-id Long
:amt Long
:success? Boolean})
{:subindex? true})})
;; <<sources defines the ETL logic as Rama dataflow code. Rama's dataflow API works differently than Clojure, but it has
;; the same expressiveness as any general purpose language while also being able to seamlessly distribute computation.
(<<sources mb
;; This subscribes the ETL to *transfer-depot, binding the batch of all data in this microbatch to %microbatch.
;; %microbatch is an anonymous operation which when invoked emits all data for the microbatch across all partitions.
(source> *transfer-depot :> %microbatch)
;; Because of the depot partitioner on *transfer-depot, computation for each piece of data
;; starts on the same task where funds and transfer information are stored for the "from user ID"
;; in the $$funds, $$outgoing-transfers, and $$incoming-transfers PStates.
(%microbatch :> {:keys [*transfer-id *from-user-id *to-user-id *amt]})
;; First check if the user has sufficient funds for the transfer and bind that to the boolean variable
;; *success?. Note that because task threads execute events serially, there are no race conditions here
;; with concurrent transfers since other transfer requests will be queued behind this event on this task.
(local-select> [(keypath *from-user-id) (nil->val 0)] $$funds :> *funds)
(>= *funds *amt :> *success?)
;; If this transfer is valid, then deduct the funds for from-user-id from the $$funds PState.
(<<if *success?
;; This defines an anonymous operation that is used in the transform call to deduct the
;; transfer amount from the current funds.
(<<ramafn %deduct [*curr]
(:> (- *curr *amt)))
(local-transform> [(keypath *from-user-id) (term %deduct)] $$funds))
;; Record the transfer in the $$outgoing-transfers PState for from-user-id.
(local-transform> [(keypath *from-user-id *transfer-id)
(termval {:to-user-id *to-user-id
:amt *amt
:success? *success?})]
$$outgoing-transfers)
;; This switches to the task storing information for to-user-id, which may be on a different machine.
(|hash *to-user-id)
;; If this transfer is valid, then credit the funds to to-user-id in the $$funds PState. Note that microbatching
;; has exactly-once semantics across the whole microbatch, which provides the cross-partition transactionality
;; needed for this use case.
(<<if *success?
;; Aggregation is a way to specify a PState update in a slightly higher level way. It specifies the update
;; in the shape of the data structure being written to, and it takes care of initializing non-existent values.
;; In this case, the +sum aggregator knows to initialize the funds to 0 if that value doesn't exist for this user yet.
(+compound $$funds {*to-user-id (aggs/+sum *amt)}))
;; Record the transfer in the $$incoming-transfers PState for to-user-id.
(local-transform> [(keypath *to-user-id *transfer-id)
(termval {:from-user-id *from-user-id
:amt *amt
:success? *success?})]
$$incoming-transfers)
;; This subscribes the topology to *deposit-depot and defines the ETL logic for it.
(source> *deposit-depot :> %microbatch)
(%microbatch :> {:keys [*user-id *amt]})
(+compound $$funds {*user-id (aggs/+sum *amt)})
)))
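;; The per-transfer logic of the module above (check funds, deduct, credit, record both
;; sides) can be sketched as a pure function over an in-memory map. This is only an
;; illustration: apply-transfer and the :funds/:outgoing/:incoming keys are hypothetical,
;; and the sketch models neither partitioning nor the exactly-once guarantees that the
;; microbatch topology provides.

```clojure
(defn apply-transfer
  "Returns the new state after attempting one transfer."
  [state {:keys [transfer-id from-user-id to-user-id amt]}]
  (let [funds    (get-in state [:funds from-user-id] 0)
        success? (>= funds amt)]
    (cond-> state
      ;; Funds only move when the sender can cover the amount.
      success? (update-in [:funds from-user-id] (fnil - 0) amt)
      success? (update-in [:funds to-user-id] (fnil + 0) amt)
      ;; Both sides record the attempt, successful or not.
      true     (assoc-in [:outgoing from-user-id transfer-id]
                         {:to-user-id to-user-id :amt amt :success? success?})
      true     (assoc-in [:incoming to-user-id transfer-id]
                         {:from-user-id from-user-id :amt amt :success? success?}))))

(def after-first
  (apply-transfer {:funds {1 100}}
                  {:transfer-id "t1" :from-user-id 1 :to-user-id 2 :amt 60}))

(def after-second
  (apply-transfer after-first
                  {:transfer-id "t2" :from-user-id 1 :to-user-id 2 :amt 60}))

(:funds after-first)  ;=> {1 40, 2 60}
(:funds after-second) ;=> {1 40, 2 60}, the second transfer is rejected
```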
(ns rama.gallery.top-users-module
(:use [com.rpl.rama]
[com.rpl.rama.path])
(:require [com.rpl.rama.aggs :as aggs]
[com.rpl.rama.ops :as ops]))
;; This module demonstrates "top N" analytics in the context of computing the top spending users in
;; an e-commerce application. The module receives a stream of purchase data and incrementally maintains
;; a global list of the top 500 users by total purchase amount.
;; This module only does the analytics portion. It can be combined with code such as in ProfilesModule to
;; also handle things like account registration and profile management.
;; See the test file top_users_module_test.clj for examples of querying the top users.
(def TOP-AMOUNT 500)
;; As with all the demos, data is represented using plain Clojure records. You can represent
;; data however you want, and we generally recommend using a library with compact serialization,
;; strong schemas, and support for evolving types (like Thrift or Protocol Buffers). We use plain
;; Clojure records in these demos to keep them as simple as possible by not having additional
;; dependencies. Rama uses Nippy for serialization, and you can extend that directly or define a custom
;; serialization through Rama to support your own representations. In all cases you work with
;; first-class objects when using Rama, whether appending to depots, processing in ETLs,
;; or querying from PStates.
(defrecord Purchase [user-id purchase-cents])
;; This function implements part of the ETL below for maintaining top users. It's responsible for
;; updating the total spend amount for each user and emitting in each iteration of processing two
;; fields: user IDs with a purchase and their updated total spend amount.
;; A "generator" processes a batch of data and emits a new batch of data after performing any amount
;; of computation, aggregation, joins, or other logic on it.
(defgenerator user-spend-subbatch
[microbatch]
(batch<- [*user-id *total-spend-cents]
;; This emits the input batch of data across all partitions.
(microbatch :> {:keys [*user-id *purchase-cents]})
;; Batch blocks must always declare a partitioner before aggregating. In this case, we wish
;; to partition the aggregation of total spend amounts by user ID.
(|hash *user-id)
;; This writes to the PState in the form of an aggregator, which specifies the write in the
;; shape of the data structure being written to. At the leaf is the +sum aggregator which
;; adds each purchase into the total for that user. :new-val> is a special feature
;; available in batch blocks to capture the updated values and emit them along with the keys
;; used in the path to that position in the PState. In this case, following the +compound
;; the variables *user-id and *total-spend-cents are bound for each user updated in this
;; iteration.
(+compound $$user-total-spend
{*user-id (aggs/+sum *purchase-cents
:new-val> *total-spend-cents)})))
;; This defines the module, whose body is a regular Clojure function implementation. All depots, ETLs,
;; PStates, and query topologies are defined via this entry point.
(defmodule TopUsersModule
[setup topologies]
;; This depot takes in Purchase objects. The second argument is a "depot partitioner" that controls
;; how appended data is partitioned across the depot, affecting on which task each piece of data begins
;; processing in ETLs.
(declare-depot setup *purchase-depot (hash-by :user-id))
;; Defines the ETL as a microbatch topology. Microbatch topologies have higher throughput than stream topologies
;; with the tradeoff of update latency being in the hundreds of milliseconds range rather than single-digit milliseconds
;; range. They are generally preferable for analytics-oriented use cases like this one where the extra latency
;; doesn't matter.
(let [mb (microbatch-topology topologies "topusers")]
;; PStates are durable and replicated datastores and are represented as an arbitrary combination of data structures. Reads
;; and writes to PStates go to disk and are not purely in-memory operations.
;; This PState stores the total spend amount for each user, a map from user ID to spend amount (in cents).
(declare-pstate mb $$user-total-spend {Long Long})
;; This PState stores the list of the top 500 spending users. Since it's just a single list, it's declared as a
;; global PState. Global PStates only have a single partition. Note that the schema of the PState is just a plain
;; list and not a map like almost all databases are (with a "key" being the central concept to identify a record
;; or row).
(declare-pstate mb $$top-spending-users java.util.List {:global? true})
;; <<sources defines the ETL logic as Rama dataflow code. Rama's dataflow API works differently than Clojure, but it has
;; the same expressiveness as any general purpose language while also being able to seamlessly distribute computation.
(<<sources mb
;; This subscribes the ETL to *purchase-depot, binding the batch of all data in this microbatch to %microbatch.
;; %microbatch is an anonymous operation which when invoked emits all data for the microbatch across all partitions.
(source> *purchase-depot :> %microbatch)
;; Batch blocks are an enhanced computation mode for dataflow with the same capabilities as relational languages
;; (like SQL) such as inner joins, outer joins, subqueries, and aggregation. See this section of the Rama docs
;; for more details: https://redplanetlabs.com/docs/~/intermediate-dataflow.html#_batch_blocks
(<<batch
;; First, the total spend amounts are updated in a subbatch. This subbatch emits all updated users
;; and their new total spend amounts.
(user-spend-subbatch %microbatch :> *user-id *total-spend-cents)
;; This prepares for aggregating the data by combining the two variables into a 2-tuple.
(vector *user-id *total-spend-cents :> *tuple)
;; The list of top users is stored on a global partition, so the aggregation is partitioned
;; accordingly.
(|global)
;; The +top-monotonic aggregator updates a list according to the provided specification. This instance
;; says to add data in *tuple into the aggregated list, and to keep the top 500. The aggregator
;; only keeps the latest record for each ID, which here is specified as the first element of the tuple
;; (the user ID). The "sort val" is what the aggregator uses for ranking, in this case the total spend
;; amount in the last position of the tuple.
(aggs/+top-monotonic [TOP-AMOUNT]
$$top-spending-users
*tuple
:+options {:id-fn first
:sort-val-fn last})
))))
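;; The behavior described for +top-monotonic (keep only the latest record per ID, rank by a
;; sort value, retain the top N) can be pictured with a small plain-Clojure function. This is
;; a sketch of the described semantics, not Rama's implementation; top-n-update is a
;; hypothetical name.

```clojure
(defn top-n-update
  "Merges new-tuples into current, keeping the latest tuple per ID and the
   n tuples with the largest sort values."
  [n id-fn sort-val-fn current new-tuples]
  (->> (concat current new-tuples)
       ;; Later tuples overwrite earlier ones that share an ID.
       (reduce (fn [m t] (assoc m (id-fn t) t)) {})
       vals
       (sort-by sort-val-fn >)
       (take n)
       vec))

;; User 2's total rises from 5 to 20, and user 3 appears with a total of 7.
(top-n-update 2 first last
              [[1 10] [2 5]]
              [[2 20] [3 7]])
;=> [[2 20] [1 10]]
```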
(ns rama.gallery.profile-module
(:use [com.rpl.rama]
[com.rpl.rama.path])
(:require [com.rpl.rama.aggs :as aggs]
[com.rpl.rama.ops :as ops])
(:import [com.rpl.rama.helpers ModuleUniqueIdPState]))
;; This module demonstrates account registration, generating unique 64-bit user IDs, and editing profiles.
;; The implementation is fault-tolerant, and there are no race conditions.
;;
;; See the test file profile_modules_test.clj for how a client interacts with this module to perform user registrations
;; and profile edits.
;; As with all the demos, data is represented using plain Clojure records. You can represent
;; data however you want, and we generally recommend using a library with compact serialization,
;; strong schemas, and support for evolving types (like Thrift or Protocol Buffers). We use plain
;; Clojure records in these demos to keep them as simple as possible by not having additional
;; dependencies. Rama uses Nippy for serialization, and you can extend that directly or define a custom
;; serialization through Rama to support your own representations. In all cases you work with
;; first-class objects when using Rama, whether appending to depots, processing in ETLs,
;; or querying from PStates.
(defrecord Registration [uuid username pwd-hash])
(defrecord ProfileEdit [field value])
(defrecord ProfileEdits [user-id edits])
(defn display-name-edit [value] (->ProfileEdit :display-name value))
(defn pwd-hash-edit [value] (->ProfileEdit :pwd-hash value))
(defn height-inches-edit [value] (->ProfileEdit :height-inches value))
;; This defines the module, whose body is a regular Clojure function implementation. All depots, ETLs,
;; PStates, and query topologies are defined via this entry point.
(defmodule ProfileModule
[setup topologies]
;; This depot takes in Registration objects. The second argument is a "depot partitioner" that controls
;; how appended data is partitioned across the depot, affecting on which task each piece of data begins
;; processing in ETLs.
(declare-depot setup *registration-depot (hash-by :username))
;; This depot takes in ProfileEdits objects.
(declare-depot setup *profile-edits-depot (hash-by :user-id))
;; Stream topologies process appended data within a few milliseconds and guarantee all data will be fully processed.
;; Their low latency makes them appropriate for a use case like this.
(let [s (stream-topology topologies "profiles")
;; ModuleUniqueIdPState is a small utility from rama-helpers that abstracts away the pattern of generating
;; unique 64-bit IDs. 64-bit IDs are preferable to UUIDs because they take half the space, but since they're
;; smaller generating them randomly has too high a chance of not being globally unique. ModuleUniqueIdPState
;; uses a PState to track a task-specific counter, and it combines that counter with the task ID to generate IDs
;; that are globally unique.
id-gen (ModuleUniqueIdPState. "$$id")]
;; PStates are durable and replicated datastores and are represented as an arbitrary combination of data structures.
;; Reads and writes to PStates go to disk and are not purely in-memory operations.
;; This PState is used to assign a userId to every registered username. It also prevents race conditions in the case
;; of multiple concurrent registrations of the same username. Every registration contains a UUID that uniquely identifies
;; the registration request. The first registration records its UUID along with the generated 64-bit userId in this PState.
;; A registration request is known to be successful if the UUID used for registration is recorded in this PState.
;; Further details are described below with the ETL definition.
(declare-pstate s $$username->registration {String ; username
(fixed-keys-schema {:user-id Long
:uuid String})})
;; This PState stores all profile information for each userId.
(declare-pstate s $$profiles {Long ; user ID
(fixed-keys-schema {:username String
:pwd-hash String
:display-name String
:height-inches Long})})
;; This declares the underlying PState needed by ModuleUniqueIdPState.
(.declarePState id-gen s)
;; <<sources defines the ETL logic as Rama dataflow code. Rama's dataflow API works differently than Clojure, but it has
;; the same expressiveness as any general purpose language while also being able to seamlessly distribute computation.
(<<sources s
;; This subscribes the ETL to *registration-depot. The :> keyword separates the inputs and outputs of the form. The output
;; here is destructured to capture the fields "uuid", "username", and "pwd-hash" to Rama variables of the same name.
;; Because of the depot partitioner on *registration-depot, computation starts on the same task where registration info
;; is stored for that username in the $$username->registration PState.
(source> *registration-depot :> {:keys [*uuid *username *pwd-hash]})
;; The first step of registration is to see if this username is already registered. So the current registration info
;; is fetched from the $$username->registration PState and bound to the variable *curr-info.
;; A critical property of Rama is that only one event can run on a task at a time. So while an ETL event is running,
;; no other ETL events, PState queries, or other events can run on the task. In this case, we know that any other
;; registration requests for the same username are queued behind this event, and there are no race conditions with
;; concurrent registrations because they are run serially on this task for this username.
(local-select> (keypath *username) $$username->registration :> {*curr-uuid :uuid :as *curr-info})
;; There are two cases where this is a valid registration:
;; - *curr-info is nil, meaning this is the first time a registration has been seen for this username
;; - The UUID inside *curr-info matches the registration UUID. This indicates the registration request was retried,
;; either by the stream topology due to a downstream failure (e.g. a node dying), or by the client re-appending
;; the same request to the depot due to receiving an error.
(<<if (or> (nil? *curr-info)
(= *curr-uuid *uuid))
;; This block is run when the condition to <<if was true. No block is provided for the false case since
;; a registration of an invalid username is a no-op.
;; java-macro! is a way to insert a snippet of code generated from the Java API into Clojure dataflow code.
;; ModuleUniqueIdPState defines the method "genId" to insert code to generate a globally unique ID and bind
;; it to the specified variable. The generated code increments the counter on this task by one and computes
;; the ID by combining that counter with the task ID.
(java-macro! (.genId id-gen "*user-id"))
;; This records the registration info in the PState.
(local-transform> [(keypath *username)
(multi-path [:user-id (termval *user-id)]
[:uuid (termval *uuid)])]
$$username->registration)
;; The ETL is currently partitioned by username, but now it needs to record information for a user ID. This
;; |hash call relocates computation to the task which will be used to store information for this user ID.
;; |hash always chooses the same task ID for the same user ID but evenly spreads different user IDs across
;; all tasks. The code before and after this call can run on different processes on different machines, and Rama
;; takes care of all serialization and network transfer required.
(|hash *user-id)
;; Finally, this code records the username and pwd-hash for the new user ID in the $$profiles PState.
(local-transform> [(keypath *user-id)
(multi-path [:username (termval *username)]
[:pwd-hash (termval *pwd-hash)])]
$$profiles)
;; Stream topologies can return information back to depot append clients with "ack returns". The client
;; receives the resulting "ack return" for each subscribed colocated stream topology in a map from
;; topology name to value. Here, the ack return is used to let the client know the user ID for their
;; newly registered username. If the ack return is nil, then the client knows the username registration
;; failed.
(ack-return> *user-id))
;; This subscribes the ETL to *profile-edits-depot, destructuring the fields of edit objects to the variables
;; *user-id and *edits. The depot partitioner in this case ensures that processing starts on the task where
;; we're storing information for the user ID.
(source> *profile-edits-depot :> {:keys [*user-id *edits]})
;; *edits is a list, and ops/explode emits one time for every element in that list. Each element is destructured to
;; the variables *field and *value.
(ops/explode *edits :> {:keys [*field *value]})
;; This writes the new value for each field into the $$profiles PState.
(local-transform> [(keypath *user-id *field) (termval *value)] $$profiles)
)))
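;; The registration check and ID generation described in the module above can be sketched in
;; plain Clojure. The bit layout in gen-id (counter in the high bits, task ID in the low 16
;; bits) is an assumption made for this illustration; the source only states that the
;; task-specific counter is combined with the task ID. The names gen-id and register and the
;; state keys are hypothetical.

```clojure
;; Assumed layout for this sketch: task IDs fit in the low 16 bits.
(def task-id-bits 16)

(defn gen-id [task-id counter]
  (bit-or (bit-shift-left counter task-id-bits) task-id))

(defn register
  "Returns [new-state user-id]. user-id is nil when the username is already
   registered under a different UUID."
  [state task-id {:keys [uuid username]}]
  (let [{curr-uuid :uuid :as curr-info} (get-in state [:registrations username])]
    (if (or (nil? curr-info) (= curr-uuid uuid))
      ;; First registration, or a retry of the same request.
      (let [counter (inc (get state :counter 0))
            user-id (gen-id task-id counter)]
        [(-> state
             (assoc :counter counter)
             (assoc-in [:registrations username] {:user-id user-id :uuid uuid}))
         user-id])
      ;; Username taken by another request: no-op.
      [state nil])))

(let [[s1 id1] (register {} 3 {:uuid "u1" :username "alice"})
      [_ id2]  (register s1 3 {:uuid "u2" :username "alice"})]
  [id1 id2])
;=> [65539 nil]
```

;; Because the task ID occupies its own bits, two tasks can never produce the same ID even
;; when their counters hold the same value.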
=========
We will now paste in the clojuredoc.
=========
= com.rpl.rama.test
== create-ipc
*Usage:*
----
(create-ipc)
----
----
(create-ipc custom-serialization-classes)
----
Create an InProcessCluster with an optional list of RamaCustomSerialization class references.
== create-test-pstate
*Usage:*
----
(create-test-pstate schema)
----
Creates an object compatible with local-transform> and local-select> based on the given schema for use in unit tests. This object should be closed when no longer needed to clean up its resources.
== destroy-module!
*Usage:*
----
(destroy-module! ipc module-name)
----
Destroys a module.
== gen-hashing-index-keys
*Usage:*
----
(gen-hashing-index-keys num-tasks)
----
----
(gen-hashing-index-keys prefix num-tasks)
----
Generates a key that hashes to each task in a range of tasks. The nth element in the returned list partitions to the nth task. This function is deterministic and will always produce the same keys. An optional prefix can be provided. Calling this function with different prefixes will produce unique keys.
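The idea behind gen-hashing-index-keys can be pictured with a plain-Clojure sketch: search candidate keys until one is found for every task index. This is an illustration only. It uses Clojure's built-in `hash` and a simple modulo as the assumed partitioning rule, which is not necessarily how Rama assigns keys to tasks, and the names `task-for-key` and `hashing-keys-sketch` are hypothetical.

```clojure
;; Assumed partitioning rule for this sketch: hash modulo the task count.
(defn task-for-key [k num-tasks]
  (mod (hash k) num-tasks))

(defn hashing-keys-sketch
  "Returns a vector whose nth element is a key that maps to task n."
  ([num-tasks] (hashing-keys-sketch "" num-tasks))
  ([prefix num-tasks]
   (mapv (fn [task]
           ;; Try prefix-0, prefix-1, ... until a candidate lands on this task.
           (first (filter #(= task (task-for-key % num-tasks))
                          (map #(str prefix "-" %) (range)))))
         (range num-tasks))))

(def ks (hashing-keys-sketch "k" 4))
(mapv #(task-for-key % 4) ks) ;=> [0 1 2 3]
```

Because the search is deterministic, repeated calls with the same prefix return the same keys, and different prefixes yield different keys.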
== launch-module!
*Usage:*
----
(launch-module! ipc module config)
----
Launch a module. Blocks until module is running.
== pause-microbatch-topology!
*Usage:*
----
(pause-microbatch-topology! ipc module-name topology-name)
----
Pauses the specified microbatch topology so it stops processing. Blocks until the currently running microbatch finishes. No-op if topology is already paused.
== resume-microbatch-topology!
*Usage:*
----
(resume-microbatch-topology! ipc module-name topology-name)
----
Resumes a paused microbatch topology so it starts processing again. No-op if topology is already active.
== test-pstate-select
*Usage:*
----
(test-pstate-select path tp)
----
Selects from the test PState using the given path and returns results as a sequence.
== test-pstate-select-one
*Usage:*
----
(test-pstate-select-one path tp)
----
Selects a single value from the PState using the given path.
== test-pstate-transform
*Usage:*
----
(test-pstate-transform path tp)
----
Transforms the test PState with the given path.
== update-module!
*Usage:*
----
(update-module! ipc module)
----
----
(update-module! ipc module options)
----
Update a module to a new version. Blocks until the update is complete. :objects-to-delete option must be specified if update is removing any depots or PStates. Example: (update-module! ipc my-new-module {:objects-to-delete ["$$p" "*depot2"]})
== wait-for-microbatch-processed-count
*Usage:*
----
(wait-for-microbatch-processed-count ipc module-name topology-name count)
----
Block until specified microbatch topology has finished processing at least the specified number of depot records since the topology started. If condition fails to be achieved within a timeout, an exception is thrown.
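The "block until a condition holds or throw on timeout" contract described here can be pictured as a polling loop. The sketch below is plain Clojure and does not use Rama. `wait-until`, its polling interval, and the atom standing in for the processed count are all hypothetical, and nothing here reflects how Rama implements the wait internally.

```clojure
(defn wait-until
  "Polls pred until it returns truthy. Throws if timeout-ms elapses first."
  [pred timeout-ms]
  (let [deadline (+ (System/currentTimeMillis) timeout-ms)]
    (loop []
      (cond
        (pred) true
        (>= (System/currentTimeMillis) deadline)
        (throw (ex-info "Condition not met before timeout"
                        {:timeout-ms timeout-ms}))
        :else (do (Thread/sleep 10)
                  (recur))))))

;; Stand-in for the number of depot records a topology has processed.
(def processed (atom 0))
(swap! processed + 5)

(wait-until #(>= @processed 5) 1000) ;=> true
```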
= com.rpl.rama.path
== AFTER-ELEM
Navigates on a sequence to void element at end. Used in transforms to append a single element. Virtual value navigator. For proxies, transforms with this navigator produce SequenceInsertDiff.
== ALL
Navigates to every element of a collection. Works on lists, maps, and sets. For maps, navigates to each key/value pair, which implements the java.util.List interface. Can transform to NONE to remove an element. Value navigator. For proxies, transforms with this navigator produce diffs according to the type of data structure involved. For sequences produces SequenceIndexChangesDiff, SequenceIndexRemoveDiff, or a combination. For sets produces SetAddDiff, SetRemoveDiff, or a combination. For maps produces KeyDiff, KeyRemoveDiff, or a combination. When multiple diffs are produced they are combined with MultiDiff.
== ATOM
Navigates to atom value.
== BEFORE-ELEM
Navigates on a sequence to void element at beginning. Used in transforms to prepend a single element. Virtual value navigator. For proxies, transforms with this navigator produce SequenceInsertDiff.
== before-index
*Usage:*
----
(before-index index)
----
Navigates on a list to void element before an index. Used in transforms to insert an element in the middle of a list. Virtual value navigator. For proxies, transforms with this navigator produce SequenceInsertDiff.
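The effect of setting a value at one of these "void element" positions can be pictured with a plain-Clojure function on vectors. This sketch does not use the path API; `insert-before` is a hypothetical helper that only mirrors the resulting data for BEFORE-ELEM, before-index, and AFTER-ELEM.

```clojure
(defn insert-before
  "Returns v with elem inserted before position idx."
  [v idx elem]
  (into (conj (subvec v 0 idx) elem)
        (subvec v idx)))

(insert-before [:a :b :c] 0 :x) ;=> [:x :a :b :c]  like BEFORE-ELEM
(insert-before [:a :b :c] 1 :x) ;=> [:a :x :b :c]  like (before-index 1)
(insert-before [:a :b :c] 3 :x) ;=> [:a :b :c :x]  like AFTER-ELEM
```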
== BEGINNING
Navigates on a sequence to empty subsequence at beginning. Used in transforms to prepend multiple elements. Substructure navigator. For proxies, transforms with this navigator produce SequenceInsertsDiff.
== collect
*Usage:*
----
(collect path)
----
Collects a list of values selected with given path from current point.
== collect-one
*Usage:*
----
(collect-one path)
----
Collects a single value selected with given path from current point. Exception if path selects zero values or more than one value.
== collected?
*Usage:*
----
(collected? params & body)
----
Stops navigation if given function on collected values returns false. Otherwise, stays navigated at current point. Filter navigator.
== comp-navs
This is an advanced navigator composition function that should only be used with instances of RichNavigator. You should generally prefer path instead.
== comp-paths
*Usage:*
----
(comp-paths & apath)
----
Returns a compiled version of the given path for use with compiled-{select/transform/setval/etc.} functions.
== cond-path
*Usage:*
----
(cond-path & path-pairs)
----
Alternates condition path with subsequent navigation paths. Condition path is considered true if it selects anything. Continues navigation with navigation path for first condition path to meet this condition. Example: (cond-path #(> (count %) 2) [:a MAP-VALS] [MAP-VALS even?] :b)
== continue-then-stay
Navigates to the provided path and then to the current element. This can be used to implement post-order traversal.
== continuous-subseqs
*Usage:*
----
(continuous-subseqs bounds-fn)
----
Navigates to every continuous subsequence of elements based on bounds-fn. bounds-fn is a function of an element and the result of bounds-fn on the previous element. Truthy returns are matched into a subsequence. Example: (let [bounds (fn [elem prev] (cond (identical? :START elem) true (identical? :END elem) :END (identical? :END prev) false :else prev))] (is (= [5 6 7] (setval (continuous-subseqs bounds) nil [:START 1 2 3 :END 5 6 7 :START 8 9 :END]))))
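To make the role of bounds-fn concrete, the sketch below replays the example in plain Clojure, without the path API. It threads the previous result of bounds-fn through a reduce and drops every element for which bounds-fn returns a truthy value, which matches the effect of setting the matched subsequences to nil. The function `remove-matched-subseqs` is hypothetical, and starting with a previous value of nil is an assumption of this sketch.

```clojure
(def bounds
  (fn [elem prev]
    (cond
      (identical? :START elem) true   ; a subsequence opens
      (identical? :END elem)   :END   ; the closing marker is still matched
      (identical? :END prev)   false  ; the element after :END is outside
      :else                    prev))) ; otherwise carry the previous state

(defn remove-matched-subseqs
  "Drops every element for which bounds-fn returns a truthy value."
  [bounds-fn coll]
  (first
    (reduce (fn [[out prev] elem]
              (let [res (bounds-fn elem prev)]
                [(if res out (conj out elem)) res]))
            [[] nil]
            coll)))

(remove-matched-subseqs bounds [:START 1 2 3 :END 5 6 7 :START 8 9 :END])
;=> [5 6 7]
```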
== declarepath
*Usage:*
----
(declarepath name)
----
Declares a var to provide a path implementation later with providepath.
== defcollector
*Usage:*
----
(defcollector name params [_ [_ structure-sym] & body])
----
Defines a collector that adds a value to collected list based on currently navigated value. Example: (defcollector MY-COLLECTOR [] (collect-val [this structure] (-> structure :a :b :c)))
== defdynamicnav
*Usage:*
----
(defdynamicnav name & args)
----
Defines a function that can choose what navigator to use at runtime based on the dynamic context. The arguments will either be static values or objects satisfying dynamic-param?. Use late-bound to produce a runtime navigator that uses the values of the dynamic params. See selected? for an illustrative example of dynamic navs.
== defnav
See this page for an overview of defining new navigators with defnav.
== defprotocolpath
*Usage:*
----
(defprotocolpath name)
----
----
(defprotocolpath name params)
----
Defines a navigator that chooses the path to take based on the type of the value at the current point. See this page for more details.
== defrichnav
== DISPENSE
Drops all collected values for subsequent navigation.
== dynamic-param?
*Usage:*
----
(dynamic-param? object)
----
This function is used with defdynamicnav. True if object is a dynamic param.
== eachnav
Turns a navigator that takes one argument into a navigator that takes many arguments and uses the same navigator with each argument. There is no performance cost to using this. See the implementation of keypath.
== END
Navigates on a sequence to the empty subsequence at the end. Used in transforms to append multiple elements. Substructure navigator. For proxies, transforms with this navigator produce SequenceInsertsDiff.
== extend-protocolpath
*Usage:*
----
(extend-protocolpath protpath & extensions)
----
Used in conjunction with defprotocolpath. See defprotocolpath.
== filterer
*Usage:*
----
(filterer & path)
----
Navigates to a view of the current sequence that only contains elements that match the given path. An element matches the selector path if calling select on that element with the path yields anything other than an empty sequence. For transformation: NONE entries in the result sequence cause corresponding entries in input to be removed. A result sequence smaller than the input sequence is equivalent to padding the result sequence with NONE at the end until the same size as the input.
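The following sketch shows the filtered view in both a select and a transform. It assumes Rama is on the classpath and uses plain Clojure vectors as stand-in data.

```clojure
(require '[com.rpl.rama.path :refer [select transform filterer]])

;; Selecting navigates to one value: the view containing only even elements.
(def evens-view (select (filterer even?) [1 2 3 4 5 6]))
;; evens-view => [[2 4 6]]

;; Transforming the view writes results back to the original positions,
;; so reversing the view swaps the even elements in place.
(def reversed-evens (transform (filterer even?) reverse [1 2 3 4 5 6]))
;; reversed-evens => [1 6 3 4 5 2]
```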
== FIRST
Navigates on a list to the first element if not empty. If empty, stops navigation. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce SequenceIndexChangeDiff or SequenceIndexRemoveDiff.
== if-path
*Usage:*
----
(if-path cond-path then-path)
----
----
(if-path cond-path then-path else-path)
----
Like cond-path, but with if semantics.
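A short sketch of the three-argument form, assuming Rama is on the classpath; the mixed vector of maps and numbers is hypothetical sample data.

```clojure
(require '[com.rpl.rama.path :refer [transform if-path ALL STAY]])

;; For map elements, navigate into :a; for anything else, stay on the element.
(def bumped
  (transform [ALL (if-path map? :a STAY)]
             inc
             [{:a 1} 5]))
;; bumped => [{:a 2} 6]
```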
== index-nav
*Usage:*
----
(index-nav index)
----
Navigates to the index of a list if it exists. This navigates to the index, not to the value at that index. Changing the index in a transform will move the element in the resulting list. Value navigator. For proxies, transforms with this navigator produce SequenceReorderDiff.
== INDEXED-VALS
indexed-vals with a starting index of 0.
== indexed-vals
*Usage:*
----
(indexed-vals start-index)
----
Navigates to pairs of [index, value] for every element of a list, starting from start-index. Pairs are represented as lists of two elements. Transforms should produce new [index, value] pairs. Changing an index in a transform will move the element in the resulting list. Value navigator.
== java-path->clojure-path
*Usage:*
----
(java-path->clojure-path jpath)
----
Converts a path specified with the Java API to a Clojure path.
== keypath
*Usage:*
----
(keypath & keys)
----
Navigates to the value for a key in a map. For convenience, can navigate into further nested maps by providing multiple key arguments. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce KeyDiff or KeyRemoveDiff.
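The snippet below is a minimal sketch of the multi-key form and of removal with NONE, assuming Rama is on the classpath and plain Clojure maps as input.

```clojure
(require '[com.rpl.rama.path :refer [select transform setval keypath NONE]])

;; Multiple keys navigate into nested maps.
(def nested (select (keypath :a :b) {:a {:b 1}}))
;; nested => [1]

(def updated (transform (keypath :a :b) inc {:a {:b 1}}))
;; updated => {:a {:b 2}}

;; Setting the navigated value to NONE removes the key.
(def removed (setval (keypath :a) NONE {:a 1 :b 2}))
;; removed => {:b 2}
```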
== LAST
Navigates on a list to the last element if not empty. If empty, stops navigation. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce SequenceIndexChangeDiff or SequenceIndexRemoveDiff.
== late-bound
*Usage:*
----
(late-bound bindings & body)
----
Macro to efficiently resolve dynamic arguments from inline compilation to construct a navigator. Binding values can be static or dynamic params, and binding vars will be resolved to the runtime values. Use with late-path or late-resolved-fn to inform inline compiler how to handle those params. Example: (defdynamicnav my-dynamic-nav [i path] (late-bound [latei i late-path (late-path path)] (comp-navs (nthpath i) (selected? late-path))))
== late-path
*Usage:*
----
(late-path param)
----
Marks a potentially dynamic param as being a path, informing the inline compiler to optimize that param as a path.
== late-resolved-fn
*Usage:*
----
(late-resolved-fn param)
----
Marks a potentially dynamic param as resolving to a function, informing the inline compiler to optimize that accordingly. If it’s an expression with all static params, it will be resolved at compile-time. Otherwise, an expression to resolve at runtime is inserted.
== local-declarepath
*Usage:*
----
(local-declarepath)
----
Declares a path whose implementation will be provided later with providepath. Used to create recursive or mutually recursive paths.
== map-key
*Usage:*
----
(map-key key)
----
Navigates to a key in a map if it exists. In transforms, changing the key is the same as removing the original key and then adding the updated key with the original value. This navigator is generally only useful in transforms. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce KeyChangeDiff or KeyRemoveDiff.
== MAP-KEYS
Navigates to every key of a map. In transforms, changing a key is the same as removing the original key and then adding the updated key with the original value. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce KeysRemoveDiff, KeysDiff, or a combination of the two with MultiDiff.
== MAP-VALS
Navigates to every value of a map. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce KeysRemoveDiff, KeysDiff, or a combination of the two with MultiDiff.
== multi-path
*Usage:*
----
(multi-path & paths)
----
Navigates to each provided path in order, all starting from this point. Control navigator. For proxies, transforms with this navigator produce MultiDiff.
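As an illustrative sketch (assuming Rama is on the classpath), multi-path can target several keys of the same map in one operation:

```clojure
(require '[com.rpl.rama.path :refer [select transform multi-path]])

;; Each sub-path starts from the same map; results come back in path order.
(def picked (select (multi-path :a :b) {:a 1 :b 2 :c 3}))
;; picked => [1 2]

;; The transform function is applied at the end of every sub-path.
(def both-bumped (transform (multi-path :a :b) inc {:a 1 :b 2 :c 3}))
;; both-bumped => {:a 2 :b 3 :c 3}
```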
== multi-transform
*Usage:*
----
(multi-transform apath structure)
----
Just like transform, but expects transform functions to be specified inline in the path using term or termval. An error is thrown if navigation finishes at a non-terminal navigator. This macro will do inline caching of the path.
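A minimal sketch of inline terminals, assuming Rama is on the classpath; the nested map is hypothetical. Each branch of the path ends in its own term or termval, so different locations receive different updates in a single pass.

```clojure
(require '[com.rpl.rama.path
           :refer [multi-transform multi-path term termval]])

(def result
  (multi-transform
    [:a (multi-path [:b (term inc)]       ; apply inc to the value at [:a :b]
                    [:c (termval 10)])]   ; overwrite the value at [:a :c]
    {:a {:b 1 :c 2}}))
;; result => {:a {:b 2 :c 10}}
```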
== multi-transformed
*Usage:*
----
(multi-transformed path)
----
Navigates to a view of the current value by transforming it with the specified path. The terminal positions of the path must be term or termval.
== must
*Usage:*
----
(must & keys)
----
Navigates to the value for a key in a map only if the key exists. If the key does not exist, stops navigation. For convenience, can navigate into further nested maps by providing multiple key arguments. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce KeyDiff or KeyRemoveDiff.
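The sketch below illustrates the stop-on-missing-key behavior, assuming Rama is on the classpath and plain Clojure maps as input.

```clojure
(require '[com.rpl.rama.path :refer [select transform must]])

;; Key present: behaves like ordinary key navigation.
(def found (select (must :a :b) {:a {:b 1}}))
;; found => [1]

;; Key absent: navigation stops, so nothing is selected...
(def missing (select (must :x) {:a 1}))
;; missing => []

;; ...and a transform leaves the map untouched (inc is never called).
(def untouched (transform (must :x) inc {:a 1}))
;; untouched => {:a 1}
```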
== NAME
Navigates to the name portion of the keyword or symbol.
== NAMESPACE
Navigates to the namespace portion of the keyword or symbol.
== NIL->LIST
Navigates to '() if the value is nil. Otherwise it stays navigated at the current value.
== NIL->SET
Navigates to #{} if the value is nil. Otherwise it stays navigated at the current value.
== nil->val
*Usage:*
----
(nil->val val)
----
Navigates to the provided value if the structure is nil. Otherwise it stays navigated at the structure.
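A common use is supplying a default before updating a possibly missing value. The following is a sketch under the assumption that Rama is on the classpath; the :count key is made up for illustration.

```clojure
(require '[com.rpl.rama.path :refer [transform nil->val]])

;; :count is absent, so navigation sees nil and substitutes 0 before inc runs.
(def initialized (transform [:count (nil->val 0)] inc {}))
;; initialized => {:count 1}

;; :count is present, so the existing value is used.
(def incremented (transform [:count (nil->val 0)] inc {:count 5}))
;; incremented => {:count 6}
```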
== NIL->VECTOR
Navigates to [] if the value is nil. Otherwise it stays navigated at the current value.
== NONE
Global value used to indicate no elements selected during select-any. Also used in transforms to remove elements from data structures. In Rama dataflow code, use NONE> rather than (termval NONE) since NONE cannot be used in dataflow code.
== NONE->val
Navigates to the provided val if the structure is NONE. Otherwise it stays navigated at the structure. Only valid in transforms.
== NONE-ELEM
Navigates to the void element of a set. Used to add a single element to a set in transforms. Virtual value navigator. For proxies, transforms with this navigator produce SetAddDiff.
== NONE?
Returns true if the given value is NONE, false otherwise.
== not-selected?
*Usage:*
----
(not-selected? & path)
----
Continues navigation if provided path executed on current value navigates to zero values. Otherwise, stops current branch of navigation. Filter navigator.
== nthpath
*Usage:*
----
(nthpath & indices)
----
Navigates to the value for an index in a list. For convenience, can navigate into further nested lists by providing multiple index arguments. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce SequenceIndexChangeDiff or SequenceIndexRemoveDiff.
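Here is a brief sketch of single-index, multi-index, and removal usage, assuming Rama is on the classpath and plain Clojure vectors as input.

```clojure
(require '[com.rpl.rama.path :refer [select transform setval nthpath NONE]])

(def second-elem (select (nthpath 1) [:a :b :c]))
;; second-elem => [:b]

;; Multiple indices navigate into nested lists: element 1 of element 0.
(def nested-bump (transform (nthpath 0 1) inc [[1 2] [3 4]]))
;; nested-bump => [[1 3] [3 4]]

;; Setting the navigated element to NONE removes it.
(def dropped-first (setval (nthpath 0) NONE [1 2 3]))
;; dropped-first => [2 3]
```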
== parser
*Usage:*
----
(parser parse-fn unparse-fn)
----
Navigate to the result of running parse-fn on the value. For transforms, the transformed value then has unparse-fn run on it to get the final value at this point.
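To illustrate the parse/unparse round trip, this sketch treats a numeric string as a number during navigation. It assumes Rama is on the classpath; str->long is a hypothetical helper defined only for the example.

```clojure
(require '[com.rpl.rama.path :refer [select transform parser]])

;; Hypothetical parse function; str serves as the unparse function.
(defn str->long [s] (Long/parseLong s))

;; Selecting returns the parsed value.
(def parsed (select (parser str->long str) "41"))
;; parsed => [41]

;; Transforming parses, applies inc, then unparses back to a string.
(def bumped-str (transform (parser str->long str) inc "41"))
;; bumped-str => "42"
```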
== path
*Usage:*
----
(path & path)
----
Same as calling comp-paths, except it caches the composition of the static parts of the path at compile-time for later re-use (when possible). For almost all idiomatic uses, this provides a large speedup. This macro is automatically used by the select/transform/setval/replace-in/etc. macros.
== pred
*Usage:*
----
(pred afn)
----
Continues navigation if provided function returns true on current value. Otherwise, stops current branch of navigation. Filter navigator.
== pred<
*Usage:*
----
(pred< v)
----
Same as (pred #(< % v)).
== pred<=
*Usage:*
----
(pred<= v)
----
Same as (pred #(<= % v)).
== pred=
*Usage:*
----
(pred= v)
----
Same as (pred #(= % v)).
== pred>
*Usage:*
----
(pred> v)
----
Same as (pred #(> % v)).
== pred>=
*Usage:*
----
(pred>= v)
----
Same as (pred #(>= % v)).
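The comparison shorthands can be sketched side by side with pred as follows, assuming Rama is on the classpath; the input vector is arbitrary sample data.

```clojure
(require '[com.rpl.rama.path
           :refer [select ALL pred pred< pred<= pred= pred> pred>=]])

(def nums [1 2 3 4])

(def evens   (select [ALL (pred even?)] nums))  ; => [2 4]
(def below-3 (select [ALL (pred< 3)] nums))     ; => [1 2]
(def upto-3  (select [ALL (pred<= 3)] nums))    ; => [1 2 3]
(def only-3  (select [ALL (pred= 3)] nums))     ; => [3]
(def above-3 (select [ALL (pred> 3)] nums))     ; => [4]
(def from-3  (select [ALL (pred>= 3)] nums))    ; => [3 4]
```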
== providepath
*Usage:*
----
(providepath name apath)
----
Provides a path implementation for a path previously declared with declarepath or local-declarepath.
== putval
*Usage:*
----
(putval val)
----
Adds an external value to the collected values list. Useful when additional arguments are required to the transform function that would otherwise require partial application or a wrapper function. Example: (transform [:a :b (putval 3)] + some-map) This increments the value at path [:a :b] by 3.
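A runnable version of the example above might look like the following sketch, which assumes Rama is on the classpath and substitutes a concrete, made-up map for some-map.

```clojure
(require '[com.rpl.rama.path :refer [transform putval]])

;; Stand-in for some-map from the example above.
(def some-map {:a {:b 1}})

;; putval adds 3 to the collected values, so + is called as (+ 3 1).
(def plus-three (transform [:a :b (putval 3)] + some-map))
;; plus-three => {:a {:b 4}}
```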
== recursive-path
*Usage:*
----
(recursive-path params self-sym path)
----
Defines a path that can refer to itself recursively. Example: (def TreeValues (recursive-path [] p (if-path vector? [ALL p] STAY))) This navigates to every leaf of a tree represented as nested vectors.
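The TreeValues definition from the example can be exercised as in the sketch below, assuming Rama is on the classpath; the nested vector is hypothetical sample data.

```clojure
(require '[com.rpl.rama.path
           :refer [select transform recursive-path if-path ALL STAY]])

;; Same definition as the example above.
(def TreeValues
  (recursive-path [] p
    (if-path vector? [ALL p] STAY)))

(def tree [1 [2 [3 4]] 5])

;; Selecting visits every leaf in depth-first order.
(def leaves (select TreeValues tree))
;; leaves => [1 2 3 4 5]

;; Transforming updates every leaf and keeps the nesting intact.
(def bumped-tree (transform TreeValues inc tree))
;; bumped-tree => [2 [3 [4 5]] 6]
```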
== select
*Usage:*
----
(select apath structure)
----
Navigates to and returns a sequence of all the elements specified by the path. This macro will do inline caching of the path.
== select-any
*Usage:*
----
(select-any apath structure)
----
Returns any element found or NONE if nothing selected. This is the most efficient of the various selection operations. This macro will do inline caching of the path.
== select-first
*Usage:*
----
(select-first apath structure)
----
Returns first element found. This macro will do inline caching of the path.
== select-one
*Usage:*
----
(select-one apath structure)
----
Like select, but returns either one element or nil. Throws exception if multiple elements found. This macro will do inline caching of the path.
== select-one!
*Usage:*
----
(select-one! apath structure)
----
Returns exactly one element, throws exception if zero or multiple elements found. This macro will do inline caching of the path.
== selected-any?
*Usage:*
----
(selected-any? apath structure)
----
Returns true if any element was selected, false otherwise. This macro will do inline caching of the path.
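The selection operations differ mainly in what they return when zero, one, or many values are found. The sketch below compares them on one made-up vector, assuming Rama is on the classpath.

```clojure
(require '[com.rpl.rama.path
           :refer [select select-any select-first select-one select-one!
                   selected-any? ALL pred= pred> NONE]])

(def nums [1 2 3 4])

(def all-evens   (select [ALL even?] nums))            ; => [2 4]
(def first-even  (select-first [ALL even?] nums))      ; => 2
(def one-or-nil  (select-one [ALL (pred> 3)] nums))    ; => 4
(def none-found  (select-one [ALL (pred> 10)] nums))   ; => nil
(def exactly-one (select-one! [ALL (pred= 3)] nums))   ; => 3
(def any-none    (select-any [ALL (pred> 10)] nums))   ; => NONE
(def has-even?   (selected-any? [ALL even?] nums))     ; => true
```

Note that select-one and select-one! throw when the path selects more than one value, so they are only appropriate when at most one match is expected.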
== selected?
*Usage:*
----
(selected? & path)
----
Continues navigation if provided path executed on current value navigates to at least one value. Otherwise, stops current branch of navigation. Filter navigator.
== set-elem
*Usage:*
----
(set-elem elem)
----
Navigates to an element of a set if it exists. If the element does not exist, stops navigation. Can transform to NONE to remove element. Value navigator. For proxies, transforms with this navigator produce SetAddDiff, SetRemoveDiff, or a combination with MultiDiff.
== setval
*Usage:*
----
(setval apath aval structure)
----
Navigates to each value specified by the path and replaces it with aval. This macro will do inline caching of the path.
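A few representative uses are sketched here, assuming Rama is on the classpath and plain Clojure vectors as input: overwriting values, removing them with NONE, and appending through END.

```clojure
(require '[com.rpl.rama.path :refer [setval ALL END NONE]])

;; Replace every even element with 0.
(def zeroed (setval [ALL even?] 0 [1 2 3 4]))
;; zeroed => [1 0 3 0]

;; Setting to NONE removes the navigated elements.
(def odds-only (setval [ALL even?] NONE [1 2 3 4]))
;; odds-only => [1 3]

;; END navigates to the empty subsequence at the end, so this appends.
(def appended (setval END [4 5] [1 2 3]))
;; appended => [1 2 3 4 5]
```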
== sorted-map-range
*Usage:*
----
(sorted-map-range start-key end-key)
----
----
(sorted-map-range start-key end-key options)
----
Navigates to the sorted submap bound by the keys start-key and end-key. Options map accepts :inclusive-start? and :inclusive-end?, defaulting to true and false respectively. Useful on subindexed maps. Substructure navigator. For proxies, transforms with this navigator that do fine-grained updates to the submap produce KeysRemoveDiff, KeysDiff, or a combination of the two with MultiDiff. If the submap is modified in a coarse-grained way (e.g. overridden completely with termval), this navigator will produce a NewValueDiff.
== sorted-map-range-from
*Usage:*
----
(sorted-map-range-from start-key)
----
----
(sorted-map-range-from start-key max-amt-or-options-map)
----
Navigates to the sorted submap starting at the key start-key. The second argument can be a number for the max amount to fetch or a map that accepts :max-amt and :inclusive? (defaults to true). Useful on subindexed maps. Substructure navigator. For proxies, transforms with this navigator that do fine-grained updates to the submap produce KeysRemoveDiff, KeysDiff, or a combination of the two with MultiDiff. If the submap is modified in a coarse-grained way (e.g. overridden completely with termval), this navigator will produce a NewValueDiff.
== sorted-map-range-from-start
*Usage:*
----
(sorted-map-range-from-start max-amt)
----
Navigates to the sorted submap starting from the beginning up to max-amt elements. Useful on subindexed maps. Substructure navigator.
== sorted-map-range-to
*Usage:*
----
(sorted-map-range-to end-key)
----
----
(sorted-map-range-to end-key max-amt-or-options-map)
----
Navigates to the sorted submap up to the key end-key. The second argument can be a number for the max amount to fetch (reading backwards from that key) or a map that accepts :max-amt and :inclusive? (defaults to false). Useful on subindexed maps. Substructure navigator. For proxies, transforms with this navigator that do fine-grained updates to the submap produce KeysRemoveDiff, KeysDiff, or a combination of the two with MultiDiff. If the submap is modified in a coarse-grained way (e.g. overridden completely with termval), this navigator will produce a NewValueDiff.
== sorted-map-range-to-end
*Usage:*
----
(sorted-map-range-to-end max-amt)
----
Navigates to the sorted submap of max-amt elements, ending at the end of the data structure. Useful on subindexed maps. Substructure navigator.
== sorted-set-range
*Usage:*
----
(sorted-set-range start-elem end-elem)
----
----
(sorted-set-range start-elem end-elem options)
----
Navigates to the sorted subset bound by start-elem and end-elem. Options map accepts :inclusive-start? and :inclusive-end?, defaulting to true and false respectively. Useful on subindexed sets. Substructure navigator.
== sorted-set-range-from
*Usage:*
----
(sorted-set-range-from start-elem)
----
----
(sorted-set-range-from start-elem max-amt-or-options-map)
----
Navigates to the sorted subset starting at start-elem. The second argument can be a number for the max amount to fetch or a map that accepts :max-amt and :inclusive? (defaults to true). Useful on subindexed sets. Substructure navigator.
== sorted-set-range-from-start
*Usage:*
----
(sorted-set-range-from-start max-amt)
----
Navigates to the sorted subset starting from the beginning up to max-amt elements. Useful on subindexed sets. Substructure navigator.
== sorted-set-range-to
*Usage:*
----
(sorted-set-range-to end-elem)
----
----
(sorted-set-range-to end-elem max-amt-or-options-map)
----
Navigates to the sorted subset up to end-elem. The second argument can be a number for the max amount to fetch (reading backwards from that element) or a map that accepts :max-amt and :inclusive? (defaults to false). Useful on subindexed sets. Substructure navigator.
== sorted-set-range-to-end
*Usage:*
----
(sorted-set-range-to-end max-amt)
----
Navigates to the sorted subset ending at the end, up to max-amt elements. Useful on subindexed sets. Substructure navigator.
== srange
*Usage:*
----
(srange start end)
----
Navigates to the sublist bounded by two indexes. Substructure navigator. For proxies, transforms with this navigator produce sequence diffs according to the rest of the transform path.
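The sketch below shows the sublist being selected, replaced, and transformed in place. It assumes Rama is on the classpath and treats the start index as inclusive and the end index as exclusive, which is how the equivalent open-source Specter navigator behaves.

```clojure
(require '[com.rpl.rama.path :refer [select setval transform srange]])

(def v [0 1 2 3 4])

;; Selects the sublist covering indexes 1 and 2.
(def middle (select (srange 1 3) v))
;; middle => [[1 2]]

;; The replacement may have a different length than the original sublist.
(def spliced (setval (srange 1 3) [:x] v))
;; spliced => [0 :x 3 4]

;; Reverses only the elements at indexes 1 through 3.
(def partially-reversed (transform (srange 1 4) reverse v))
;; partially-reversed => [0 3 2 1 4]
```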
== srange-dynamic
*Usage:*
----
(srange-dynamic start-fn end-fn)
----
Navigates to the sublist bounded by indexes chosen by functions. Substructure navigator. start-fn takes in the structure as input, and end-fn takes in the structure and the result of start-fn. For proxies, transforms with this navigator produce sequence diffs according to the rest of the transform path.
== STAY
Stays navigated at the current point. Essentially a no-op navigator.
== STOP
Stops navigation at this point. For selection, returns nothing; for transformation, returns the structure unchanged.
== submap
*Usage:*
----
(submap keys)
----
Navigates to a map containing a subset of the keys/values of the starting map. Substructure navigator. For proxies, transforms with this navigator that do fine-grained updates to the submap produce KeysRemoveDiff, KeysDiff, or a combination of the two with MultiDiff. If the submap is modified in a coarse-grained way (e.g. overridden completely with termval), this navigator will produce a NewValueDiff.
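As a small sketch (assuming Rama is on the classpath and a plain Clojure map as input), the submap can be selected on its own or updated while the remaining keys are left alone:

```clojure
(require '[com.rpl.rama.path :refer [select transform submap]])

(def m {:a 1 :b 2 :c 3})

;; Only the requested keys appear in the navigated map.
(def ab-only (select (submap [:a :b]) m))
;; ab-only => [{:a 1 :b 2}]

;; Changes made to the submap are merged back; :c is untouched.
(def a-bumped (transform (submap [:a :b]) #(update % :a inc) m))
;; a-bumped => {:a 2 :b 2 :c 3}
```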
== subselect
*Usage:*
----
(subselect & path)
----
Navigates to list of values navigated by provided path. Transforms on that list will update original locations of each value, no matter how nested. Control navigator. For proxies transforms with this navigator produce diffs according to rest of transform path.
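To show how updates flow back to nested locations, the following sketch reverses the even numbers spread across several inner vectors. It assumes Rama is on the classpath; the data is made up for illustration.

```clojure
(require '[com.rpl.rama.path :refer [select transform subselect ALL]])

(def nested [[1 2] [3 4] [5 6]])

;; The selected values are gathered into a single list.
(def gathered (select (subselect ALL ALL even?) nested))
;; gathered => [[2 4 6]]

;; Reversing that list writes 6, 4, 2 back to the original positions.
(def swapped (transform (subselect ALL ALL even?) reverse nested))
;; swapped => [[1 6] [3 4] [5 2]]
```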
== subset
*Usage:*
----
(subset elems)
----
Navigates to the subset of the starting set with the given elements. Substructure navigator. For proxies, transforms with this navigator produce SetAddDiff, SetRemoveDiff, or a combination with MultiDiff.
== term
*Usage:*
----
(term term-obj)
----
For usage with multi-transform or local-transform>, defines an endpoint in the navigation that will have the parameterized transform function run. The transform function works just like it does in transform, with collected values given as the first arguments.
== termval
*Usage:*
----
(termval v)
----
Like term, but specifies a val to set at the location regardless of the collected values or the value at the location. NONE is a special value that can be set to indicate a navigator should perform a removal of the key leading to that value. In Rama dataflow code, NONE> should be used instead of (termval NONE).
== transform
*Usage:*
----
(transform apath transform-obj structure)
----
Navigates to each value specified by the path and replaces it by the result of running the transform function on it. Any values collected in the path will be the first arguments to the transform function. This macro will do inline caching of the path.
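Two basic uses are sketched below, assuming Rama is on the classpath and plain Clojure collections as input.

```clojure
(require '[com.rpl.rama.path :refer [transform ALL MAP-VALS]])

;; Increment the :n field of every map in a vector.
(def bumped-fields (transform [ALL :n] inc [{:n 1} {:n 2}]))
;; bumped-fields => [{:n 2} {:n 3}]

;; Increment every value of a map.
(def bumped-vals (transform MAP-VALS inc {:a 1 :b 2}))
;; bumped-vals => {:a 2 :b 3}
```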
== transformed
*Usage:*
----
(transformed path update-fn)
----
Navigates to a view of the current value by transforming it with the specified path and update-fn.
== traverse
*Usage:*
----
(traverse apath structure)
----
Return a reducible object that traverses over structure to every element specified by the path. This macro will do inline caching of the path.
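Because the result is reducible, it can be consumed with reduce without building an intermediate vector. This is a sketch that assumes Rama is on the classpath; the :n field is hypothetical.

```clojure
(require '[com.rpl.rama.path :refer [traverse ALL]])

;; Sum the :n field of every element directly from the traversal.
(def total
  (reduce + 0 (traverse [ALL :n] [{:n 1} {:n 2} {:n 3}])))
;; total => 6
```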
== traverse-all
*Usage:*
----
(traverse-all apath)
----
Returns a transducer that traverses over each element with the given path.
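Since the result is a transducer, it composes with standard Clojure functions such as into. The sketch below assumes Rama is on the classpath and uses made-up input.

```clojure
(require '[com.rpl.rama.path :refer [traverse-all ALL]])

;; Each input element is navigated with the path; all results are collected.
(def flattened-evens
  (into [] (traverse-all [ALL even?]) [[1 2] [3 4] [5 6]]))
;; flattened-evens => [2 4 6]
```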
== VAL
Collects the currently navigated value.
== view
*Usage:*
----
(view afn)
----
----
(view afn & args)
----
Navigates to the result of running afn on the currently navigated value. View navigator. For proxies, transforms with this navigator produce NewValueDiff.
== with-fresh-collected
*Usage:*
----
(with-fresh-collected & path)
----
Continues navigating on the given path with the collected values reset to []. Once navigation leaves the scope of with-fresh-collected, the collected values revert to what they were before.
== with-page-size
*Usage:*
----
(with-page-size page-size & path)
----
Pagination done by navigators in the subpath uses the specified page size. Can only be used in yielding selects.
= com.rpl.rama.aggs
== +and
*Usage:*
----
(+and $$target *val)
----
----
(+and *val :> *result)
----
Applies the and operator to all input. Null or false values are considered “false”, and all other values are considered “true”. If all inputs are “true”, aggregates to the last “true” value seen. If any input is “false”, aggregates to the first “false” value seen. If no input, aggregates to boolean true.
== +avg
*Usage:*
----
(+avg *val :> *result)
----
Aggregates average of input. Cannot be used for aggregation into an existing PState.
== +count
*Usage:*
----
(+count $$target *val)
----
----
(+count *val :> *result)
----
Aggregates count of values.
== +first
*Usage:*
----
(+first $$target *val)
----
----
(+first *val :> *result)
----
Aggregates to the first input seen.
== +last
*Usage:*
----
(+last $$target *val)
----
----
(+last *val :> *result)
----
Aggregates to the last input seen.
== +limit
*Usage:*
----
(+limit [amt] & vars)
----
----
(+limit [amt] & vars :+options options-map)
----
Special combiner aggregator that limits incoming data to a fixed amount according to optional sort options. Only usable in batch blocks and operates during agg and post-agg phases. Output of the aggregator is the same as the input but limited to the specified number.
Available options:
* :sort: expression to sort emitted values by
* :reverse?: if sort expression is specified, emits and limits by reverse order
* :index-var: variable to bind the index of each emit
Examples:
* (+limit [5] *v)
* (+limit [10] *v1 *v2 :+options {:sort *v1 :reverse? true :index-var *i})
* (+limit [100] *v1 *v2 *v3 :+options {:sort [*v3 *v1]})
== +map-agg
*Usage:*
----
(+map-agg $$target *key *val)
----
----
(+map-agg *key *val :> *map)
----
Aggregates a map from key/value inputs.
== +max
*Usage:*
----
(+max $$target *val)
----
----
(+max *val :> *result)
----
Aggregates maximum value. Aggregates nil if no input.
== +merge
*Usage:*
----
(+merge $$target *map)
----
----
(+merge *map :> *result-map)
----
Aggregates maps by merging them like Clojure’s merge function.
== +min
*Usage:*
----
(+min $$target *val)
----
----
(+min *val :> *result)
----
Aggregates minimum value. Aggregates nil if no input.
== +multi-set-agg
*Usage:*
----
(+multi-set-agg $$target *val)
----
----
(+multi-set-agg *val :> *map)
----
Aggregates values into a map from value to count.
== +NONE
Aggregator version of NONE>, causing that element to be removed from that position. Example: (+compound $$p {*k (+NONE)})
== +or
*Usage:*
----
(+or $$target *val)
----
----
(+or *val :> *result)
----
Applies the or operator to all input. Null or boolean false values are considered “false”, and all other values are considered “true”. If any input is “true”, aggregates to the first “true” value seen. If all inputs are “false”, aggregates to the last “false” value seen. If no input, aggregates to boolean false.
== +set-agg
*Usage:*
----
(+set-agg $$target *val)
----
----
(+set-agg *val :> *set)
----
Aggregates a set.
== +set-remove-agg
*Usage:*
----
(+set-remove-agg $$target *val)
----
----
(+set-remove-agg *val :> *set)
----
Aggregator to remove an element from a set. Example: (+compound $$p {*k (+set-remove-agg *v)})
== +sum
*Usage:*
----
(+sum $$target *val)
----
----
(+sum *val :> *result)
----
Aggregates sum of values.
== +top-monotonic
*Usage:*
----
(+top-monotonic [amt] *object :> *list)
----
----
(+top-monotonic [amt] *object :+options options-map :> *list)
----
----
(+top-monotonic [amt] $$target *object)
----
----
(+top-monotonic [amt] $$target *object :+options options-map)
----
Aggregates input into a list of top elements. See the extended documentation for more details.
Available options:
* :id-fn: function to extract identifier from input objects so existing entities can be replaced during aggregation
* :sort-val-fn: function to extract a sort val from input objects
* :sort-type: can be :ascending or :descending (default)
Examples:
* (+top-monotonic [10] *data :> *res :+options {:id-fn first :sort-val-fn second})
* (+top-monotonic [10] $$topn *data :> *res :+options {:id-fn first :sort-val-fn second})
* (+compound $$p {*k (+top-monotonic *data :+options {:id-fn first :sort-val-fn second})})
== +vec-agg
*Usage:*
----
(+vec-agg $$target *val)
----
----
(+vec-agg *val :> *vector)
----
Aggregates a vector.
= com.rpl.rama.ops
== agg->init-fn
*Usage:*
----
(agg->init-fn agg)
----
Extracts the init function from the given accumulator or combiner. The init function is a zero-arg function returning the starting aggregation state.
== agg->update-fn
*Usage:*
----
(agg->update-fn agg)
----
Extracts the update function from the given accumulator or combiner. The update function has the form agg-state & args -> new-agg-state.
== current-microbatch-id
*Usage:*
----
(current-microbatch-id)
----
Returns ID of current microbatch attempt. Can only be called within a microbatch topology. This can be used to achieve fault-tolerant exactly-once update semantics when updating external systems with Rama’s integration API.
== current-random-source
*Usage:*
----
(current-random-source)
----
Returns the shared java.util.Random instance for the task thread.
== current-task-id
*Usage:*
----
(current-task-id)
----
Returns task ID where event is running.
== expand
*Usage:*
----
(expand *data :> & fields)
----
Emits every element of input list as a separate field. Useful for processing elements of fixed-size lists. Example: (ops/expand *data :> *a *b *c)
== explode
*Usage:*
----
(explode *sequence :> *element)
----
Emits once for each element of a sequence. For example, (?<- (explode [1 2 3] :> *v) (println *v)) prints 1, 2, and 3 on separate lines; the trailing nil shown at a REPL is the return value.
== explode-indexed
*Usage:*
----
(explode-indexed *sequence :> *index *element)
----
Same as explode, except it also outputs the index of each element. For example, (?<- (ops/explode-indexed [:a :b :c] :> *i *v) (println *i *v)) prints "0 :a", "1 :b", and "2 :c" on separate lines; the trailing nil shown at a REPL is the return value.
== explode-map
*Usage:*
----
(explode-map *map :> *key *value)
----
Like explode, except it emits each key/value pair as separate elements. For example, (?<- (ops/explode-map {:a 1 :b 2 :c 3} :> *k *v) (println *k *v)) prints ":a 1", ":b 2", and ":c 3" on separate lines; the trailing nil shown at a REPL is the return value.
== module-instance-info
*Usage:*
----
(module-instance-info)
----
Returns ModuleInstanceInfo for module and worker of running event.
== range>
*Usage:*
----
(range> *start *end :> *value)
----
Like Clojure range, but emits once per element in the range. For example, (?<- (range> 10 13 :> *v) (println *v)) prints 10, 11, and 12 on separate lines; the trailing nil shown at a REPL is the return value.
== sum
*Usage:*
----
(sum nums)
----
Returns sum of values in provided sequence.
== vget
*Usage:*
----
(vget $$pstate :> *value)
----
Extracts contents of a PState as a value. Should only be called on PStates with a top-level Class schema.