package storage

import (
	"crypto/md5"
	"fmt"
	"io"
	"math/rand"
	"sort"
	"sync"
	"time"
)

// MultipartUploadInfo stores information about an in-progress multipart upload.
type MultipartUploadInfo struct {
	UploadID  string
	Bucket    string
	Key       string
	AccountID string
	Initiated time.Time
	Parts     map[int]*PartInfo
	// PartsMutex guards Parts. It is a pointer so that copies of this struct
	// (e.g. the snapshots returned by ListMultipartUploads) share one lock.
	PartsMutex *sync.RWMutex
}

// PartInfo stores information about an uploaded part.
type PartInfo struct {
	PartNumber   int
	ETag         string
	Size         int64
	LastModified time.Time
	Data         []byte // In-memory storage for simplicity
}

// MultipartStorage manages multipart uploads.
type MultipartStorage struct {
	uploads map[string]*MultipartUploadInfo // key: uploadID
	mutex   sync.RWMutex
}

// NewMultipartStorage creates a new multipart storage manager.
func NewMultipartStorage() *MultipartStorage {
	return &MultipartStorage{
		uploads: make(map[string]*MultipartUploadInfo),
	}
}

// CreateMultipartUpload initiates a new multipart upload and returns its upload ID.
func (m *MultipartStorage) CreateMultipartUpload(accountID, bucket, key string) (string, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	uploadID := generateUploadID()
	upload := &MultipartUploadInfo{
		UploadID:   uploadID,
		Bucket:     bucket,
		Key:        key,
		AccountID:  accountID,
		Initiated:  time.Now(),
		Parts:      make(map[int]*PartInfo),
		PartsMutex: &sync.RWMutex{},
	}
	m.uploads[uploadID] = upload

	return uploadID, nil
}

// UploadPart stores a single part for an in-progress upload and returns its ETag.
func (m *MultipartStorage) UploadPart(uploadID string, partNumber int, data io.Reader) (string, error) {
	m.mutex.RLock()
	upload, exists := m.uploads[uploadID]
	m.mutex.RUnlock()

	if !exists {
		return "", fmt.Errorf("upload not found: %s", uploadID)
	}

	// Read the part data into memory
	partData, err := io.ReadAll(data)
	if err != nil {
		return "", fmt.Errorf("failed to read part data: %w", err)
	}

	// Calculate the ETag (quoted MD5 hash)
	etag := calculateETag(partData)

	// Store the part
	upload.PartsMutex.Lock()
	upload.Parts[partNumber] = &PartInfo{
		PartNumber:   partNumber,
		ETag:         etag,
		Size:         int64(len(partData)),
		LastModified: time.Now(),
		Data:         partData,
	}
	upload.PartsMutex.Unlock()

	return etag, nil
}

// ListParts lists uploaded parts for a multipart upload, starting after
// partNumberMarker and returning at most maxParts entries.
func (m *MultipartStorage) ListParts(uploadID string, maxParts, partNumberMarker int) ([]PartInfo, int, bool, error) {
	m.mutex.RLock()
	upload, exists := m.uploads[uploadID]
	m.mutex.RUnlock()

	if !exists {
		return nil, 0, false, fmt.Errorf("upload not found: %s", uploadID)
	}

	if maxParts <= 0 {
		maxParts = 1000 // S3 default page size
	}

	upload.PartsMutex.RLock()
	defer upload.PartsMutex.RUnlock()

	// Collect the parts after the marker
	var parts []PartInfo
	for partNum, part := range upload.Parts {
		if partNum > partNumberMarker {
			parts = append(parts, *part)
		}
	}

	// Sort by part number
	sortPartsByNumber(parts)

	// Apply pagination
	isTruncated := false
	nextMarker := 0
	if len(parts) > maxParts {
		isTruncated = true
		nextMarker = parts[maxParts-1].PartNumber
		parts = parts[:maxParts]
	}

	return parts, nextMarker, isTruncated, nil
}

// GetUpload retrieves upload information.
func (m *MultipartStorage) GetUpload(uploadID string) (*MultipartUploadInfo, error) {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	upload, exists := m.uploads[uploadID]
	if !exists {
		return nil, fmt.Errorf("upload not found: %s", uploadID)
	}
	return upload, nil
}

// AbortMultipartUpload cancels a multipart upload and discards its parts.
func (m *MultipartStorage) AbortMultipartUpload(uploadID string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if _, exists := m.uploads[uploadID]; !exists {
		return fmt.Errorf("upload not found: %s", uploadID)
	}
	delete(m.uploads, uploadID)
	return nil
}

// CompleteMultipartUpload finalizes a multipart upload by concatenating the
// listed parts in order, returning the assembled object data and its ETag.
func (m *MultipartStorage) CompleteMultipartUpload(uploadID string, parts []int) ([]byte, string, error) {
	m.mutex.RLock()
	upload, exists := m.uploads[uploadID]
	m.mutex.RUnlock()
	if !exists {
		return nil, "", fmt.Errorf("upload not found: %s", uploadID)
	}

	upload.PartsMutex.RLock()
	defer upload.PartsMutex.RUnlock()

	// Validate that every requested part exists
	for _, partNum := range parts {
		if _, exists := upload.Parts[partNum]; !exists {
			return nil, "", fmt.Errorf("part %d not found", partNum)
		}
	}

	// Combine the parts in the order given by the caller
	var combinedData []byte
	for _, partNum := range parts {
		part := upload.Parts[partNum]
		combinedData = append(combinedData, part.Data...)
	}

	// Calculate the final ETag over the combined data. (Real S3 instead uses
	// the MD5 of the concatenated part MD5s, suffixed with the part count.)
	etag := calculateETag(combinedData)

	// Remove from active uploads only after successful assembly, so a failed
	// completion does not discard the upload
	m.mutex.Lock()
	delete(m.uploads, uploadID)
	m.mutex.Unlock()

	return combinedData, etag, nil
}

// ListMultipartUploads lists in-progress uploads for a bucket, starting after
// the key/upload-ID markers and returning at most maxUploads entries.
func (m *MultipartStorage) ListMultipartUploads(accountID, bucket string, maxUploads int, keyMarker, uploadIDMarker string) ([]MultipartUploadInfo, string, string, bool, error) {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	if maxUploads <= 0 {
		maxUploads = 1000 // S3 default page size
	}

	var uploads []MultipartUploadInfo
	for _, upload := range m.uploads {
		if upload.AccountID == accountID && upload.Bucket == bucket {
			// Apply marker filters
			if keyMarker != "" && upload.Key <= keyMarker {
				continue
			}
			if uploadIDMarker != "" && upload.UploadID <= uploadIDMarker {
				continue
			}
			uploads = append(uploads, *upload)
		}
	}

	// Sort uploads by object key
	sortUploadsByKey(uploads)

	// Apply pagination
	isTruncated := false
	nextKeyMarker := ""
	nextUploadIDMarker := ""
	if len(uploads) > maxUploads {
		isTruncated = true
		nextKeyMarker = uploads[maxUploads-1].Key
		nextUploadIDMarker = uploads[maxUploads-1].UploadID
		uploads = uploads[:maxUploads]
	}

	return uploads, nextKeyMarker, nextUploadIDMarker, isTruncated, nil
}

// Helper functions

// generateUploadID returns a unique-enough upload ID by combining a
// nanosecond timestamp with a random suffix.
func generateUploadID() string {
	return fmt.Sprintf("%d-%s", time.Now().UnixNano(), randomString(32))
}

// calculateETag returns the quoted hex MD5 of data, matching the S3 ETag
// format for single-part objects.
func calculateETag(data []byte) string {
	sum := md5.Sum(data)
	return fmt.Sprintf("\"%x\"", sum[:])
}

// sortPartsByNumber orders parts ascending by part number.
func sortPartsByNumber(parts []PartInfo) {
	sort.Slice(parts, func(i, j int) bool {
		return parts[i].PartNumber < parts[j].PartNumber
	})
}

// sortUploadsByKey orders uploads ascending by object key.
func sortUploadsByKey(uploads []MultipartUploadInfo) {
	sort.Slice(uploads, func(i, j int) bool {
		return uploads[i].Key < uploads[j].Key
	})
}

// randomString returns a random string of the given length drawn from a
// lowercase-alphanumeric charset.
func randomString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
	result := make([]byte, length)
	for i := range result {
		result[i] = charset[rand.Intn(len(charset))]
	}
	return string(result)
}
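// Example usage (an illustrative sketch, not invoked anywhere in this
// package; the account, bucket, and key names below are made up):
//
//	store := NewMultipartStorage()
//	uploadID, _ := store.CreateMultipartUpload("account-1", "demo-bucket", "videos/demo.mp4")
//
//	// Each part body is any io.Reader, e.g. bytes.NewReader or an HTTP request body.
//	_, _ = store.UploadPart(uploadID, 1, bytes.NewReader([]byte("first part ")))
//	_, _ = store.UploadPart(uploadID, 2, bytes.NewReader([]byte("second part")))
//
//	// Completion concatenates the named parts in order and returns the object.
//	data, etag, _ := store.CompleteMultipartUpload(uploadID, []int{1, 2})
//	_ = data // "first part second part"
//	_ = etag // quoted MD5 of the combined bytes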