s3.go 32 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. package handler
  2. import (
  3. "encoding/xml"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "aws-sts-mock/internal/service"
  11. "aws-sts-mock/internal/storage"
  12. "aws-sts-mock/pkg/checksum"
  13. "aws-sts-mock/pkg/s3"
  14. "aws-sts-mock/pkg/sigv4"
  15. )
  16. // S3Handler handles S3 HTTP requests with user isolation
  17. type S3Handler struct {
  18. service *service.S3Service
  19. }
  20. // NewS3Handler creates a new S3 handler
  21. func NewS3Handler(service *service.S3Service) *S3Handler {
  22. return &S3Handler{
  23. service: service,
  24. }
  25. }
  26. // Handle routes S3 requests to appropriate handlers
  27. func (h *S3Handler) Handle(w http.ResponseWriter, r *http.Request) {
  28. path := strings.TrimPrefix(r.URL.Path, "/")
  29. parts := strings.SplitN(path, "/", 2)
  30. // Root path - list buckets
  31. if path == "" {
  32. if r.Method == "GET" {
  33. h.handleListBuckets(w, r)
  34. } else {
  35. h.writeError(w, "MethodNotAllowed", "Method not allowed", "", http.StatusMethodNotAllowed)
  36. }
  37. return
  38. }
  39. bucketName := parts[0]
  40. objectKey := ""
  41. if len(parts) > 1 {
  42. objectKey = parts[1]
  43. }
  44. // Bucket operations (no object key)
  45. if objectKey == "" {
  46. h.handleBucketOperation(w, r, bucketName)
  47. return
  48. }
  49. // Object operations
  50. h.handleObjectOperation(w, r, bucketName, objectKey)
  51. }
  52. // getUserFromContext extracts user credentials from request context
  53. func (h *S3Handler) getUserFromContext(r *http.Request) (sigv4.AWSCredentials, error) {
  54. creds, ok := r.Context().Value(sigv4.CredentialsContextKey).(sigv4.AWSCredentials)
  55. if !ok {
  56. return sigv4.AWSCredentials{}, fmt.Errorf("failed to retrieve credentials")
  57. }
  58. return creds, nil
  59. }
  60. func (h *S3Handler) handleBucketOperation(w http.ResponseWriter, r *http.Request, bucketName string) {
  61. query := r.URL.Query()
  62. // Check for list multipart uploads
  63. if query.Has("uploads") && r.Method == "GET" {
  64. h.handleListMultipartUploads(w, r, bucketName)
  65. return
  66. }
  67. // Check for delete query parameter (for DeleteObjects)
  68. if r.Method == "POST" && query.Get("delete") != "" {
  69. h.handleDeleteObjects(w, r, bucketName)
  70. return
  71. }
  72. switch r.Method {
  73. case "PUT":
  74. h.handleCreateBucket(w, r, bucketName)
  75. case "GET":
  76. h.handleListObjects(w, r, bucketName)
  77. case "DELETE":
  78. h.handleDeleteBucket(w, r, bucketName)
  79. case "HEAD":
  80. h.handleHeadBucket(w, r, bucketName)
  81. default:
  82. h.writeError(w, "MethodNotAllowed", "Method not allowed", bucketName, http.StatusMethodNotAllowed)
  83. }
  84. }
  85. func (h *S3Handler) handleObjectOperation(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  86. query := r.URL.Query()
  87. // Check for multipart operations
  88. if query.Has("uploads") && r.Method == "POST" {
  89. h.handleCreateMultipartUpload(w, r, bucketName, objectKey)
  90. return
  91. }
  92. uploadID := query.Get("uploadId")
  93. if uploadID != "" {
  94. if r.Method == "PUT" && query.Has("partNumber") {
  95. h.handleUploadPart(w, r, bucketName, objectKey)
  96. return
  97. }
  98. if r.Method == "POST" {
  99. h.handleCompleteMultipartUpload(w, r, bucketName, objectKey)
  100. return
  101. }
  102. if r.Method == "DELETE" {
  103. h.handleAbortMultipartUpload(w, r, bucketName, objectKey)
  104. return
  105. }
  106. if r.Method == "GET" {
  107. h.handleListParts(w, r, bucketName, objectKey)
  108. return
  109. }
  110. }
  111. // Regular object operations
  112. switch r.Method {
  113. case "PUT":
  114. h.handlePutObject(w, r, bucketName, objectKey)
  115. case "GET":
  116. h.handleGetObject(w, r, bucketName, objectKey)
  117. case "DELETE":
  118. h.handleDeleteObject(w, r, bucketName, objectKey)
  119. case "HEAD":
  120. h.handleHeadObject(w, r, bucketName, objectKey)
  121. default:
  122. h.writeError(w, "MethodNotAllowed", "Method not allowed", objectKey, http.StatusMethodNotAllowed)
  123. }
  124. }
  125. func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) {
  126. creds, err := h.getUserFromContext(r)
  127. if err != nil {
  128. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  129. return
  130. }
  131. result, err := h.service.ListBuckets(creds.AccountID, creds.AccessKeyID)
  132. if err != nil {
  133. log.Printf("Error listing buckets for user %s: %v", creds.AccountID, err)
  134. h.writeError(w, "InternalError", "Failed to list buckets", "", http.StatusInternalServerError)
  135. return
  136. }
  137. result.XMLName = xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "ListAllMyBucketsResult"}
  138. w.Header().Set("Content-Type", "application/xml")
  139. w.Header().Set("x-amz-request-id", generateRequestId())
  140. w.WriteHeader(http.StatusOK)
  141. encoder := xml.NewEncoder(w)
  142. encoder.Indent("", " ")
  143. if err := encoder.Encode(result); err != nil {
  144. log.Printf("Error encoding response: %v", err)
  145. }
  146. }
  147. func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, bucketName string) {
  148. creds, err := h.getUserFromContext(r)
  149. if err != nil {
  150. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  151. return
  152. }
  153. err = h.service.CreateBucket(creds.AccountID, bucketName)
  154. if err != nil {
  155. if err == storage.ErrBucketAlreadyExists {
  156. h.writeError(w, "BucketAlreadyExists", "The requested bucket name is not available", bucketName, http.StatusConflict)
  157. return
  158. }
  159. log.Printf("Failed to create bucket %s for user %s: %v", bucketName, creds.AccountID, err)
  160. h.writeError(w, "InternalError", "Failed to create bucket", bucketName, http.StatusInternalServerError)
  161. return
  162. }
  163. log.Printf("Created bucket: %s for user: %s", bucketName, creds.AccountID)
  164. w.Header().Set("Location", "/"+bucketName)
  165. w.Header().Set("x-amz-request-id", generateRequestId())
  166. w.WriteHeader(http.StatusOK)
  167. }
  168. func (h *S3Handler) handleListObjects(w http.ResponseWriter, r *http.Request, bucketName string) {
  169. creds, err := h.getUserFromContext(r)
  170. if err != nil {
  171. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  172. return
  173. }
  174. log.Printf("ListObjects request for bucket: %s, user: %s", bucketName, creds.AccountID)
  175. objects, err := h.service.ListObjects(creds.AccountID, bucketName)
  176. if err != nil {
  177. if err == storage.ErrBucketNotFound {
  178. h.writeError(w, "NoSuchBucket", "The specified bucket does not exist", bucketName, http.StatusNotFound)
  179. return
  180. }
  181. log.Printf("Failed to list objects in bucket %s for user %s: %v", bucketName, creds.AccountID, err)
  182. h.writeError(w, "InternalError", "Failed to list objects", bucketName, http.StatusInternalServerError)
  183. return
  184. }
  185. // Build object list XML
  186. var objectsXML []string
  187. for _, obj := range objects {
  188. objectsXML = append(objectsXML, fmt.Sprintf(` <Contents>
  189. <Key>%s</Key>
  190. <LastModified>%s</LastModified>
  191. <ETag>"%s"</ETag>
  192. <Size>%d</Size>
  193. <StorageClass>STANDARD</StorageClass>
  194. </Contents>`, obj.Key, obj.LastModified.UTC().Format(time.RFC3339), obj.ETag, obj.Size))
  195. }
  196. w.Header().Set("Content-Type", "application/xml")
  197. w.Header().Set("x-amz-request-id", generateRequestId())
  198. w.WriteHeader(http.StatusOK)
  199. response := fmt.Sprintf(`<?xml version="1.0" encoding="UTF-8"?>
  200. <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  201. <Name>%s</Name>
  202. <Prefix></Prefix>
  203. <Marker></Marker>
  204. <MaxKeys>1000</MaxKeys>
  205. <IsTruncated>false</IsTruncated>
  206. %s
  207. </ListBucketResult>`, bucketName, strings.Join(objectsXML, "\n"))
  208. w.Write([]byte(response))
  209. }
  210. func (h *S3Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request, bucketName string) {
  211. creds, err := h.getUserFromContext(r)
  212. if err != nil {
  213. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  214. return
  215. }
  216. // Check if bucket exists
  217. exists, err := h.service.BucketExists(creds.AccountID, bucketName)
  218. if err != nil {
  219. log.Printf("Error checking bucket existence %s for user %s: %v", bucketName, creds.AccountID, err)
  220. h.writeError(w, "InternalError", "Failed to check bucket", bucketName, http.StatusInternalServerError)
  221. return
  222. }
  223. if !exists {
  224. h.writeError(w, "NoSuchBucket", "The specified bucket does not exist", bucketName, http.StatusNotFound)
  225. return
  226. }
  227. // Check if bucket is empty
  228. isEmpty, err := h.service.BucketIsEmpty(creds.AccountID, bucketName)
  229. if err != nil {
  230. log.Printf("Error checking if bucket %s is empty for user %s: %v", bucketName, creds.AccountID, err)
  231. h.writeError(w, "InternalError", "Failed to check bucket contents", bucketName, http.StatusInternalServerError)
  232. return
  233. }
  234. if !isEmpty {
  235. h.writeError(w, "BucketNotEmpty", "The bucket you tried to delete is not empty", bucketName, http.StatusConflict)
  236. return
  237. }
  238. // Delete the bucket
  239. if err := h.service.DeleteBucket(creds.AccountID, bucketName); err != nil {
  240. log.Printf("Failed to delete bucket %s for user %s: %v", bucketName, creds.AccountID, err)
  241. if err == storage.ErrBucketNotFound {
  242. h.writeError(w, "NoSuchBucket", "The specified bucket does not exist", bucketName, http.StatusNotFound)
  243. return
  244. }
  245. h.writeError(w, "InternalError", "Failed to delete bucket", bucketName, http.StatusInternalServerError)
  246. return
  247. }
  248. log.Printf("Deleted bucket: %s for user: %s", bucketName, creds.AccountID)
  249. w.Header().Set("x-amz-request-id", generateRequestId())
  250. w.WriteHeader(http.StatusNoContent)
  251. }
  252. func (h *S3Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request, bucketName string) {
  253. creds, err := h.getUserFromContext(r)
  254. if err != nil {
  255. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  256. return
  257. }
  258. exists, err := h.service.BucketExists(creds.AccountID, bucketName)
  259. if err != nil {
  260. w.WriteHeader(http.StatusInternalServerError)
  261. return
  262. }
  263. if !exists {
  264. w.WriteHeader(http.StatusNotFound)
  265. return
  266. }
  267. w.Header().Set("x-amz-request-id", generateRequestId())
  268. w.WriteHeader(http.StatusOK)
  269. }
  270. func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  271. creds, err := h.getUserFromContext(r)
  272. if err != nil {
  273. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  274. return
  275. }
  276. // Extract checksum headers
  277. checksumHeaders := extractChecksumHeaders(r)
  278. validator := checksum.NewValidator(checksumHeaders)
  279. // Validate and store the object
  280. result, data, err := validator.ValidateStream(r.Body)
  281. if err != nil {
  282. log.Printf("Failed to read object data %s/%s for user %s: %v", bucketName, objectKey, creds.AccountID, err)
  283. h.writeError(w, "InternalError", "Failed to read object data", objectKey, http.StatusInternalServerError)
  284. return
  285. }
  286. // Check if validation failed
  287. if !result.Valid {
  288. log.Printf("Checksum validation failed for %s/%s: %s", bucketName, objectKey, result.ErrorMessage)
  289. h.writeError(w, "InvalidDigest", result.ErrorMessage, objectKey, http.StatusBadRequest)
  290. return
  291. }
  292. // Store the object
  293. reader := strings.NewReader(string(data))
  294. err = h.service.PutObject(creds.AccountID, bucketName, objectKey, reader)
  295. if err != nil {
  296. if err == storage.ErrBucketNotFound {
  297. h.writeError(w, "NoSuchBucket", "The specified bucket does not exist", bucketName, http.StatusNotFound)
  298. return
  299. }
  300. log.Printf("Failed to put object %s/%s for user %s: %v", bucketName, objectKey, creds.AccountID, err)
  301. h.writeError(w, "InternalError", "Failed to create object", objectKey, http.StatusInternalServerError)
  302. return
  303. }
  304. log.Printf("Created object: %s/%s for user: %s (validated with %s)",
  305. bucketName, objectKey, creds.AccountID, result.ValidatedWith)
  306. // Set response headers including checksums
  307. responseHeaders := result.GetResponseHeaders()
  308. for key, value := range responseHeaders {
  309. w.Header().Set(key, value)
  310. }
  311. w.Header().Set("x-amz-request-id", generateRequestId())
  312. w.WriteHeader(http.StatusOK)
  313. }
  314. func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  315. creds, err := h.getUserFromContext(r)
  316. if err != nil {
  317. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  318. return
  319. }
  320. file, info, err := h.service.GetObject(creds.AccountID, bucketName, objectKey)
  321. if err != nil {
  322. if err == storage.ErrObjectNotFound {
  323. h.writeError(w, "NoSuchKey", "The specified key does not exist", objectKey, http.StatusNotFound)
  324. return
  325. }
  326. log.Printf("Failed to get object %s/%s for user %s: %v", bucketName, objectKey, creds.AccountID, err)
  327. h.writeError(w, "InternalError", "Failed to retrieve object", objectKey, http.StatusInternalServerError)
  328. return
  329. }
  330. defer file.Close()
  331. // Read file data to compute checksums
  332. data, err := io.ReadAll(file)
  333. if err != nil {
  334. log.Printf("Failed to read object data %s/%s: %v", bucketName, objectKey, err)
  335. h.writeError(w, "InternalError", "Failed to read object data", objectKey, http.StatusInternalServerError)
  336. return
  337. }
  338. // Compute checksums for the response
  339. validator := checksum.NewValidator(map[string]string{})
  340. result := validator.ComputeChecksums(data)
  341. log.Printf("Retrieved object: %s/%s for user: %s", bucketName, objectKey, creds.AccountID)
  342. // Set standard headers
  343. w.Header().Set("Content-Type", getContentType(objectKey))
  344. w.Header().Set("Content-Length", fmt.Sprintf("%d", info.Size))
  345. w.Header().Set("Last-Modified", info.LastModified.UTC().Format(http.TimeFormat))
  346. w.Header().Set("x-amz-request-id", generateRequestId())
  347. // Set checksum headers
  348. w.Header().Set("ETag", fmt.Sprintf(`"%s"`, result.MD5))
  349. w.Header().Set("x-amz-checksum-crc32", result.CRC32)
  350. w.Header().Set("x-amz-checksum-crc32c", result.CRC32C)
  351. w.Header().Set("x-amz-checksum-sha1", result.SHA1)
  352. w.Header().Set("x-amz-checksum-sha256", result.SHA256)
  353. w.WriteHeader(http.StatusOK)
  354. w.Write(data)
  355. }
  356. func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  357. creds, err := h.getUserFromContext(r)
  358. if err != nil {
  359. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  360. return
  361. }
  362. err = h.service.DeleteObject(creds.AccountID, bucketName, objectKey)
  363. if err != nil {
  364. log.Printf("Failed to delete object %s/%s for user %s: %v", bucketName, objectKey, creds.AccountID, err)
  365. }
  366. log.Printf("Deleted object: %s/%s for user: %s", bucketName, objectKey, creds.AccountID)
  367. w.Header().Set("x-amz-request-id", generateRequestId())
  368. w.WriteHeader(http.StatusNoContent)
  369. }
  370. func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  371. creds, err := h.getUserFromContext(r)
  372. if err != nil {
  373. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  374. return
  375. }
  376. info, err := h.service.GetObjectInfo(creds.AccountID, bucketName, objectKey)
  377. if err != nil {
  378. if err == storage.ErrObjectNotFound {
  379. w.WriteHeader(http.StatusNotFound)
  380. return
  381. }
  382. w.WriteHeader(http.StatusInternalServerError)
  383. return
  384. }
  385. w.Header().Set("Content-Type", getContentType(objectKey))
  386. w.Header().Set("Content-Length", fmt.Sprintf("%d", info.Size))
  387. w.Header().Set("Last-Modified", info.LastModified.UTC().Format(http.TimeFormat))
  388. w.Header().Set("ETag", fmt.Sprintf("\"%s\"", info.ETag))
  389. w.Header().Set("x-amz-request-id", generateRequestId())
  390. w.WriteHeader(http.StatusOK)
  391. }
  392. func (h *S3Handler) writeError(w http.ResponseWriter, code, message, resource string, statusCode int) {
  393. errorResp := s3.S3Error{
  394. XMLName: xml.Name{Space: "", Local: "Error"},
  395. Code: code,
  396. Message: message,
  397. Resource: resource,
  398. RequestId: generateRequestId(),
  399. }
  400. w.Header().Set("Content-Type", "application/xml")
  401. w.Header().Set("x-amz-request-id", errorResp.RequestId)
  402. w.WriteHeader(statusCode)
  403. encoder := xml.NewEncoder(w)
  404. encoder.Indent("", " ")
  405. if err := encoder.Encode(errorResp); err != nil {
  406. log.Printf("Error encoding S3 error response: %v", err)
  407. }
  408. }
  409. // 123
  410. // Update handleDeleteBucket in handler/s3_handler.go
  411. // Update handleDeleteObjects to use the batch service method
  412. func (h *S3Handler) handleDeleteObjects(w http.ResponseWriter, r *http.Request, bucketName string) {
  413. creds, err := h.getUserFromContext(r)
  414. if err != nil {
  415. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  416. return
  417. }
  418. // Check if bucket exists
  419. exists, err := h.service.BucketExists(creds.AccountID, bucketName)
  420. if err != nil {
  421. log.Printf("Error checking bucket existence %s for user %s: %v", bucketName, creds.AccountID, err)
  422. h.writeError(w, "InternalError", "Failed to check bucket", bucketName, http.StatusInternalServerError)
  423. return
  424. }
  425. if !exists {
  426. h.writeError(w, "NoSuchBucket", "The specified bucket does not exist", bucketName, http.StatusNotFound)
  427. return
  428. }
  429. // Parse the delete request
  430. var deleteReq s3.DeleteObjectsRequest
  431. if err := xml.NewDecoder(r.Body).Decode(&deleteReq); err != nil {
  432. log.Printf("Error decoding delete objects request: %v", err)
  433. h.writeError(w, "MalformedXML", "The XML you provided was not well-formed", "", http.StatusBadRequest)
  434. return
  435. }
  436. // Validate request
  437. if len(deleteReq.Objects) == 0 {
  438. h.writeError(w, "MalformedXML", "No objects specified for deletion", "", http.StatusBadRequest)
  439. return
  440. }
  441. if len(deleteReq.Objects) > 1000 {
  442. h.writeError(w, "InvalidRequest", "You cannot delete more than 1000 objects in a single request", "", http.StatusBadRequest)
  443. return
  444. }
  445. // Extract keys
  446. keys := make([]string, len(deleteReq.Objects))
  447. for i, obj := range deleteReq.Objects {
  448. keys[i] = obj.Key
  449. }
  450. // Process deletions using service
  451. deletionErrors := h.service.DeleteObjects(creds.AccountID, bucketName, keys)
  452. // Build response
  453. response := s3.DeleteObjectsResponse{
  454. XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "DeleteResult"},
  455. Deleted: []s3.DeletedObject{},
  456. Errors: []s3.DeleteError{},
  457. }
  458. for _, obj := range deleteReq.Objects {
  459. if err, hasError := deletionErrors[obj.Key]; hasError {
  460. log.Printf("Failed to delete object %s/%s for user %s: %v", bucketName, obj.Key, creds.AccountID, err)
  461. // Determine error code based on error type
  462. errorCode := "InternalError"
  463. errorMessage := "We encountered an internal error. Please try again."
  464. if err == storage.ErrObjectNotFound {
  465. // S3 doesn't return an error for missing objects in batch delete
  466. // Just add to deleted list
  467. if !deleteReq.Quiet {
  468. response.Deleted = append(response.Deleted, s3.DeletedObject{
  469. Key: obj.Key,
  470. })
  471. }
  472. continue
  473. }
  474. response.Errors = append(response.Errors, s3.DeleteError{
  475. Key: obj.Key,
  476. VersionId: obj.VersionId,
  477. Code: errorCode,
  478. Message: errorMessage,
  479. })
  480. } else {
  481. // Add to deleted list if not in quiet mode
  482. if !deleteReq.Quiet {
  483. response.Deleted = append(response.Deleted, s3.DeletedObject{
  484. Key: obj.Key,
  485. VersionId: obj.VersionId,
  486. })
  487. }
  488. }
  489. }
  490. log.Printf("Batch deleted %d objects (attempted: %d) from bucket %s for user: %s",
  491. len(deleteReq.Objects)-len(response.Errors), len(deleteReq.Objects), bucketName, creds.AccountID)
  492. // Write response
  493. w.Header().Set("Content-Type", "application/xml")
  494. w.Header().Set("x-amz-request-id", generateRequestId())
  495. w.WriteHeader(http.StatusOK)
  496. encoder := xml.NewEncoder(w)
  497. encoder.Indent("", " ")
  498. if err := encoder.Encode(response); err != nil {
  499. log.Printf("Error encoding delete objects response: %v", err)
  500. }
  501. }
  502. // Add these methods to S3Handler
  503. func (h *S3Handler) handleCreateMultipartUpload(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  504. creds, err := h.getUserFromContext(r)
  505. if err != nil {
  506. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  507. return
  508. }
  509. uploadID, err := h.service.CreateMultipartUpload(creds.AccountID, bucketName, objectKey)
  510. if err != nil {
  511. if err == storage.ErrBucketNotFound {
  512. h.writeError(w, "NoSuchBucket", "The specified bucket does not exist", bucketName, http.StatusNotFound)
  513. return
  514. }
  515. log.Printf("Failed to create multipart upload for %s/%s: %v", bucketName, objectKey, err)
  516. h.writeError(w, "InternalError", "Failed to initiate multipart upload", objectKey, http.StatusInternalServerError)
  517. return
  518. }
  519. log.Printf("Created multipart upload: %s for %s/%s, user: %s", uploadID, bucketName, objectKey, creds.AccountID)
  520. result := s3.InitiateMultipartUploadResult{
  521. XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "InitiateMultipartUploadResult"},
  522. Bucket: bucketName,
  523. Key: objectKey,
  524. UploadId: uploadID,
  525. }
  526. w.Header().Set("Content-Type", "application/xml")
  527. w.Header().Set("x-amz-request-id", generateRequestId())
  528. w.WriteHeader(http.StatusOK)
  529. encoder := xml.NewEncoder(w)
  530. encoder.Indent("", " ")
  531. if err := encoder.Encode(result); err != nil {
  532. log.Printf("Error encoding response: %v", err)
  533. }
  534. }
  535. func (h *S3Handler) handleUploadPart(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  536. creds, err := h.getUserFromContext(r)
  537. if err != nil {
  538. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  539. return
  540. }
  541. // Get query parameters
  542. uploadID := r.URL.Query().Get("uploadId")
  543. partNumberStr := r.URL.Query().Get("partNumber")
  544. if uploadID == "" || partNumberStr == "" {
  545. h.writeError(w, "InvalidRequest", "Missing uploadId or partNumber", "", http.StatusBadRequest)
  546. return
  547. }
  548. partNumber := 0
  549. fmt.Sscanf(partNumberStr, "%d", &partNumber)
  550. // Verify upload belongs to user
  551. upload, err := h.service.GetMultipartUpload(uploadID)
  552. if err != nil {
  553. h.writeError(w, "NoSuchUpload", "The specified upload does not exist", uploadID, http.StatusNotFound)
  554. return
  555. }
  556. if upload.AccountID != creds.AccountID || upload.Bucket != bucketName || upload.Key != objectKey {
  557. h.writeError(w, "AccessDenied", "Access denied to this upload", uploadID, http.StatusForbidden)
  558. return
  559. }
  560. // Extract checksum headers and validate
  561. checksumHeaders := extractChecksumHeaders(r)
  562. validator := checksum.NewValidator(checksumHeaders)
  563. // Validate the part data
  564. result, data, err := validator.ValidateStream(r.Body)
  565. if err != nil {
  566. log.Printf("Failed to read part data for upload %s: %v", uploadID, err)
  567. h.writeError(w, "InternalError", "Failed to read part data", "", http.StatusInternalServerError)
  568. return
  569. }
  570. // Check if validation failed
  571. if !result.Valid {
  572. log.Printf("Checksum validation failed for part %d of upload %s: %s",
  573. partNumber, uploadID, result.ErrorMessage)
  574. h.writeError(w, "InvalidDigest", result.ErrorMessage, "", http.StatusBadRequest)
  575. return
  576. }
  577. // Upload the part
  578. reader := strings.NewReader(string(data))
  579. etag, err := h.service.UploadPart(uploadID, partNumber, reader)
  580. if err != nil {
  581. log.Printf("Failed to upload part %d for upload %s: %v", partNumber, uploadID, err)
  582. h.writeError(w, "InternalError", "Failed to upload part", "", http.StatusInternalServerError)
  583. return
  584. }
  585. log.Printf("Uploaded part %d for upload %s, user: %s (validated with %s)",
  586. partNumber, uploadID, creds.AccountID, result.ValidatedWith)
  587. // Set response headers including checksums
  588. w.Header().Set("ETag", etag)
  589. responseHeaders := result.GetResponseHeaders()
  590. for key, value := range responseHeaders {
  591. if key != "ETag" { // Don't override the ETag
  592. w.Header().Set(key, value)
  593. }
  594. }
  595. w.Header().Set("x-amz-request-id", generateRequestId())
  596. w.WriteHeader(http.StatusOK)
  597. }
  598. func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  599. creds, err := h.getUserFromContext(r)
  600. if err != nil {
  601. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  602. return
  603. }
  604. uploadID := r.URL.Query().Get("uploadId")
  605. if uploadID == "" {
  606. h.writeError(w, "InvalidRequest", "Missing uploadId", "", http.StatusBadRequest)
  607. return
  608. }
  609. // Verify upload belongs to user
  610. upload, err := h.service.GetMultipartUpload(uploadID)
  611. if err != nil {
  612. h.writeError(w, "NoSuchUpload", "The specified upload does not exist", uploadID, http.StatusNotFound)
  613. return
  614. }
  615. if upload.AccountID != creds.AccountID || upload.Bucket != bucketName || upload.Key != objectKey {
  616. h.writeError(w, "AccessDenied", "Access denied to this upload", uploadID, http.StatusForbidden)
  617. return
  618. }
  619. // Parse request
  620. var req s3.CompleteMultipartUploadRequest
  621. if err := xml.NewDecoder(r.Body).Decode(&req); err != nil {
  622. log.Printf("Error decoding complete multipart upload request: %v", err)
  623. h.writeError(w, "MalformedXML", "The XML you provided was not well-formed", "", http.StatusBadRequest)
  624. return
  625. }
  626. // Extract part numbers
  627. parts := make([]int, len(req.Parts))
  628. for i, part := range req.Parts {
  629. parts[i] = part.PartNumber
  630. }
  631. // Complete the upload
  632. if err := h.service.CompleteMultipartUpload(uploadID, parts); err != nil {
  633. log.Printf("Failed to complete multipart upload %s: %v", uploadID, err)
  634. h.writeError(w, "InternalError", "Failed to complete multipart upload", "", http.StatusInternalServerError)
  635. return
  636. }
  637. log.Printf("Completed multipart upload: %s for %s/%s, user: %s", uploadID, bucketName, objectKey, creds.AccountID)
  638. result := s3.CompleteMultipartUploadResult{
  639. XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "CompleteMultipartUploadResult"},
  640. Location: fmt.Sprintf("/%s/%s", bucketName, objectKey),
  641. Bucket: bucketName,
  642. Key: objectKey,
  643. ETag: fmt.Sprintf("\"%d\"", time.Now().Unix()),
  644. }
  645. w.Header().Set("Content-Type", "application/xml")
  646. w.Header().Set("x-amz-request-id", generateRequestId())
  647. w.WriteHeader(http.StatusOK)
  648. encoder := xml.NewEncoder(w)
  649. encoder.Indent("", " ")
  650. if err := encoder.Encode(result); err != nil {
  651. log.Printf("Error encoding response: %v", err)
  652. }
  653. }
  654. func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  655. creds, err := h.getUserFromContext(r)
  656. if err != nil {
  657. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  658. return
  659. }
  660. uploadID := r.URL.Query().Get("uploadId")
  661. if uploadID == "" {
  662. h.writeError(w, "InvalidRequest", "Missing uploadId", "", http.StatusBadRequest)
  663. return
  664. }
  665. // Verify upload belongs to user
  666. upload, err := h.service.GetMultipartUpload(uploadID)
  667. if err != nil {
  668. h.writeError(w, "NoSuchUpload", "The specified upload does not exist", uploadID, http.StatusNotFound)
  669. return
  670. }
  671. if upload.AccountID != creds.AccountID || upload.Bucket != bucketName || upload.Key != objectKey {
  672. h.writeError(w, "AccessDenied", "Access denied to this upload", uploadID, http.StatusForbidden)
  673. return
  674. }
  675. // Abort the upload
  676. if err := h.service.AbortMultipartUpload(uploadID); err != nil {
  677. log.Printf("Failed to abort multipart upload %s: %v", uploadID, err)
  678. h.writeError(w, "InternalError", "Failed to abort multipart upload", "", http.StatusInternalServerError)
  679. return
  680. }
  681. log.Printf("Aborted multipart upload: %s for %s/%s, user: %s", uploadID, bucketName, objectKey, creds.AccountID)
  682. w.Header().Set("x-amz-request-id", generateRequestId())
  683. w.WriteHeader(http.StatusNoContent)
  684. }
  685. func (h *S3Handler) handleListParts(w http.ResponseWriter, r *http.Request, bucketName, objectKey string) {
  686. creds, err := h.getUserFromContext(r)
  687. if err != nil {
  688. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  689. return
  690. }
  691. uploadID := r.URL.Query().Get("uploadId")
  692. if uploadID == "" {
  693. h.writeError(w, "InvalidRequest", "Missing uploadId", "", http.StatusBadRequest)
  694. return
  695. }
  696. // Verify upload belongs to user
  697. upload, err := h.service.GetMultipartUpload(uploadID)
  698. if err != nil {
  699. h.writeError(w, "NoSuchUpload", "The specified upload does not exist", uploadID, http.StatusNotFound)
  700. return
  701. }
  702. if upload.AccountID != creds.AccountID || upload.Bucket != bucketName || upload.Key != objectKey {
  703. h.writeError(w, "AccessDenied", "Access denied to this upload", uploadID, http.StatusForbidden)
  704. return
  705. }
  706. // Parse query parameters
  707. maxParts := 1000
  708. partNumberMarker := 0
  709. if maxPartsStr := r.URL.Query().Get("max-parts"); maxPartsStr != "" {
  710. fmt.Sscanf(maxPartsStr, "%d", &maxParts)
  711. }
  712. if markerStr := r.URL.Query().Get("part-number-marker"); markerStr != "" {
  713. fmt.Sscanf(markerStr, "%d", &partNumberMarker)
  714. }
  715. // List parts
  716. parts, nextMarker, isTruncated, err := h.service.ListParts(uploadID, maxParts, partNumberMarker)
  717. if err != nil {
  718. log.Printf("Failed to list parts for upload %s: %v", uploadID, err)
  719. h.writeError(w, "InternalError", "Failed to list parts", "", http.StatusInternalServerError)
  720. return
  721. }
  722. // Build response
  723. s3Parts := make([]s3.Part, len(parts))
  724. for i, part := range parts {
  725. s3Parts[i] = s3.Part{
  726. PartNumber: part.PartNumber,
  727. LastModified: part.LastModified.UTC().Format(time.RFC3339),
  728. ETag: part.ETag,
  729. Size: part.Size,
  730. }
  731. }
  732. result := s3.ListPartsResult{
  733. XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "ListPartsResult"},
  734. Bucket: bucketName,
  735. Key: objectKey,
  736. UploadId: uploadID,
  737. Initiator: s3.Initiator{ID: creds.AccountID, DisplayName: creds.AccessKeyID},
  738. Owner: s3.Owner{ID: creds.AccountID, DisplayName: creds.AccessKeyID},
  739. StorageClass: "STANDARD",
  740. PartNumberMarker: partNumberMarker,
  741. NextPartNumberMarker: nextMarker,
  742. MaxParts: maxParts,
  743. IsTruncated: isTruncated,
  744. Parts: s3Parts,
  745. }
  746. w.Header().Set("Content-Type", "application/xml")
  747. w.Header().Set("x-amz-request-id", generateRequestId())
  748. w.WriteHeader(http.StatusOK)
  749. encoder := xml.NewEncoder(w)
  750. encoder.Indent("", " ")
  751. if err := encoder.Encode(result); err != nil {
  752. log.Printf("Error encoding response: %v", err)
  753. }
  754. }
  755. func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Request, bucketName string) {
  756. creds, err := h.getUserFromContext(r)
  757. if err != nil {
  758. h.writeError(w, "AccessDenied", "Failed to retrieve credentials", "", http.StatusForbidden)
  759. return
  760. }
  761. // Parse query parameters
  762. maxUploads := 1000
  763. keyMarker := r.URL.Query().Get("key-marker")
  764. uploadIDMarker := r.URL.Query().Get("upload-id-marker")
  765. prefix := r.URL.Query().Get("prefix")
  766. if maxUploadsStr := r.URL.Query().Get("max-uploads"); maxUploadsStr != "" {
  767. fmt.Sscanf(maxUploadsStr, "%d", &maxUploads)
  768. }
  769. // List uploads
  770. uploads, nextKeyMarker, nextUploadIDMarker, isTruncated, err := h.service.ListMultipartUploads(
  771. creds.AccountID, bucketName, maxUploads, keyMarker, uploadIDMarker,
  772. )
  773. if err != nil {
  774. if err == storage.ErrBucketNotFound {
  775. h.writeError(w, "NoSuchBucket", "The specified bucket does not exist", bucketName, http.StatusNotFound)
  776. return
  777. }
  778. log.Printf("Failed to list multipart uploads for bucket %s: %v", bucketName, err)
  779. h.writeError(w, "InternalError", "Failed to list uploads", "", http.StatusInternalServerError)
  780. return
  781. }
  782. // Filter by prefix if provided
  783. if prefix != "" {
  784. filtered := []storage.MultipartUploadInfo{}
  785. for _, upload := range uploads {
  786. if len(upload.Key) >= len(prefix) && upload.Key[:len(prefix)] == prefix {
  787. filtered = append(filtered, upload)
  788. }
  789. }
  790. uploads = filtered
  791. }
  792. // Build response
  793. s3Uploads := make([]s3.MultipartUpload, len(uploads))
  794. for i, upload := range uploads {
  795. s3Uploads[i] = s3.MultipartUpload{
  796. Key: upload.Key,
  797. UploadId: upload.UploadID,
  798. Initiator: s3.Initiator{ID: creds.AccountID, DisplayName: creds.AccessKeyID},
  799. Owner: s3.Owner{ID: creds.AccountID, DisplayName: creds.AccessKeyID},
  800. StorageClass: "STANDARD",
  801. Initiated: upload.Initiated.UTC().Format(time.RFC3339),
  802. }
  803. }
  804. result := s3.ListMultipartUploadsResult{
  805. XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "ListMultipartUploadsResult"},
  806. Bucket: bucketName,
  807. KeyMarker: keyMarker,
  808. UploadIdMarker: uploadIDMarker,
  809. NextKeyMarker: nextKeyMarker,
  810. NextUploadIdMarker: nextUploadIDMarker,
  811. MaxUploads: maxUploads,
  812. IsTruncated: isTruncated,
  813. Uploads: s3Uploads,
  814. Prefix: prefix,
  815. }
  816. w.Header().Set("Content-Type", "application/xml")
  817. w.Header().Set("x-amz-request-id", generateRequestId())
  818. w.WriteHeader(http.StatusOK)
  819. encoder := xml.NewEncoder(w)
  820. encoder.Indent("", " ")
  821. if err := encoder.Encode(result); err != nil {
  822. log.Printf("Error encoding response: %v", err)
  823. }
  824. }
  825. func extractChecksumHeaders(r *http.Request) map[string]string {
  826. headers := make(map[string]string)
  827. // Extract all checksum-related headers
  828. if val := r.Header.Get("Content-MD5"); val != "" {
  829. headers["Content-MD5"] = val
  830. }
  831. if val := r.Header.Get("x-amz-sdk-checksum-algorithm"); val != "" {
  832. headers["x-amz-sdk-checksum-algorithm"] = val
  833. }
  834. if val := r.Header.Get("x-amz-checksum-crc32"); val != "" {
  835. headers["x-amz-checksum-crc32"] = val
  836. }
  837. if val := r.Header.Get("x-amz-checksum-crc32c"); val != "" {
  838. headers["x-amz-checksum-crc32c"] = val
  839. }
  840. if val := r.Header.Get("x-amz-checksum-crc64nvme"); val != "" {
  841. headers["x-amz-checksum-crc64nvme"] = val
  842. }
  843. if val := r.Header.Get("x-amz-checksum-sha1"); val != "" {
  844. headers["x-amz-checksum-sha1"] = val
  845. }
  846. if val := r.Header.Get("x-amz-checksum-sha256"); val != "" {
  847. headers["x-amz-checksum-sha256"] = val
  848. }
  849. return headers
  850. }