Created
September 5, 2013 03:20
-
-
Save ksauzz/6445675 to your computer and use it in GitHub Desktop.
riak-java-client v1.4.0 2i sample
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.ByteArrayInputStream; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.io.ObjectInputStream; | |
import java.io.ObjectOutputStream; | |
import java.io.Serializable; | |
import java.util.List; | |
import com.basho.riak.client.IRiakClient; | |
import com.basho.riak.client.IRiakObject; | |
import com.basho.riak.client.IndexEntry; | |
import com.basho.riak.client.RiakFactory; | |
import com.basho.riak.client.bucket.Bucket; | |
import com.basho.riak.client.builders.RiakObjectBuilder; | |
import com.basho.riak.client.cap.VClock; | |
import com.basho.riak.client.convert.ConversionException; | |
import com.basho.riak.client.convert.Converter; | |
import com.basho.riak.client.convert.NoKeySpecifiedException; | |
import com.basho.riak.client.convert.RiakIndex; | |
import com.basho.riak.client.convert.RiakIndexConverter; | |
import com.basho.riak.client.convert.RiakKey; | |
import com.basho.riak.client.query.StreamingOperation; | |
import com.basho.riak.client.query.indexes.BinIndex; | |
import com.basho.riak.client.query.indexes.IntIndex; | |
import com.basho.riak.client.query.indexes.RiakIndexes; | |
import com.basho.riak.client.raw.config.Configuration; | |
import com.basho.riak.client.raw.http.HTTPClientConfig; | |
import com.basho.riak.client.raw.pbc.PBClientConfig; | |
public class Riak2iSample { | |
private static final int MAX_CONNECTION_SIZE = 10; | |
private static final int INIT_CONNECTION_SIZE = 10; | |
private static final int BUFFER_KB = 16; | |
private static final int IDLE_CONN_TIMEOUT_MIL = 2000; | |
private static final int CONNECTION_TIMEOUT_MIL = 2000; | |
private static final int REQUEST_TIMEOUT_MIL = 2000; | |
private static final String BUCKET_NAME = "person"; | |
public static void main(String[] args) throws Exception { | |
// Configuration conf = PBClientConfig.defaults(); | |
// Configuration conf = HTTPClientConfig.defaults(); | |
Configuration conf = new PBClientConfig.Builder() | |
.withHost("127.0.0.1") | |
.withPort(8087) | |
.withConnectionTimeoutMillis(CONNECTION_TIMEOUT_MIL) | |
.withIdleConnectionTTLMillis(IDLE_CONN_TIMEOUT_MIL) | |
.withSocketBufferSizeKb(BUFFER_KB) | |
.withRequestTimeoutMillis(REQUEST_TIMEOUT_MIL) | |
.withInitialPoolSize(INIT_CONNECTION_SIZE) | |
.withPoolSize(MAX_CONNECTION_SIZE) | |
.build(); | |
IRiakClient client = RiakFactory.newClient(conf); | |
Bucket bucket = client.createBucket(BUCKET_NAME).execute(); | |
bucket.store(new Person("alice", "HR", 24)).withConverter(new PersonConverter()).execute(); | |
bucket.store(new Person("bob", "Dev", 28)).withConverter(new PersonConverter()).execute(); | |
bucket.store(new Person("clare", "HR", 30)).withConverter(new PersonConverter()).execute(); | |
bucket.store(new Person("dizzy", "Dev", 33)).withConverter(new PersonConverter()).execute(); | |
bucket.store(new Person("eric", "Dev", 27)).withConverter(new PersonConverter()).execute(); | |
bucket.store(new Person("flora", "Dev", 27)).withConverter(new PersonConverter()).execute(); | |
bucket.store(new Person("greg", "Board", 35)).withConverter(new PersonConverter()).execute(); | |
// Dev divisionのみ取得 | |
List<String> DevList = bucket.fetchIndex(BinIndex.named("division")).withValue("Dev").execute(); | |
for (String key : DevList) { | |
Person person = new Person(); | |
person.setName(key); | |
log("Dev: " + bucket.fetch(person).withConverter(new PersonConverter()).execute().toString()); | |
} | |
// HR divisionのみ取得. | |
List<String> HRList = bucket.fetchIndex(BinIndex.named("division")).withValue("HR").execute(); | |
for (String key : HRList) { | |
Person person = new Person(); | |
person.setName(key); | |
log("HR: " + bucket.fetch(person).withConverter(new PersonConverter()).execute().toString()); | |
} | |
// Range queryで特定範囲の値をもつkeyのみ取得 | |
List<String> AgeList = bucket.fetchIndex(IntIndex.named("age")).from(27).to(29).execute(); | |
for (String key : AgeList) { | |
Person person = new Person(); | |
person.setName(key); | |
log("Range from 27 to 29: " + bucket.fetch(person).withConverter(new PersonConverter()).execute().toString()); | |
} | |
// Streaming - Dev divisionのみ取得 | |
StreamingOperation<IndexEntry> streamDev = bucket.fetchIndex(BinIndex.named("division")).withValue("Dev").executeStreaming(); | |
for (IndexEntry index : streamDev) { | |
Person person = new Person(); | |
person.setName(index.getObjectKey()); | |
log("Streaming... Dev: " + bucket.fetch(person).withConverter(new PersonConverter()).execute().toString()); | |
} | |
for (String key : bucket.keys()) { | |
bucket.delete(key).execute(); | |
} | |
client.shutdown(); | |
} | |
private static void log(String log) { | |
System.out.println(log); | |
} | |
public static class PersonConverter implements Converter<Person> { | |
private final RiakIndexConverter<Person> indexConverter = new RiakIndexConverter<Person>(); | |
@Override | |
public IRiakObject fromDomain(Person domainObject, VClock vclock) | |
throws ConversionException { | |
if (domainObject.getName() == null) { throw new NoKeySpecifiedException(domainObject); } | |
byte[] value = serialize(domainObject); | |
return RiakObjectBuilder.newBuilder(BUCKET_NAME, domainObject.getName()) | |
.withValue(value) | |
.withIndexes(indexConverter.getIndexes(domainObject)) | |
.withVClock(vclock) | |
.build(); | |
} | |
// 好みのシリアライザを使ってください。 msgpack, ProtocolBuffer, Kryo, thrift etc.... | |
private byte[] serialize(Person domainObject) { | |
try { | |
ByteArrayOutputStream stream = new ByteArrayOutputStream(); | |
new ObjectOutputStream(stream).writeObject(domainObject); | |
return stream.toByteArray(); | |
} catch (IOException ie) { | |
throw new ConversionException(ie); | |
} | |
} | |
@Override | |
public Person toDomain(IRiakObject riakObject) | |
throws ConversionException { | |
try { | |
ByteArrayInputStream stream = new ByteArrayInputStream(riakObject.getValue()); | |
return (Person) new ObjectInputStream(stream).readObject(); | |
} catch (Exception e) { | |
throw new ConversionException(e); | |
} | |
} | |
} | |
public static class Person implements Serializable { | |
private static final long serialVersionUID = 1L; | |
@RiakKey | |
private String name; | |
@RiakIndex(name="division") | |
private transient String division; // Indexとしてのみ使う値はtransient | |
@RiakIndex(name="age") | |
private int age; | |
public Person(String name, String division, int age) { | |
this.name = name; | |
this.division = division; | |
this.age = age; | |
} | |
public Person() {} | |
public String getName() { | |
return name; | |
} | |
public void setName(String name) { | |
this.name = name; | |
} | |
public String getDivision() { | |
return division; | |
} | |
public void setDivision(String division) { | |
this.division = division; | |
} | |
public int getAge() { | |
return age; | |
} | |
public void setAge(int age) { | |
this.age = age; | |
} | |
@Override | |
public String toString() { | |
return new StringBuilder() | |
.append("name:").append(name) .append(", ") | |
.append("age:").append(age) | |
.toString(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment