user_aware.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. package storage
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. )
  11. // UserAwareStorage wraps Storage with user isolation using accountID/bucketID structure
  12. type UserAwareStorage struct {
  13. baseDir string
  14. multipart *MultipartStorage
  15. mutex sync.RWMutex
  16. }
  17. // NewUserAwareStorage creates a storage with user isolation
  18. func NewUserAwareStorage(baseDir string) (*UserAwareStorage, error) {
  19. if err := os.MkdirAll(baseDir, 0755); err != nil {
  20. return nil, err
  21. }
  22. return &UserAwareStorage{
  23. baseDir: baseDir,
  24. multipart: NewMultipartStorage(),
  25. }, nil
  26. }
  27. // getUserDir returns the directory for a specific user
  28. func (s *UserAwareStorage) getUserDir(accountID string) string {
  29. return filepath.Join(s.baseDir, accountID)
  30. }
  31. // getBucketID generates a bucket ID from bucket name
  32. func (s *UserAwareStorage) getBucketID(bucketName string) string {
  33. // Use MD5 hash of bucket name as bucket ID
  34. // This ensures consistent bucket IDs for same bucket names
  35. return bucketName
  36. }
  37. // initUserStorage initializes storage for a user
  38. func (s *UserAwareStorage) initUserStorage(accountID string) error {
  39. userDir := s.getUserDir(accountID)
  40. return os.MkdirAll(userDir, 0755)
  41. }
  42. // CreateBucketForUser creates a bucket for a specific user
  43. func (s *UserAwareStorage) CreateBucketForUser(accountID, bucketName string) (string, error) {
  44. if err := s.initUserStorage(accountID); err != nil {
  45. return "", err
  46. }
  47. bucketID := s.getBucketID(bucketName)
  48. bucketPath := filepath.Join(s.getUserDir(accountID), bucketID)
  49. if _, err := os.Stat(bucketPath); err == nil {
  50. return "", ErrBucketAlreadyExists
  51. }
  52. if err := os.MkdirAll(bucketPath, 0755); err != nil {
  53. return "", err
  54. }
  55. return bucketID, nil
  56. }
  57. // DeleteBucketForUser deletes a bucket for a specific user
  58. func (s *UserAwareStorage) DeleteBucketForUser(accountID, bucketName string) error {
  59. bucketID := s.getBucketID(bucketName)
  60. bucketPath := filepath.Join(s.getUserDir(accountID), bucketID)
  61. return os.RemoveAll(bucketPath)
  62. }
  63. // BucketExistsForUser checks if a bucket exists for a specific user
  64. func (s *UserAwareStorage) BucketExistsForUser(accountID, bucketName string) (bool, error) {
  65. bucketID := s.getBucketID(bucketName)
  66. bucketPath := filepath.Join(s.getUserDir(accountID), bucketID)
  67. _, err := os.Stat(bucketPath)
  68. if os.IsNotExist(err) {
  69. return false, nil
  70. }
  71. if err != nil {
  72. return false, err
  73. }
  74. return true, nil
  75. }
  76. // GetBucketIDForUser gets the bucket ID for a bucket name
  77. func (s *UserAwareStorage) GetBucketIDForUser(accountID, bucketName string) (string, error) {
  78. bucketID := s.getBucketID(bucketName)
  79. bucketPath := filepath.Join(s.getUserDir(accountID), bucketID)
  80. if _, err := os.Stat(bucketPath); os.IsNotExist(err) {
  81. return "", ErrBucketNotFound
  82. }
  83. return bucketID, nil
  84. }
  85. // ListBucketsForUser lists all buckets for a specific user
  86. // This method doesn't work well with the new structure since we only have bucket IDs
  87. // We need to rely on the KV database for bucket name mappings
  88. func (s *UserAwareStorage) ListBucketsForUser(accountID string) ([]BucketInfo, error) {
  89. userDir := s.getUserDir(accountID)
  90. // If user directory doesn't exist, return empty list
  91. if _, err := os.Stat(userDir); os.IsNotExist(err) {
  92. return []BucketInfo{}, nil
  93. }
  94. entries, err := os.ReadDir(userDir)
  95. if err != nil {
  96. return nil, err
  97. }
  98. var buckets []BucketInfo
  99. for _, entry := range entries {
  100. if entry.IsDir() {
  101. info, err := entry.Info()
  102. if err != nil {
  103. continue
  104. }
  105. // entry.Name() is the bucket ID (hash)
  106. buckets = append(buckets, BucketInfo{
  107. Name: entry.Name(), // This is bucket ID, not bucket name
  108. CreationDate: info.ModTime(),
  109. })
  110. }
  111. }
  112. return buckets, nil
  113. }
  114. // PutObjectForUser stores an object for a specific user
  115. func (s *UserAwareStorage) PutObjectForUser(accountID, bucketName, key string, reader io.Reader) error {
  116. bucketID := s.getBucketID(bucketName)
  117. bucketPath := filepath.Join(s.getUserDir(accountID), bucketID)
  118. if _, err := os.Stat(bucketPath); os.IsNotExist(err) {
  119. return ErrBucketNotFound
  120. }
  121. objectPath := filepath.Join(bucketPath, key)
  122. objectDir := filepath.Dir(objectPath)
  123. if err := os.MkdirAll(objectDir, 0755); err != nil {
  124. return err
  125. }
  126. file, err := os.Create(objectPath)
  127. if err != nil {
  128. return err
  129. }
  130. defer file.Close()
  131. _, err = io.Copy(file, reader)
  132. return err
  133. }
  134. // GetObjectForUser retrieves an object for a specific user
  135. func (s *UserAwareStorage) GetObjectForUser(accountID, bucketName, key string) (io.ReadCloser, *ObjectInfo, error) {
  136. bucketID := s.getBucketID(bucketName)
  137. objectPath := filepath.Join(s.getUserDir(accountID), bucketID, key)
  138. fileInfo, err := os.Stat(objectPath)
  139. if os.IsNotExist(err) {
  140. return nil, nil, ErrObjectNotFound
  141. }
  142. if err != nil {
  143. return nil, nil, err
  144. }
  145. file, err := os.Open(objectPath)
  146. if err != nil {
  147. return nil, nil, err
  148. }
  149. info := &ObjectInfo{
  150. Key: key,
  151. Size: fileInfo.Size(),
  152. LastModified: fileInfo.ModTime(),
  153. ETag: generateETag(fileInfo),
  154. }
  155. return file, info, nil
  156. }
  157. // GetObjectByBucketIDForUser retrieves an object using bucket ID directly
  158. func (s *UserAwareStorage) GetObjectByBucketIDForUser(accountID, bucketID, key string) (io.ReadCloser, *ObjectInfo, error) {
  159. objectPath := filepath.Join(s.getUserDir(accountID), bucketID, key)
  160. fileInfo, err := os.Stat(objectPath)
  161. if os.IsNotExist(err) {
  162. return nil, nil, ErrObjectNotFound
  163. }
  164. if err != nil {
  165. return nil, nil, err
  166. }
  167. file, err := os.Open(objectPath)
  168. if err != nil {
  169. return nil, nil, err
  170. }
  171. info := &ObjectInfo{
  172. Key: key,
  173. Size: fileInfo.Size(),
  174. LastModified: fileInfo.ModTime(),
  175. ETag: generateETag(fileInfo),
  176. }
  177. return file, info, nil
  178. }
  179. // DeleteObjectForUser removes an object for a specific user
  180. func (s *UserAwareStorage) DeleteObjectForUser(accountID, bucketName, key string) error {
  181. bucketID := s.getBucketID(bucketName)
  182. objectPath := filepath.Join(s.getUserDir(accountID), bucketID, key)
  183. err := os.Remove(objectPath)
  184. if os.IsNotExist(err) {
  185. return nil
  186. }
  187. return err
  188. }
  189. // ObjectExistsForUser checks if an object exists for a specific user
  190. func (s *UserAwareStorage) ObjectExistsForUser(accountID, bucketName, key string) (bool, error) {
  191. bucketID := s.getBucketID(bucketName)
  192. objectPath := filepath.Join(s.getUserDir(accountID), bucketID, key)
  193. _, err := os.Stat(objectPath)
  194. if os.IsNotExist(err) {
  195. return false, nil
  196. }
  197. if err != nil {
  198. return false, err
  199. }
  200. return true, nil
  201. }
  202. // GetObjectInfoForUser retrieves object metadata for a specific user
  203. func (s *UserAwareStorage) GetObjectInfoForUser(accountID, bucketName, key string) (*ObjectInfo, error) {
  204. bucketID := s.getBucketID(bucketName)
  205. objectPath := filepath.Join(s.getUserDir(accountID), bucketID, key)
  206. fileInfo, err := os.Stat(objectPath)
  207. if os.IsNotExist(err) {
  208. return nil, ErrObjectNotFound
  209. }
  210. if err != nil {
  211. return nil, err
  212. }
  213. return &ObjectInfo{
  214. Key: key,
  215. Size: fileInfo.Size(),
  216. LastModified: fileInfo.ModTime(),
  217. ETag: generateETag(fileInfo),
  218. }, nil
  219. }
  220. // ListObjectsForUser lists all objects in a bucket for a specific user
  221. func (s *UserAwareStorage) ListObjectsForUser(accountID, bucketName string) ([]ObjectInfo, error) {
  222. bucketID := s.getBucketID(bucketName)
  223. bucketPath := filepath.Join(s.getUserDir(accountID), bucketID)
  224. if _, err := os.Stat(bucketPath); os.IsNotExist(err) {
  225. return nil, ErrBucketNotFound
  226. }
  227. var objects []ObjectInfo
  228. // Walk the bucket directory recursively
  229. err := filepath.Walk(bucketPath, func(path string, info os.FileInfo, err error) error {
  230. if err != nil {
  231. return nil // Skip errors
  232. }
  233. if !info.IsDir() {
  234. // Get relative path from bucket
  235. relPath, err := filepath.Rel(bucketPath, path)
  236. if err != nil {
  237. return nil
  238. }
  239. // Convert path separators to forward slashes for S3 compatibility
  240. key := filepath.ToSlash(relPath)
  241. objects = append(objects, ObjectInfo{
  242. Key: key,
  243. Size: info.Size(),
  244. LastModified: info.ModTime(),
  245. ETag: generateETag(info),
  246. })
  247. }
  248. return nil
  249. })
  250. if err != nil {
  251. return nil, err
  252. }
  253. return objects, nil
  254. }
  255. // ListObjectsByBucketIDForUser lists all objects using bucket ID directly
  256. func (s *UserAwareStorage) ListObjectsByBucketIDForUser(accountID, bucketID string) ([]ObjectInfo, error) {
  257. bucketPath := filepath.Join(s.getUserDir(accountID), bucketID)
  258. if _, err := os.Stat(bucketPath); os.IsNotExist(err) {
  259. return nil, ErrBucketNotFound
  260. }
  261. var objects []ObjectInfo
  262. // Walk the bucket directory recursively
  263. err := filepath.Walk(bucketPath, func(path string, info os.FileInfo, err error) error {
  264. if err != nil {
  265. return nil // Skip errors
  266. }
  267. if !info.IsDir() {
  268. // Get relative path from bucket
  269. relPath, err := filepath.Rel(bucketPath, path)
  270. if err != nil {
  271. return nil
  272. }
  273. // Convert path separators to forward slashes for S3 compatibility
  274. key := filepath.ToSlash(relPath)
  275. objects = append(objects, ObjectInfo{
  276. Key: key,
  277. Size: info.Size(),
  278. LastModified: info.ModTime(),
  279. ETag: generateETag(info),
  280. })
  281. }
  282. return nil
  283. })
  284. if err != nil {
  285. return nil, err
  286. }
  287. return objects, nil
  288. }
  289. // GetStorageStatsForUser returns storage statistics for a user
  290. func (s *UserAwareStorage) GetStorageStatsForUser(accountID string) (*StorageStats, error) {
  291. userDir := s.getUserDir(accountID)
  292. stats := &StorageStats{
  293. AccountID: accountID,
  294. }
  295. if _, err := os.Stat(userDir); os.IsNotExist(err) {
  296. return stats, nil
  297. }
  298. // Walk the user directory to calculate stats
  299. entries, err := os.ReadDir(userDir)
  300. if err != nil {
  301. return stats, err
  302. }
  303. for _, entry := range entries {
  304. if entry.IsDir() {
  305. stats.BucketCount++
  306. // Count objects in this bucket
  307. bucketPath := filepath.Join(userDir, entry.Name())
  308. filepath.Walk(bucketPath, func(path string, info os.FileInfo, err error) error {
  309. if err != nil || info.IsDir() {
  310. return nil
  311. }
  312. stats.ObjectCount++
  313. stats.TotalSize += info.Size()
  314. return nil
  315. })
  316. }
  317. }
  318. return stats, nil
  319. }
  320. // Helper function to sanitize paths and prevent directory traversal
  321. func sanitizePath(path string) string {
  322. // Remove any ../ or ./ patterns
  323. path = strings.ReplaceAll(path, "../", "")
  324. path = strings.ReplaceAll(path, "./", "")
  325. return path
  326. }
  327. // Add these methods to UserAwareStorage
  328. // CreateMultipartUpload initiates a multipart upload
  329. func (s *UserAwareStorage) CreateMultipartUpload(accountID, bucket, key string) (string, error) {
  330. return s.multipart.CreateMultipartUpload(accountID, bucket, key)
  331. }
  332. // UploadPart uploads a part for a multipart upload
  333. func (s *UserAwareStorage) UploadPart(uploadID string, partNumber int, data io.Reader) (string, error) {
  334. return s.multipart.UploadPart(uploadID, partNumber, data)
  335. }
  336. // ListParts lists parts of a multipart upload
  337. func (s *UserAwareStorage) ListParts(uploadID string, maxParts, partNumberMarker int) ([]PartInfo, int, bool, error) {
  338. return s.multipart.ListParts(uploadID, maxParts, partNumberMarker)
  339. }
  340. // GetMultipartUpload retrieves multipart upload info
  341. func (s *UserAwareStorage) GetMultipartUpload(uploadID string) (*MultipartUploadInfo, error) {
  342. return s.multipart.GetUpload(uploadID)
  343. }
  344. // AbortMultipartUpload cancels a multipart upload
  345. func (s *UserAwareStorage) AbortMultipartUpload(uploadID string) error {
  346. return s.multipart.AbortMultipartUpload(uploadID)
  347. }
  348. // CompleteMultipartUpload finalizes a multipart upload
  349. func (s *UserAwareStorage) CompleteMultipartUpload(uploadID string, parts []int) error {
  350. // CRITICAL FIX: Get upload info BEFORE completing (which deletes it from active uploads)
  351. upload, err := s.multipart.GetUpload(uploadID)
  352. if err != nil {
  353. return fmt.Errorf("upload not found: %s", uploadID)
  354. }
  355. // Get combined data (this also deletes the upload from active uploads map)
  356. data, etag, err := s.multipart.CompleteMultipartUpload(uploadID, parts)
  357. if err != nil {
  358. return err
  359. }
  360. // Store the final object
  361. reader := bytes.NewReader(data)
  362. if err := s.PutObjectForUser(upload.AccountID, upload.Bucket, upload.Key, reader); err != nil {
  363. return err
  364. }
  365. _ = etag // ETag could be stored in object metadata if needed
  366. return nil
  367. }
  368. // ListMultipartUploads lists in-progress multipart uploads
  369. func (s *UserAwareStorage) ListMultipartUploads(accountID, bucket string, maxUploads int, keyMarker, uploadIDMarker string) ([]MultipartUploadInfo, string, string, bool, error) {
  370. return s.multipart.ListMultipartUploads(accountID, bucket, maxUploads, keyMarker, uploadIDMarker)
  371. }