
Because Cassandra distributes data across many nodes, and even across different DCs / availability zones, it looks very attractive to use Cassandra tables as a queue: store data in one location and process it in another. Cassandra then handles the transport layer by itself, and no other components are needed.

Goal

Implement a queue analogue based on Cassandra tables.

Concerns

In short: tombstones and compaction.

Reference: Cassandra anti-patterns: Queues and queue-like datasets

However, as that article itself says: "Note that it’s possible to improve on this hypothetical queue scenario. Specifically, when knowing what the last entry was, a consumer can specify the start column and thus somewhat mitigate the effect of tombstones by not having to either 1) start scanning at the beginning of the row and 2) collect and keep all the irrelevant tombstones in memory."

The idea

  • Separate data and statuses to prevent any updates in the Data table
  • Make status records as small as possible
  • Use a rounded timestamp as the partition key of the Status table, for example rounded down to a 5-minute boundary.

  Cassandra_queue-like_design_schema
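
The rounding of the partition key can be sketched as follows. This is a minimal illustration, assuming the timestamp is kept as epoch milliseconds and rounded down to the start of its 5-minute bucket; the names BUCKET_MS, bucket_start_ms and bucket_for are hypothetical and not part of the original design.

```python
from datetime import datetime, timezone

# Assumed bucket width: 5 minutes, expressed in milliseconds.
BUCKET_MS = 5 * 60 * 1000


def bucket_start_ms(epoch_ms: int, bucket_ms: int = BUCKET_MS) -> int:
    """Round an epoch-millisecond timestamp down to the start of its bucket."""
    return epoch_ms - (epoch_ms % bucket_ms)


def bucket_for(dt: datetime) -> int:
    """Return the bucket start (epoch ms) for a timezone-aware datetime."""
    return bucket_start_ms(int(dt.timestamp() * 1000))


# 2016-12-30 00:03:17 UTC falls into the bucket that starts at 00:00:00 UTC.
example = bucket_for(datetime(2016, 12, 30, 0, 3, 17, tzinfo=timezone.utc))
print(example)
```

All records written within the same 5 minutes share one partition, so a processor only has to read a small, bounded number of partitions to find pending work.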

  • Process data records one by one
  • Change the status record before processing to prevent collisions with other processor instances ("Lock" the record)
  • Change the status again after the data record was processed ("Unlock" the record)

Cassandra_queue-like_design_processing
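
The lock / process / unlock cycle above can be sketched with an in-memory stand-in for the Status table. This is only an illustration of the control flow, not a Cassandra client: the status codes (0 = pending, 1 = in progress, 2 = done), the class InMemoryStatusTable and the function process_bucket are assumptions made for the example, and the stand-in is not thread-safe. In a real deployment the transition method would issue the conditional batch against Cassandra.

```python
from typing import Callable, Set, Tuple

# Hypothetical status codes; the page itself only shows a 0 -> 1 transition.
PENDING, IN_PROGRESS, DONE = 0, 1, 2


class InMemoryStatusTable:
    """Stand-in for request_status: a set of (ts, status, id) rows."""

    def __init__(self) -> None:
        self.rows: Set[Tuple[int, int, str]] = set()

    def insert(self, ts: int, status: int, record_id: str) -> None:
        self.rows.add((ts, status, record_id))

    def transition(self, ts: int, record_id: str, old: int, new: int) -> bool:
        """Replace the `old` status row with a `new` one.

        Succeeds only if the old row exists and the new row does not,
        mimicking DELETE ... IF EXISTS plus INSERT ... IF NOT EXISTS.
        """
        old_row, new_row = (ts, old, record_id), (ts, new, record_id)
        if old_row not in self.rows or new_row in self.rows:
            return False
        self.rows.remove(old_row)
        self.rows.add(new_row)
        return True


def process_bucket(table: InMemoryStatusTable, ts: int,
                   handler: Callable[[str], None]) -> int:
    """Process every pending record of one time bucket, one by one."""
    processed = 0
    pending = sorted(rid for (t, s, rid) in table.rows
                     if t == ts and s == PENDING)
    for rid in pending:
        # "Lock": skip the record if another processor got there first.
        if not table.transition(ts, rid, PENDING, IN_PROGRESS):
            continue
        handler(rid)
        # "Unlock": mark the record as processed.
        table.transition(ts, rid, IN_PROGRESS, DONE)
        processed += 1
    return processed
```

A processor that loses the race for a record simply moves on to the next one, which is what keeps several instances from handling the same data record twice.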

Sample Data table schema (for RAW Request)
CREATE TABLE IF NOT EXISTS request (
  id timeuuid,
  body text,
  header map<text, text>,
  host text,
  ip inet,
  method text,
  path text,
  query text,
  PRIMARY KEY (id, host, path, method, ip)
) WITH compaction = {
  'class' : 'DateTieredCompactionStrategy'
};

Sample Status schema
CREATE TABLE IF NOT EXISTS request_status (
  ts timestamp, // timestamp rounded to 5 minutes
  status smallint,
  id timeuuid,
  error text,
  PRIMARY KEY (ts, status, id)
) WITH compaction = {
  'class' : 'DateTieredCompactionStrategy'
};
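
With this clustering order (status, then id), the advice quoted in the Concerns section about specifying a start position can be applied when reading pending records. The sketch below only builds the CQL text and its parameters; it assumes that status 0 means "pending", that the consumer remembers the last id it handled, and that the query is run through a client using %s placeholders (as the DataStax Python driver does for non-prepared statements). The function name pending_ids_query is hypothetical.

```python
import uuid
from typing import Optional, Tuple

PENDING = 0  # assumed status code for "not yet processed"


def pending_ids_query(ts_bucket_ms: int,
                      last_id: Optional[uuid.UUID] = None) -> Tuple[str, tuple]:
    """Build a query for pending ids in one bucket.

    When `last_id` is known, the read starts strictly after it instead of
    at the beginning of the partition.
    """
    cql = "SELECT id FROM request_status WHERE ts = %s AND status = %s"
    params = [ts_bucket_ms, PENDING]
    if last_id is not None:
        cql += " AND id > %s"
        params.append(last_id)
    return cql, tuple(params)


last_seen = uuid.UUID("9892b2ff-ce0c-11e6-b3cf-9801a78e9aeb")
cql, params = pending_ids_query(1483056000000, last_seen)
print(cql)
```

Whether this helps in practice depends on how many tombstones sit before the start position, so it is a mitigation rather than a fix.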

Sample Batch statement to update Status record
BEGIN BATCH
  // Delete old status
  DELETE FROM capture.request_status
   WHERE ts = 1483056000000 AND status = 0
    AND id = 9892b2ff-ce0c-11e6-b3cf-9801a78e9aeb IF EXISTS;

  // Insert new status (same table and partition as the DELETE)
  INSERT INTO capture.request_status (ts, status, id)
   VALUES (1483056000000, 1, 9892b2ff-ce0c-11e6-b3cf-9801a78e9aeb) IF NOT EXISTS;
APPLY BATCH;

For data consistency, always use LOCAL_QUORUM for all statements.
