CQLExtractor files
package cqextractor

import (
    "fmt"
    "strings"
    "sync"
    "time"

    "github.com/gocql/gocql"
    "github.com/media-net/cargo/cassandra"
    "github.com/media-net/cargo/logger"
    "github.com/media-net/cargo/utils"
)
// Queries for the per-extractor metadata table (cqmd_<key>), which stores
// base64-encoded page states keyed by insertion time, newest first.
const (
    METADATA_TABLE_QUERY_FORMAT    = "CREATE TABLE IF NOT EXISTS cqmd_%s (state_id text, page_state text, time_added timestamp, PRIMARY KEY(state_id, time_added)) WITH CLUSTERING ORDER BY (time_added DESC)"
    GET_LATEST_TIME_ADDED          = "SELECT max(time_added) FROM cqmd_%s"
    GET_LATEST_STATE_QUERY_FORMAT  = "SELECT page_state FROM cqmd_%s WHERE time_added = ? ALLOW FILTERING"
    SAVE_LATEST_STATE_QUERY_FORMAT = "INSERT INTO cqmd_%s (state_id, page_state, time_added) VALUES (?,?,?)"
    FLUSH_STATE_QUERY_FORMAT       = "DELETE FROM cqmd_%s WHERE state_id = '1' and time_added < ?"
)
// CQLExtractorConfig holds the connection and paging settings for a CQLExtractor.
type CQLExtractorConfig struct {
    Id               string
    Keyspace         string
    Addresses        []string
    TableName        string
    RecordLimit      int
    ThrottleTimeInMs int
    /*
        StartDate and EndDate are for batch extraction from Cassandra,
        not for the real-time flow, e.g. "2017-04-18 10:00:23".
        TODO shall we write a separate extractor for this?
    */
    StartDate string
    EndDate   string
}
// CQLExtractor pages through a Cassandra table and emits rows as JSON strings,
// persisting its paging state so extraction can resume across sessions.
type CQLExtractor struct {
    config           *CQLExtractorConfig
    cassandra        *cassandra.Cassandra
    DataChan         chan interface{} `json:"-"`
    ErrorChan        chan error       `json:"-"`
    Key              string           `json:"key"`
    TableName        string           `json:"event_type"`
    LastPageState    string           `json:"last_page_state"`
    RecordLimit      int              `json:"limit"`
    ThrottleTimeInMs int              `json:"sleep_time_ms"`
    shouldStop       bool
    sync.Mutex
    /*
        StartDate and EndDate are for batch extraction from Cassandra,
        not for the real-time flow.
        StartDate is inclusive, EndDate is exclusive.
    */
    StartDate string `json:"start_date"`
    EndDate   string `json:"end_date"`
}
// NewCQLExtractor connects to Cassandra, ensures the metadata table exists and
// restores the last saved page state for the given table/id pair.
func NewCQLExtractor(config CQLExtractorConfig) (*CQLExtractor, error) {
    var err error
    ext := new(CQLExtractor)
    ext.config = &config
    ext.DataChan = make(chan interface{})
    ext.ErrorChan = make(chan error)
    ext.Key = fmt.Sprintf("%s_%s", config.TableName, config.Id)
    ext.cassandra, err = cassandra.Connect(config.Keyspace, config.Addresses)
    if err != nil {
        return nil, err
    }
    ext.TableName = config.TableName
    err = ext.cassandra.Exec(fmt.Sprintf(METADATA_TABLE_QUERY_FORMAT, ext.Key))
    if err != nil {
        return nil, err
    }
    ext.LastPageState, err = ext.getSavedPageState()
    if err != nil {
        return nil, err
    }
    ext.RecordLimit = config.RecordLimit
    ext.ThrottleTimeInMs = config.ThrottleTimeInMs
    ext.StartDate = config.StartDate
    ext.EndDate = config.EndDate
    return ext, nil
}
// getSavedPageState looks up the most recently saved page state for this
// extractor's key and returns it decoded; an empty string means no saved state.
func (c *CQLExtractor) getSavedPageState() (string, error) {
    result := map[string]interface{}{}
    err := c.cassandra.GetSession().Query(fmt.Sprintf(GET_LATEST_TIME_ADDED, c.Key)).MapScan(result)
    if err != nil {
        if strings.Contains(err.Error(), "not found") {
            return "", nil
        }
        logger.E("Error while getting saved page state", err)
        return "", err
    }
    logger.D("Latest saved state timestamp", result["system.max(time_added)"])
    err = c.cassandra.GetSession().Query(fmt.Sprintf(GET_LATEST_STATE_QUERY_FORMAT, c.Key), result["system.max(time_added)"].(time.Time)).MapScan(result)
    if err != nil {
        if strings.Contains(err.Error(), "not found") {
            return "", nil
        }
        logger.E("Error while getting saved page state", err)
        return "", err
    }
    return string(utils.DecodeBase64(result["page_state"].(string))), nil
}
// savePageState stores the given (base64-encoded) page state with the current
// timestamp and then deletes the older states for this extractor.
func (c *CQLExtractor) savePageState(state string) error {
    now := time.Now()
    logger.D("Saving latest state", state)
    err := c.cassandra.Exec(fmt.Sprintf(SAVE_LATEST_STATE_QUERY_FORMAT, c.Key), "1", state, now)
    if err != nil {
        logger.E("Error while saving latest state", err)
        return err
    }
    // now flushing the older states
    err = c.cassandra.Exec(fmt.Sprintf(FLUSH_STATE_QUERY_FORMAT, c.Key), now)
    if err != nil {
        logger.E("Error while flushing the older states", err)
    }
    return nil
}
// Extract starts a goroutine that polls Cassandra every ThrottleTimeInMs
// milliseconds and returns the data and error channels to consume from.
func (c *CQLExtractor) Extract() (<-chan interface{}, <-chan error) {
    go func() {
        timer := time.NewTimer(time.Duration(c.ThrottleTimeInMs) * time.Millisecond)
        for {
            <-timer.C
            c.extractData()
            timer.Reset(time.Duration(c.ThrottleTimeInMs) * time.Millisecond)
            c.Lock()
            if c.shouldStop {
                c.Unlock()
                return
            }
            c.Unlock()
        }
    }()
    return c.DataChan, c.ErrorChan
}
// getQuery builds the extraction query, bounded by StartDate/EndDate when both are set.
func (c *CQLExtractor) getQuery() string {
    if c.StartDate == "" || c.EndDate == "" {
        return fmt.Sprintf("SELECT * from %s ALLOW FILTERING", c.TableName)
    }
    return fmt.Sprintf("SELECT * from %s WHERE time_added >= '%s' and time_added < '%s' ALLOW FILTERING",
        c.TableName, c.StartDate, c.EndDate)
}
// extractData fetches one page of rows starting from the last saved page state,
// pushes each row as a JSON string on DataChan and persists the new page state.
func (c *CQLExtractor) extractData() {
    var iter *gocql.Iter
    iter = c.cassandra.GetSession().Query(c.getQuery()).PageState([]byte(c.LastPageState)).PageSize(c.RecordLimit).Iter()
    logger.D(iter.PageState())
    dataMap, err := iter.SliceMap()
    if err != nil {
        c.ErrorChan <- err
        return
    }
    dataMapLen := len(dataMap)
    if dataMapLen < 1 {
        return
    }
    if dataMapLen < c.RecordLimit {
        // short page: refire the query with the smaller page size so the page
        // state returned by the iterator lines up with the rows we emit
        logger.W("Got fewer records than the page limit, refiring query with lower limit", dataMapLen)
        iter = c.cassandra.GetSession().Query(c.getQuery()).PageState([]byte(c.LastPageState)).PageSize(dataMapLen).Iter()
        dataMap, err = iter.SliceMap()
        if err != nil {
            c.ErrorChan <- err
            return
        }
    }
    logger.D(iter.PageState())
    c.LastPageState = string(iter.PageState())
    for _, val := range dataMap {
        msg, err := utils.ToJson(&val)
        if err != nil {
            c.ErrorChan <- err
            continue // skip the row instead of dereferencing a nil result
        }
        c.DataChan <- *msg
    }
    err = c.savePageState(utils.Base64([]byte(c.LastPageState)))
    if err != nil {
        c.ErrorChan <- err
    }
    err = iter.Close()
    if err != nil {
        c.ErrorChan <- err
    }
}
// Stop signals the extraction goroutine to exit after the current cycle.
func (c *CQLExtractor) Stop() {
    c.Lock()
    defer c.Unlock()
    c.shouldStop = true
}
package extractor

import (
    "fmt"
    "math/rand"
    "time"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"

    "github.com/media-net/cargo/cassandra"
    . "github.com/media-net/cargo/etl/extractor/cqextractor"
    "github.com/media-net/cargo/logger"
    "github.com/media-net/cargo/utils"
)
const (
    ENTRY_COUNT = 100
    TABLE_NAME  = "test1"
)
var _ = FDescribe("Extractors", func() {
    var e *CQLExtractor
    var c *cassandra.Cassandra
    BeforeEach(func() {
        var err error
        c, err = cassandra.Connect("test", []string{"127.0.0.1:9042"})
        Expect(err).NotTo(HaveOccurred())
        // drop any tables left over from previous runs
        c.Exec("DROP TABLE test1")
        c.Exec("DROP TABLE cqmd_test1_test")
        c.Exec("DROP TABLE cqmd_test_test")
        // recreate the source table
        err = c.Exec("CREATE TABLE test1 (data int, time_added timestamp, PRIMARY KEY(data, time_added)) WITH CLUSTERING ORDER BY (time_added ASC)")
        Expect(err).NotTo(HaveOccurred())
        // push raw records into cassandra
        for i := 0; i < ENTRY_COUNT; i++ {
            err = c.Exec("INSERT INTO test1 (data, time_added) VALUES (?,?)", i, time.Now())
            Expect(err).NotTo(HaveOccurred())
        }
        e, err = NewCQLExtractor(CQLExtractorConfig{
            Id:               "test",
            TableName:        "test1",
            RecordLimit:      1,
            ThrottleTimeInMs: 100,
            Keyspace:         "test",
            Addresses:        []string{"127.0.0.1:9042"},
        })
        Expect(err).NotTo(HaveOccurred())
    })
Describe("CQLExtractor", func() { | |
It("should extract the records as per the defined config", func() { | |
varmap := make([]int8, ENTRY_COUNT) | |
count := 0 | |
tmp := struct { | |
Data int `json:"data"` | |
}{} | |
data, err := e.Extract() | |
for count < ENTRY_COUNT { | |
select { | |
case d := <-data: | |
err := utils.FromJson(d.(string), &tmp) | |
Expect(err).NotTo(HaveOccurred()) | |
if varmap[tmp.Data] == 1 { | |
Fail("Got the same record twice") | |
} | |
varmap[tmp.Data] = 1 | |
count += 1 | |
case e := <-err: | |
Fail(e.Error()) | |
} | |
} | |
}) | |
It("should save the state of the last read messages and should be used across different sessions", func() { | |
var err error | |
varmap := make([]int8, ENTRY_COUNT) | |
count := 0 | |
tmp := struct { | |
Data int `json:"data"` | |
}{} | |
data, errChan := e.Extract() | |
firstIterLength := rand.Intn(ENTRY_COUNT) | |
for count < firstIterLength { | |
select { | |
case d := <-data: | |
err := utils.FromJson(d.(string), &tmp) | |
Expect(err).NotTo(HaveOccurred()) | |
if varmap[tmp.Data] == 1 { | |
Fail("Got the same record twice") | |
} | |
varmap[tmp.Data] = 1 | |
logger.D(varmap[tmp.Data]) | |
count += 1 | |
case e := <-errChan: | |
Fail(e.Error()) | |
} | |
} | |
// now renewing the session | |
e, err = NewCQLExtractor(CQLExtractorConfig{ | |
Id: "test", | |
TableName: "test1", | |
RecordLimit: 1, | |
ThrottleTimeInMs: 100, | |
Keyspace: "test", | |
Addresses: []string{"127.0.0.1:9042"}, | |
}) | |
Expect(err).NotTo(HaveOccurred()) | |
data, errChan = e.Extract() | |
for count < ENTRY_COUNT { | |
select { | |
case d := <-data: | |
err := utils.FromJson(d.(string), &tmp) | |
Expect(err).NotTo(HaveOccurred()) | |
if varmap[tmp.Data] == 1 { | |
Fail("Got the same record twice") | |
} | |
varmap[tmp.Data] = 1 | |
count += 1 | |
case e := <-errChan: | |
Fail(e.Error()) | |
} | |
} | |
}) | |
It("should not affect the records after restarting", func() { | |
varmap := make([]int8, ENTRY_COUNT) | |
count := 0 | |
tmp := struct { | |
Data int `json:"data"` | |
}{} | |
data, errChan := e.Extract() | |
firstIterLength := rand.Intn(ENTRY_COUNT) | |
for count < firstIterLength { | |
select { | |
case d := <-data: | |
err := utils.FromJson(d.(string), &tmp) | |
Expect(err).NotTo(HaveOccurred()) | |
if varmap[tmp.Data] == 1 { | |
Fail("Got the same record twice") | |
} | |
varmap[tmp.Data] = 1 | |
logger.D(varmap[tmp.Data]) | |
count += 1 | |
case e := <-errChan: | |
Fail(e.Error()) | |
} | |
} | |
for count < ENTRY_COUNT { | |
select { | |
case d := <-data: | |
err := utils.FromJson(d.(string), &tmp) | |
Expect(err).NotTo(HaveOccurred()) | |
if varmap[tmp.Data] == 1 { | |
Fail("Got the same record twice") | |
} | |
varmap[tmp.Data] = 1 | |
count += 1 | |
case e := <-errChan: | |
Fail(e.Error()) | |
} | |
} | |
}) | |
Context("with uneven record sets fetching", func() { | |
BeforeEach(func() { | |
var err error | |
e, err = NewCQLExtractor(CQLExtractorConfig{ | |
Id: "test", | |
TableName: "test1", | |
RecordLimit: 10, | |
ThrottleTimeInMs: 100, | |
Keyspace: "test", | |
Addresses: []string{"127.0.0.1:9042"}, | |
}) | |
Expect(err).NotTo(HaveOccurred()) | |
}) | |
// failing test case | |
FIt("should save last page state properly", func() { | |
varmap := make([]int8, 2*ENTRY_COUNT) | |
count := 0 | |
tmp := struct { | |
Data int `json:"data"` | |
}{} | |
data, errChan := e.Extract() | |
By("Fetching all the pushed records") | |
for count < ENTRY_COUNT { | |
select { | |
case d := <-data: | |
err := utils.FromJson(d.(string), &tmp) | |
Expect(err).NotTo(HaveOccurred()) | |
fmt.Println("Got", tmp.Data) | |
if varmap[tmp.Data] == 1 { | |
Fail("Got the same record twice") | |
} | |
varmap[tmp.Data] = 1 | |
count += 1 | |
case e := <-errChan: | |
Fail(e.Error()) | |
} | |
} | |
By("Comparing the fetched records to the count") | |
Expect(count).To(Equal(100)) | |
By("Pushing some extra records inside cassandra") | |
for i := 0; i < ENTRY_COUNT; i++ { | |
err := c.Exec("INSERT INTO test1 (data, time_added) VALUES (?,?)", ENTRY_COUNT+i, time.Now()) | |
Expect(err).NotTo(HaveOccurred()) | |
} | |
By("Fetching them out") | |
for count < 2*ENTRY_COUNT { | |
select { | |
case d := <-data: | |
err := utils.FromJson(d.(string), &tmp) | |
Expect(err).NotTo(HaveOccurred()) | |
fmt.Println("Got", tmp.Data) | |
if varmap[tmp.Data] == 1 { | |
Fail("Got the same record twice") | |
} | |
varmap[tmp.Data] = 1 | |
count += 1 | |
case e := <-errChan: | |
Fail(e.Error()) | |
} | |
} | |
Expect(count).To(Equal(200)) | |
}) | |
        })
    })
    AfterEach(func() {
        // drop the test tables and close the session
        c.Exec("DROP TABLE test1")
        c.Exec("DROP TABLE cqmd_test1_test")
        c.Exec("DROP TABLE cqmd_test_test")
        c.Close()
    })
})
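The gist omits the Ginkgo suite bootstrap that actually runs these specs; a minimal one for the extractor package (the test function name and suite description are placeholders) would look like:

package extractor

import (
    "testing"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
)

// TestExtractor hooks the Ginkgo specs above into `go test`.
func TestExtractor(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecs(t, "Extractor Suite")
}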