// multipart.go: in-memory tracking of in-progress multipart uploads and their parts.

package storage
import (
	"crypto/md5"
	"crypto/rand"
	"fmt"
	"io"
	"sort"
	"sync"
	"time"
)
  8. // MultipartUploadInfo stores information about an in-progress multipart upload
  9. type MultipartUploadInfo struct {
  10. UploadID string
  11. Bucket string
  12. Key string
  13. AccountID string
  14. Initiated time.Time
  15. Parts map[int]*PartInfo
  16. PartsMutex sync.RWMutex
  17. }
  18. // PartInfo stores information about an uploaded part
  19. type PartInfo struct {
  20. PartNumber int
  21. ETag string
  22. Size int64
  23. LastModified time.Time
  24. Data []byte // In-memory storage for simplicity
  25. }
  26. // MultipartStorage manages multipart uploads
  27. type MultipartStorage struct {
  28. uploads map[string]*MultipartUploadInfo // key: uploadID
  29. mutex sync.RWMutex
  30. }
  31. // NewMultipartStorage creates a new multipart storage manager
  32. func NewMultipartStorage() *MultipartStorage {
  33. return &MultipartStorage{
  34. uploads: make(map[string]*MultipartUploadInfo),
  35. }
  36. }
  37. // CreateMultipartUpload initiates a new multipart upload
  38. func (m *MultipartStorage) CreateMultipartUpload(accountID, bucket, key string) (string, error) {
  39. m.mutex.Lock()
  40. defer m.mutex.Unlock()
  41. uploadID := generateUploadID()
  42. upload := &MultipartUploadInfo{
  43. UploadID: uploadID,
  44. Bucket: bucket,
  45. Key: key,
  46. AccountID: accountID,
  47. Initiated: time.Now(),
  48. Parts: make(map[int]*PartInfo),
  49. }
  50. m.uploads[uploadID] = upload
  51. return uploadID, nil
  52. }
  53. // UploadPart uploads a single part
  54. func (m *MultipartStorage) UploadPart(uploadID string, partNumber int, data io.Reader) (string, error) {
  55. m.mutex.RLock()
  56. upload, exists := m.uploads[uploadID]
  57. m.mutex.RUnlock()
  58. if !exists {
  59. return "", fmt.Errorf("upload not found: %s", uploadID)
  60. }
  61. // Read part data
  62. partData, err := io.ReadAll(data)
  63. if err != nil {
  64. return "", fmt.Errorf("failed to read part data: %w", err)
  65. }
  66. // Calculate ETag (MD5 hash in quotes)
  67. etag := calculateETag(partData)
  68. // Store part
  69. upload.PartsMutex.Lock()
  70. upload.Parts[partNumber] = &PartInfo{
  71. PartNumber: partNumber,
  72. ETag: etag,
  73. Size: int64(len(partData)),
  74. LastModified: time.Now(),
  75. Data: partData,
  76. }
  77. upload.PartsMutex.Unlock()
  78. return etag, nil
  79. }
  80. // ListParts lists all uploaded parts for a multipart upload
  81. func (m *MultipartStorage) ListParts(uploadID string, maxParts, partNumberMarker int) ([]PartInfo, int, bool, error) {
  82. m.mutex.RLock()
  83. upload, exists := m.uploads[uploadID]
  84. m.mutex.RUnlock()
  85. if !exists {
  86. return nil, 0, false, fmt.Errorf("upload not found: %s", uploadID)
  87. }
  88. upload.PartsMutex.RLock()
  89. defer upload.PartsMutex.RUnlock()
  90. // Collect and sort parts
  91. var parts []PartInfo
  92. for partNum, part := range upload.Parts {
  93. if partNum > partNumberMarker {
  94. parts = append(parts, *part)
  95. }
  96. }
  97. // Sort by part number
  98. sortPartsByNumber(parts)
  99. // Apply pagination
  100. isTruncated := false
  101. nextMarker := 0
  102. if len(parts) > maxParts {
  103. isTruncated = true
  104. nextMarker = parts[maxParts-1].PartNumber
  105. parts = parts[:maxParts]
  106. }
  107. return parts, nextMarker, isTruncated, nil
  108. }
  109. // GetUpload retrieves upload information
  110. func (m *MultipartStorage) GetUpload(uploadID string) (*MultipartUploadInfo, error) {
  111. m.mutex.RLock()
  112. defer m.mutex.RUnlock()
  113. upload, exists := m.uploads[uploadID]
  114. if !exists {
  115. return nil, fmt.Errorf("upload not found: %s", uploadID)
  116. }
  117. return upload, nil
  118. }
  119. // AbortMultipartUpload cancels a multipart upload
  120. func (m *MultipartStorage) AbortMultipartUpload(uploadID string) error {
  121. m.mutex.Lock()
  122. defer m.mutex.Unlock()
  123. if _, exists := m.uploads[uploadID]; !exists {
  124. return fmt.Errorf("upload not found: %s", uploadID)
  125. }
  126. delete(m.uploads, uploadID)
  127. return nil
  128. }
  129. // CompleteMultipartUpload finalizes a multipart upload
  130. func (m *MultipartStorage) CompleteMultipartUpload(uploadID string, parts []int) ([]byte, string, error) {
  131. m.mutex.Lock()
  132. upload, exists := m.uploads[uploadID]
  133. if !exists {
  134. m.mutex.Unlock()
  135. return nil, "", fmt.Errorf("upload not found: %s", uploadID)
  136. }
  137. // Remove from active uploads
  138. delete(m.uploads, uploadID)
  139. m.mutex.Unlock()
  140. upload.PartsMutex.RLock()
  141. defer upload.PartsMutex.RUnlock()
  142. // Validate all parts exist
  143. for _, partNum := range parts {
  144. if _, exists := upload.Parts[partNum]; !exists {
  145. return nil, "", fmt.Errorf("part %d not found", partNum)
  146. }
  147. }
  148. // Combine all parts in order
  149. var combinedData []byte
  150. for _, partNum := range parts {
  151. part := upload.Parts[partNum]
  152. combinedData = append(combinedData, part.Data...)
  153. }
  154. // Calculate final ETag
  155. etag := calculateETag(combinedData)
  156. return combinedData, etag, nil
  157. }
  158. // ListMultipartUploads lists all in-progress uploads for a bucket
  159. func (m *MultipartStorage) ListMultipartUploads(accountID, bucket string, maxUploads int, keyMarker, uploadIDMarker string) ([]MultipartUploadInfo, string, string, bool, error) {
  160. m.mutex.RLock()
  161. defer m.mutex.RUnlock()
  162. var uploads []MultipartUploadInfo
  163. for _, upload := range m.uploads {
  164. if upload.AccountID == accountID && upload.Bucket == bucket {
  165. // Apply marker filters
  166. if keyMarker != "" && upload.Key <= keyMarker {
  167. continue
  168. }
  169. if uploadIDMarker != "" && upload.UploadID <= uploadIDMarker {
  170. continue
  171. }
  172. uploads = append(uploads, *upload)
  173. }
  174. }
  175. // Sort uploads
  176. sortUploadsByKey(uploads)
  177. // Apply pagination
  178. isTruncated := false
  179. nextKeyMarker := ""
  180. nextUploadIDMarker := ""
  181. if len(uploads) > maxUploads {
  182. isTruncated = true
  183. nextKeyMarker = uploads[maxUploads-1].Key
  184. nextUploadIDMarker = uploads[maxUploads-1].UploadID
  185. uploads = uploads[:maxUploads]
  186. }
  187. return uploads, nextKeyMarker, nextUploadIDMarker, isTruncated, nil
  188. }
  189. // Helper functions
  190. func generateUploadID() string {
  191. return fmt.Sprintf("%d-%s", time.Now().UnixNano(), randomString(32))
  192. }
  193. func calculateETag(data []byte) string {
  194. // Simple hash for demonstration - in production use MD5
  195. hash := 0
  196. for _, b := range data {
  197. hash = hash*31 + int(b)
  198. }
  199. return fmt.Sprintf("\"%x\"", hash)
  200. }
  201. func sortPartsByNumber(parts []PartInfo) {
  202. // Simple bubble sort for demonstration
  203. n := len(parts)
  204. for i := 0; i < n-1; i++ {
  205. for j := 0; j < n-i-1; j++ {
  206. if parts[j].PartNumber > parts[j+1].PartNumber {
  207. parts[j], parts[j+1] = parts[j+1], parts[j]
  208. }
  209. }
  210. }
  211. }
  212. func sortUploadsByKey(uploads []MultipartUploadInfo) {
  213. // Simple bubble sort for demonstration
  214. n := len(uploads)
  215. for i := 0; i < n-1; i++ {
  216. for j := 0; j < n-i-1; j++ {
  217. if uploads[j].Key > uploads[j+1].Key {
  218. uploads[j], uploads[j+1] = uploads[j+1], uploads[j]
  219. }
  220. }
  221. }
  222. }
  223. func randomString(length int) string {
  224. const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
  225. result := make([]byte, length)
  226. for i := range result {
  227. result[i] = charset[time.Now().UnixNano()%int64(len(charset))]
  228. }
  229. return string(result)
  230. }