I have to store around 250 numerical values per second, per client, which is around 900k numbers per hour. It probably will not be a full-day recording (more likely 5-10 hours a day), but I will partition my data by client id and the day the reading is made. The maximum row length comes to about 22-23M, which is still manageable. My schema looks like this:
```sql
CREATE TABLE measurement (
    clientid text,
    date text,
    event_time timestamp,
    value int,
    PRIMARY KEY ((clientid, date), event_time)
);
```
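A quick sketch of the sizing arithmetic behind the estimate above, assuming exactly one clustering row (one `event_time`/`value` pair) per reading. The class and method names are illustrative only:

```java
// Rough per-partition row counts for the measurement table, assuming one
// clustering row (event_time -> value) per reading at a fixed sample rate.
final class PartitionSizes {
    static final int READINGS_PER_SECOND = 250; // from the estimate above

    /** Rows written to one (clientid, bucket) partition covering `hours` hours. */
    static long rowsPerPartition(int readingsPerSecond, int hours) {
        return (long) readingsPerSecond * 3600L * hours;
    }

    public static void main(String[] args) {
        int[] bucketHours = {1, 5, 10, 12, 24};
        for (int h : bucketHours) {
            System.out.printf("%2d h bucket: %,d rows%n", h, rowsPerPartition(READINGS_PER_SECOND, h));
        }
    }
}
```

It reports row counts ranging from 900,000 for a 1-hour bucket to 21,600,000 for a full 24-hour bucket.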
The keyspace has a replication factor of 2, just for testing; the snitch is GossipingPropertyFileSnitch and the replication strategy is NetworkTopologyStrategy. I know that a replication factor of 3 is more of a production standard.
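For reference, a NetworkTopologyStrategy keyspace is declared with a replica count per data center. The sketch below only builds that CQL string in Java; the keyspace name `sensor_data` and data center name `DC1` are hypothetical, and with GossipingPropertyFileSnitch the data center name has to match the `dc` value in each node's `cassandra-rackdc.properties`:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Builds a CREATE KEYSPACE statement for NetworkTopologyStrategy.
// Keyspace and data center names are hypothetical placeholders; the DC names
// must match what GossipingPropertyFileSnitch reads from cassandra-rackdc.properties.
final class KeyspaceCql {
    static String createKeyspace(String keyspace, Map<String, Integer> replicasPerDc) {
        String dcs = replicasPerDc.entrySet().stream()
                .map(e -> "'" + e.getKey() + "': " + e.getValue())
                .collect(Collectors.joining(", "));
        return "CREATE KEYSPACE IF NOT EXISTS " + keyspace
                + " WITH replication = {'class': 'NetworkTopologyStrategy', " + dcs + "};";
    }

    public static void main(String[] args) {
        Map<String, Integer> rf = new LinkedHashMap<>();
        rf.put("DC1", 2); // hypothetical DC name; RF 2 as in the test setup
        System.out.println(createKeyspace("sensor_data", rf));
    }
}
```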
Next, I created a small cluster on the company's servers: three virtualized machines on bare metal, each with 2 CPUs x 2 cores, 16 GB of RAM and plenty of disk space. They are on the same gigabit LAN as I am. According to nodetool, the cluster is operational.
Here is the code I'm using to test my setup:
```java
import com.datastax.driver.core.*;
import org.joda.time.DateTime;
import java.util.concurrent.*;

Cluster cluster = Cluster.builder()
        .addContactPoint("192.168.1.100")
        .addContactPoint("192.168.1.102")
        .build();
Session session = cluster.connect();
DateTime time = DateTime.now();
BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue<>(50, true);
try {
    ExecutorService pool = Executors.newFixedThreadPool(15); //also tried other pool sizes to throttle inserts
    String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
    PreparedStatement preparedStatement = session.prepare(insertQuery);
    BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also
    //generating the entries
    for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
        time = time.plus(4); //4ms between each entry
        BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
        batch.add(bound);
        //The batch statement must have 65535 statements at most
        if (batch.size() >= 65534) {
            queryQueue.put(batch);
            batch = new BatchStatement();
        }
    }
    queryQueue.put(batch); //the last batch, perhaps shorter than 65535
    //storing the data
    System.out.println("Starting storing");
    //submit exactly one task per queued batch; looping on isEmpty() keeps submitting tasks
    //until the workers drain the queue, and the surplus tasks then block in take() forever
    for (int remaining = queryQueue.size(); remaining > 0; remaining--) {
        pool.execute(() -> {
            try {
                long threadId = Thread.currentThread().getId();
                System.out.println("Started: " + threadId);
                BatchStatement statement = queryQueue.take();
                long start2 = System.currentTimeMillis();
                session.execute(statement);
                System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
            } catch (Exception ex) {
                System.out.println(ex.toString());
            }
        });
    }
    pool.shutdown();
    pool.awaitTermination(120, TimeUnit.SECONDS);
} catch (Exception ex) {
    System.out.println(ex.toString());
} finally {
    session.close();
    cluster.close();
}
```
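To see how many batches the loop above actually produces, here is a small sketch of the same splitting rule (flush a batch once it reaches 65,534 statements, then queue the remainder). It is only arithmetic; no driver calls are involved, and the names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Splits `totalStatements` into batches of at most `maxPerBatch`, the same way
// the insert loop flushes a full batch and then queues the leftover statements.
final class BatchChunks {
    static List<Integer> batchSizes(int totalStatements, int maxPerBatch) {
        List<Integer> sizes = new ArrayList<>();
        int remaining = totalStatements;
        while (remaining > 0) {
            int n = Math.min(remaining, maxPerBatch);
            sizes.add(n);
            remaining -= n;
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> sizes = batchSizes(900_000, 65_534);
        System.out.println(sizes.size() + " batches, last one has "
                + sizes.get(sizes.size() - 1) + " statements");
    }
}
```

For 900,000 statements this gives 14 batches: 13 full ones and a final batch of 48,058 statements. Unlike the original loop, the sketch never produces an empty trailing batch when the total is an exact multiple of the batch size.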
I put this code together from posts here and on other blogs and websites. As I understood it, it is important for the client to use multiple threads, which is why I did it this way. I also tried using async operations.
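Since the point of the threads is to execute several batches concurrently, one way to express that is to create one task per batch up front and hand them all to `ExecutorService.invokeAll`, which waits until every task has finished. This is a sketch under the assumption that each `Runnable` stands in for `session.execute(batch)`; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Runs each batch in exactly one pool task. Each Runnable is a hypothetical
// stand-in for session.execute(batchStatement).
final class PoolPerBatch {
    static int runAll(List<Runnable> batches, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (Runnable batch : batches) {
                tasks.add(() -> {
                    batch.run();
                    return null;
                });
            }
            int completed = 0;
            // invokeAll returns only after every task has finished or failed
            for (Future<Void> f : pool.invokeAll(tasks)) {
                f.get(); // surfaces a failed batch as an ExecutionException
                completed++;
            }
            return completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running batches", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a batch failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicInteger executed = new AtomicInteger();
        List<Runnable> batches = new ArrayList<>();
        for (int i = 0; i < 14; i++) {
            batches.add(executed::incrementAndGet); // pretend to execute one batch
        }
        int completed = runAll(batches, 15);
        System.out.println(completed + " tasks completed, " + executed.get() + " batches executed");
    }
}
```

Because the task list is built from the batches themselves, no task can end up waiting on an empty queue.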
The bottom line is this: no matter which approach I use, one batch takes 5-6 seconds to execute, sometimes up to 10. It takes just as long if I insert only one batch (so only ~65k columns) or if I use a plain single-threaded application. Honestly, I expected a bit more, especially since I get more or less the same performance on my laptop with a local instance.
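A back-of-the-envelope view of what those timings mean in rows per second, using the 65,534-statement batches from the code above (integer division, so the figures are rounded down; the class name is illustrative):

```java
// Rows per second implied by one batch of 65,534 rows taking 5, 6 or 10 seconds.
final class Throughput {
    static long rowsPerSecond(long rows, long seconds) {
        return rows / seconds; // integer division, rounds down
    }

    public static void main(String[] args) {
        long rowsPerBatch = 65_534;
        for (long s : new long[] {5, 6, 10}) {
            System.out.println(s + " s per batch -> ~" + rowsPerSecond(rowsPerBatch, s) + " rows/s");
        }
    }
}
```

That is roughly 6,500 to 13,100 rows per second, compared with the 250 values per second that a single client produces.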
The second, maybe more important, issue is the exceptions I run into unpredictably. These two:
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
and
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.102:9042 (com.datastax.driver.core.TransportException: [/192.168.1.102:9042] Connection has been closed), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Connection has been closed), /192.168.1.101:9042 (com.datastax.driver.core.TransportException: [/192.168.1.101:9042] Connection has been closed))
So, bottom line: am I doing something wrong? Should I reorganize the way I load data, or change the schema? I tried reducing the row length (so I have 12-hour rows), but that didn't make a big difference.
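One way to shrink partitions is to put a finer time bucket into the `date` part of the partition key. The helpers below are a hypothetical sketch using `java.time` (the update's code uses Joda-Time instead); the day and half-day key formats are assumptions, while the hourly one mimics the `"2015-01-05:" + hour` key built in the update:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical bucket-key helpers for the text "date" column of the partition key.
// Finer buckets mean fewer rows per (clientid, bucket) partition.
final class BucketKeys {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    static String dayBucket(LocalDateTime t) {
        return t.format(DAY);
    }

    static String halfDayBucket(LocalDateTime t) {
        return t.format(DAY) + (t.getHour() < 12 ? ":AM" : ":PM");
    }

    static String hourBucket(LocalDateTime t) {
        return t.format(DAY) + ":" + t.getHour(); // hour is not zero-padded
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2015, 1, 5, 12, 0, 0);
        System.out.println(dayBucket(t) + " | " + halfDayBucket(t) + " | " + hourBucket(t));
    }
}
```

At 250 readings per second, an hourly bucket holds 900,000 rows and a half-day bucket 10,800,000.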
==============================
Update:
I was rude and forgot to post an example of the code I ended up using after the question was answered. It works reasonably well; however, I'm continuing my research with KairosDB and binary transfer with Astyanax. It looks like I can get much better performance with them than with CQL, although KairosDB can have some issues when it is overloaded (but I'm working on it), and Astyanax is a bit verbose for my taste. Nevertheless, here is the code; maybe I'm mistaken somewhere.
The number of semaphore permits has no effect on performance above 5000; throughput stays almost constant.
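```java
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
import org.joda.time.DateTime;

String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
Semaphore semaphore = new Semaphore(15000); // caps the number of in-flight async writes
System.out.println("Starting " + Thread.currentThread().getId());
DateTime time = DateTime.parse("2015-01-05T12:00:00");
//generating the entries
long start = System.currentTimeMillis();
for (int i = 0; i < 900000; i++) {
    BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
    semaphore.acquire(); // blocks while 15000 writes are still outstanding
    ResultSetFuture resultSetFuture = session.executeAsync(statement);
    Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(@Nullable ResultSet resultSet) {
            semaphore.release();
        }

        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("Error: " + throwable.toString());
            semaphore.release();
        }
    });
    time = time.plus(4); //4ms between each entry
}
// wait for the remaining in-flight writes (all 15000 permits back), then report the elapsed time
semaphore.acquire(15000);
System.out.println("Finished in " + (System.currentTimeMillis() - start) + " ms");
```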
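Stripped of the driver, the throttling pattern above comes down to: acquire a permit before each asynchronous write, release it in the completion callback whether the write succeeded or failed, and finally take back every permit to wait for the last writes. The sketch below simulates the writes with `CompletableFuture.runAsync` (a stand-in for `session.executeAsync`, not the driver API) and records the highest number of concurrent writes it observed; the class names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Driver-free sketch of semaphore-throttled async writes. Each "write" is a
// hypothetical no-op task standing in for session.executeAsync(...).
final class ThrottledWrites {
    static final class Stats {
        final int completed;
        final int maxInFlight;

        Stats(int completed, int maxInFlight) {
            this.completed = completed;
            this.maxInFlight = maxInFlight;
        }
    }

    static Stats run(int writes, int permits, int workerThreads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads);
        try {
            for (int i = 0; i < writes; i++) {
                semaphore.acquire(); // blocks while `permits` writes are outstanding
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                CompletableFuture.runAsync(() -> { /* simulated write */ }, workers)
                        .whenComplete((ignored, error) -> {
                            // runs on success and on failure, like onSuccess/onFailure above
                            completed.incrementAndGet();
                            inFlight.decrementAndGet();
                            semaphore.release();
                        });
            }
            semaphore.acquire(permits); // all permits back => no write still in flight
            return new Stats(completed.get(), maxInFlight.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while throttling writes", e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        Stats s = run(900_000, 15_000, 4);
        System.out.println(s.completed + " writes completed, at most " + s.maxInFlight + " in flight");
    }
}
```

The observed maximum can never exceed the permit count, which is how the semaphore bounds the number of outstanding requests sent to the cluster.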