dbleveldb.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package dbleveldb
  2. import (
  3. "encoding/json"
  4. "log"
  5. "path/filepath"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/syndtr/goleveldb/leveldb"
  10. "github.com/syndtr/goleveldb/leveldb/util"
  11. "imuslab.com/zoraxy/mod/database/dbinc"
  12. )
  13. // Ensure the DB struct implements the Backend interface
  14. var _ dbinc.Backend = (*DB)(nil)
  15. type DB struct {
  16. db *leveldb.DB
  17. Table sync.Map //For emulating table creation
  18. batch leveldb.Batch //Batch write
  19. writeFlushTicker *time.Ticker //Ticker for flushing data into disk
  20. writeFlushStop chan bool //Stop channel for write flush ticker
  21. }
  22. func NewDB(path string) (*DB, error) {
  23. //If the path is not a directory (e.g. /tmp/dbfile.db), convert the filename to directory
  24. if filepath.Ext(path) != "" {
  25. path = strings.ReplaceAll(path, ".", "_")
  26. }
  27. db, err := leveldb.OpenFile(path, nil)
  28. if err != nil {
  29. return nil, err
  30. }
  31. thisDB := &DB{
  32. db: db,
  33. Table: sync.Map{},
  34. batch: leveldb.Batch{},
  35. }
  36. //Create a ticker to flush data into disk every 1 seconds
  37. writeFlushTicker := time.NewTicker(1 * time.Second)
  38. writeFlushStop := make(chan bool)
  39. go func() {
  40. for {
  41. select {
  42. case <-writeFlushTicker.C:
  43. if thisDB.batch.Len() == 0 {
  44. //No flushing needed
  45. continue
  46. }
  47. err = db.Write(&thisDB.batch, nil)
  48. if err != nil {
  49. log.Println("[LevelDB] Failed to flush data into disk: ", err)
  50. }
  51. thisDB.batch.Reset()
  52. case <-writeFlushStop:
  53. return
  54. }
  55. }
  56. }()
  57. thisDB.writeFlushTicker = writeFlushTicker
  58. thisDB.writeFlushStop = writeFlushStop
  59. return thisDB, nil
  60. }
  61. func (d *DB) NewTable(tableName string) error {
  62. //Create a table entry in the sync.Map
  63. d.Table.Store(tableName, true)
  64. return nil
  65. }
  66. func (d *DB) TableExists(tableName string) bool {
  67. _, ok := d.Table.Load(tableName)
  68. return ok
  69. }
  70. func (d *DB) DropTable(tableName string) error {
  71. d.Table.Delete(tableName)
  72. iter := d.db.NewIterator(nil, nil)
  73. defer iter.Release()
  74. for iter.Next() {
  75. key := iter.Key()
  76. if filepath.Dir(string(key)) == tableName {
  77. err := d.db.Delete(key, nil)
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. }
  83. return nil
  84. }
  85. func (d *DB) Write(tableName string, key string, value interface{}) error {
  86. data, err := json.Marshal(value)
  87. if err != nil {
  88. return err
  89. }
  90. d.batch.Put([]byte(filepath.ToSlash(filepath.Join(tableName, key))), data)
  91. return nil
  92. }
  93. func (d *DB) Read(tableName string, key string, assignee interface{}) error {
  94. data, err := d.db.Get([]byte(filepath.ToSlash(filepath.Join(tableName, key))), nil)
  95. if err != nil {
  96. return err
  97. }
  98. return json.Unmarshal(data, assignee)
  99. }
  100. func (d *DB) KeyExists(tableName string, key string) bool {
  101. _, err := d.db.Get([]byte(filepath.ToSlash(filepath.Join(tableName, key))), nil)
  102. return err == nil
  103. }
  104. func (d *DB) Delete(tableName string, key string) error {
  105. return d.db.Delete([]byte(filepath.ToSlash(filepath.Join(tableName, key))), nil)
  106. }
  107. func (d *DB) ListTable(tableName string) ([][][]byte, error) {
  108. iter := d.db.NewIterator(util.BytesPrefix([]byte(tableName+"/")), nil)
  109. defer iter.Release()
  110. var result [][][]byte
  111. for iter.Next() {
  112. key := iter.Key()
  113. //The key contains the table name as prefix. Trim it before returning
  114. value := iter.Value()
  115. result = append(result, [][]byte{[]byte(strings.TrimPrefix(string(key), tableName+"/")), value})
  116. }
  117. err := iter.Error()
  118. if err != nil {
  119. return nil, err
  120. }
  121. return result, nil
  122. }
  123. func (d *DB) Close() {
  124. //Write the remaining data in batch back into disk
  125. d.writeFlushStop <- true
  126. d.writeFlushTicker.Stop()
  127. d.db.Write(&d.batch, nil)
  128. d.db.Close()
  129. }