123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- package storage
import (
	"crypto/md5"
	"fmt"
	"io"
	"math/rand"
	"sort"
	"sync"
	"time"
)
// MultipartUploadInfo stores information about an in-progress multipart upload.
// It is created by CreateMultipartUpload and discarded on abort or completion.
type MultipartUploadInfo struct {
	UploadID  string            // unique identifier assigned at initiation
	Bucket    string            // destination bucket name
	Key       string            // destination object key
	AccountID string            // account that initiated the upload
	Initiated time.Time         // time the upload was created
	Parts     map[int]*PartInfo // uploaded parts keyed by part number
	// PartsMutex guards Parts so concurrent UploadPart calls for the same
	// upload do not race on the map.
	PartsMutex sync.RWMutex
}
// PartInfo stores information about an uploaded part of a multipart upload.
type PartInfo struct {
	PartNumber   int       // caller-chosen part number used for ordering on completion
	ETag         string    // quoted hex digest of Data, as returned by calculateETag
	Size         int64     // size of Data in bytes
	LastModified time.Time // time the part was (last) uploaded
	Data         []byte    // In-memory storage for simplicity
}
// MultipartStorage manages multipart uploads, keeping all state in memory.
// All methods are safe for concurrent use.
type MultipartStorage struct {
	uploads map[string]*MultipartUploadInfo // key: uploadID
	mutex   sync.RWMutex                    // guards the uploads map
}
- // NewMultipartStorage creates a new multipart storage manager
- func NewMultipartStorage() *MultipartStorage {
- return &MultipartStorage{
- uploads: make(map[string]*MultipartUploadInfo),
- }
- }
- // CreateMultipartUpload initiates a new multipart upload
- func (m *MultipartStorage) CreateMultipartUpload(accountID, bucket, key string) (string, error) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- uploadID := generateUploadID()
- upload := &MultipartUploadInfo{
- UploadID: uploadID,
- Bucket: bucket,
- Key: key,
- AccountID: accountID,
- Initiated: time.Now(),
- Parts: make(map[int]*PartInfo),
- }
- m.uploads[uploadID] = upload
- return uploadID, nil
- }
- // UploadPart uploads a single part
- func (m *MultipartStorage) UploadPart(uploadID string, partNumber int, data io.Reader) (string, error) {
- m.mutex.RLock()
- upload, exists := m.uploads[uploadID]
- m.mutex.RUnlock()
- if !exists {
- return "", fmt.Errorf("upload not found: %s", uploadID)
- }
- // Read part data
- partData, err := io.ReadAll(data)
- if err != nil {
- return "", fmt.Errorf("failed to read part data: %w", err)
- }
- // Calculate ETag (MD5 hash in quotes)
- etag := calculateETag(partData)
- // Store part
- upload.PartsMutex.Lock()
- upload.Parts[partNumber] = &PartInfo{
- PartNumber: partNumber,
- ETag: etag,
- Size: int64(len(partData)),
- LastModified: time.Now(),
- Data: partData,
- }
- upload.PartsMutex.Unlock()
- return etag, nil
- }
- // ListParts lists all uploaded parts for a multipart upload
- func (m *MultipartStorage) ListParts(uploadID string, maxParts, partNumberMarker int) ([]PartInfo, int, bool, error) {
- m.mutex.RLock()
- upload, exists := m.uploads[uploadID]
- m.mutex.RUnlock()
- if !exists {
- return nil, 0, false, fmt.Errorf("upload not found: %s", uploadID)
- }
- upload.PartsMutex.RLock()
- defer upload.PartsMutex.RUnlock()
- // Collect and sort parts
- var parts []PartInfo
- for partNum, part := range upload.Parts {
- if partNum > partNumberMarker {
- parts = append(parts, *part)
- }
- }
- // Sort by part number
- sortPartsByNumber(parts)
- // Apply pagination
- isTruncated := false
- nextMarker := 0
- if len(parts) > maxParts {
- isTruncated = true
- nextMarker = parts[maxParts-1].PartNumber
- parts = parts[:maxParts]
- }
- return parts, nextMarker, isTruncated, nil
- }
- // GetUpload retrieves upload information
- func (m *MultipartStorage) GetUpload(uploadID string) (*MultipartUploadInfo, error) {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- upload, exists := m.uploads[uploadID]
- if !exists {
- return nil, fmt.Errorf("upload not found: %s", uploadID)
- }
- return upload, nil
- }
- // AbortMultipartUpload cancels a multipart upload
- func (m *MultipartStorage) AbortMultipartUpload(uploadID string) error {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- if _, exists := m.uploads[uploadID]; !exists {
- return fmt.Errorf("upload not found: %s", uploadID)
- }
- delete(m.uploads, uploadID)
- return nil
- }
- // CompleteMultipartUpload finalizes a multipart upload
- func (m *MultipartStorage) CompleteMultipartUpload(uploadID string, parts []int) ([]byte, string, error) {
- m.mutex.Lock()
- upload, exists := m.uploads[uploadID]
- if !exists {
- m.mutex.Unlock()
- return nil, "", fmt.Errorf("upload not found: %s", uploadID)
- }
- // Remove from active uploads
- delete(m.uploads, uploadID)
- m.mutex.Unlock()
- upload.PartsMutex.RLock()
- defer upload.PartsMutex.RUnlock()
- // Validate all parts exist
- for _, partNum := range parts {
- if _, exists := upload.Parts[partNum]; !exists {
- return nil, "", fmt.Errorf("part %d not found", partNum)
- }
- }
- // Combine all parts in order
- var combinedData []byte
- for _, partNum := range parts {
- part := upload.Parts[partNum]
- combinedData = append(combinedData, part.Data...)
- }
- // Calculate final ETag
- etag := calculateETag(combinedData)
- return combinedData, etag, nil
- }
- // ListMultipartUploads lists all in-progress uploads for a bucket
- func (m *MultipartStorage) ListMultipartUploads(accountID, bucket string, maxUploads int, keyMarker, uploadIDMarker string) ([]MultipartUploadInfo, string, string, bool, error) {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- var uploads []MultipartUploadInfo
- for _, upload := range m.uploads {
- if upload.AccountID == accountID && upload.Bucket == bucket {
- // Apply marker filters
- if keyMarker != "" && upload.Key <= keyMarker {
- continue
- }
- if uploadIDMarker != "" && upload.UploadID <= uploadIDMarker {
- continue
- }
- uploads = append(uploads, *upload)
- }
- }
- // Sort uploads
- sortUploadsByKey(uploads)
- // Apply pagination
- isTruncated := false
- nextKeyMarker := ""
- nextUploadIDMarker := ""
- if len(uploads) > maxUploads {
- isTruncated = true
- nextKeyMarker = uploads[maxUploads-1].Key
- nextUploadIDMarker = uploads[maxUploads-1].UploadID
- uploads = uploads[:maxUploads]
- }
- return uploads, nextKeyMarker, nextUploadIDMarker, isTruncated, nil
- }
- // Helper functions
- func generateUploadID() string {
- return fmt.Sprintf("%d-%s", time.Now().UnixNano(), randomString(32))
- }
// calculateETag returns the quoted lowercase-hex MD5 digest of data,
// matching the S3 ETag convention for single-part objects.
//
// Fix: the original used a toy 31-multiplier integer hash (its own comment
// said "in production use MD5"), which collides trivially and truncates on
// overflow. crypto/md5 is the format S3 clients actually verify against.
func calculateETag(data []byte) string {
	sum := md5.Sum(data)
	return fmt.Sprintf("\"%x\"", sum)
}
- func sortPartsByNumber(parts []PartInfo) {
- // Simple bubble sort for demonstration
- n := len(parts)
- for i := 0; i < n-1; i++ {
- for j := 0; j < n-i-1; j++ {
- if parts[j].PartNumber > parts[j+1].PartNumber {
- parts[j], parts[j+1] = parts[j+1], parts[j]
- }
- }
- }
- }
- func sortUploadsByKey(uploads []MultipartUploadInfo) {
- // Simple bubble sort for demonstration
- n := len(uploads)
- for i := 0; i < n-1; i++ {
- for j := 0; j < n-i-1; j++ {
- if uploads[j].Key > uploads[j+1].Key {
- uploads[j], uploads[j+1] = uploads[j+1], uploads[j]
- }
- }
- }
- }
// randomString returns a string of the given length drawn from a
// lowercase-alphanumeric charset.
//
// Fix: the original indexed the charset with time.Now().UnixNano() on each
// iteration; the clock frequently does not advance between iterations, so
// it produced long runs of the identical character and was fully
// predictable. math/rand gives independent picks per character (the
// top-level generator is automatically seeded since Go 1.20).
func randomString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
	result := make([]byte, length)
	for i := range result {
		result[i] = charset[rand.Intn(len(charset))]
	}
	return string(result)
}
|