Created
December 21, 2021 09:47
-
-
Save bluebrown/f0401fd7c1941985101a35085c041502 to your computer and use it in GitHub Desktop.
Go HTTP Batch Request Handler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ( | |
"bufio" | |
"bytes" | |
"fmt" | |
"io" | |
"log" | |
"mime/multipart" | |
"net/http" | |
"net/http/httptest" | |
"net/textproto" | |
"sync" | |
) | |
/* | |
the batch handler implements a multipart/mixed request handler | |
like found here https://developers.google.com/gmail/api/guides/batch | |
but each subrequest must include the HTTP/1.1 protocol header | |
the Server struct contains at least the router to handle the requests | |
for example *mux.Router | |
the router should have the routes with the subrequests paths registered | |
Example Request: | |
POST /batch HTTP/1.1 | |
Host: localhost:8080 | |
Content-Type: multipart/mixed; boundary=boundary | |
--boundary | |
Content-Type: application/http | |
Content-ID: <item1> | |
POST /items HTTP/1.1 | |
Content-Type: application/json | |
Content-length: 58 | |
Accept: application/json | |
{ "name": "batch1", "description": "batch1 description" } | |
--boundary | |
Content-Type: application/http | |
Content-ID: <item2> | |
POST /items HTTP/1.1 | |
Content-Type: application/json | |
Content-length: 58 | |
Accept: application/json | |
{ "name": "broken", "descr | |
--boundary-- | |
*/ | |
func (s *Server) HandleBatch() http.HandlerFunc { | |
// used to collect the operations to do | |
type BatchOperation struct { | |
ContentID string | |
Request *http.Request | |
} | |
// used to collect the responses | |
type BatchResult struct { | |
ContentID string | |
Response *http.Response | |
} | |
// the actual handler function | |
return func(w http.ResponseWriter, r *http.Request) { | |
// read the multipart body | |
reader, err := r.MultipartReader() | |
if err != nil { | |
http.Error(w, fmt.Sprintf("could create a multipart reader %v\n", err), http.StatusInternalServerError) | |
} | |
// prepare a multipart response | |
body := &bytes.Buffer{} | |
writer := multipart.NewWriter(body) | |
ops := []BatchOperation{} | |
counter := 0 | |
// read each part save the for later to process | |
// them concurrently once all parts are read | |
// and the general request was valid | |
for { | |
// count the parts and return error if its more than 100 | |
counter++ | |
if counter > 100 { | |
http.Error(w, "too many parts: max 100 parts are allowed", http.StatusBadRequest) | |
return | |
} | |
requestPart, err := reader.NextPart() | |
// if we've reached the end, break out of the loop | |
if err == io.EOF { | |
break | |
} | |
// if part is not readable, return with error | |
if err != nil { | |
http.Error(w, fmt.Sprintf("part %d is not readable: %v\n", counter, err), http.StatusUnprocessableEntity) | |
return | |
} | |
// check if content type is http | |
if requestPart.Header.Get("Content-Type") != "application/http" { | |
http.Error(w, fmt.Sprintf("content type of part %d is not application/http", counter), http.StatusBadRequest) | |
return | |
} | |
// parse the body of the part into a request | |
req, err := http.ReadRequest(bufio.NewReader(requestPart)) | |
if err != nil { | |
http.Error(w, fmt.Sprintf("could not parse the sub request of part %d: %v\n", counter, err), http.StatusUnprocessableEntity) | |
return | |
} | |
// add the request to the batch | |
ops = append(ops, BatchOperation{ | |
ContentID: requestPart.Header.Get("Content-ID"), | |
Request: req, | |
}) | |
} | |
// read the batch results from the channel | |
// and create new parts for the response | |
resChan := make(chan BatchResult, len(ops)) | |
wg1 := sync.WaitGroup{} | |
wg1.Add(1) | |
go func() { | |
defer wg1.Done() | |
for result := range resChan { | |
// prepare a new response part | |
responsePart, err := writer.CreatePart(textproto.MIMEHeader{ | |
"Content-Type": {"application/http"}, | |
"Content-ID": {"response-" + result.ContentID}, | |
}) | |
if err != nil { | |
log.Printf("skipping: could not prepare a response part for subrequest %s %v\n", result.ContentID, err) | |
return | |
} | |
// write the response to the response part | |
err = result.Response.Write(responsePart) | |
if err != nil { | |
log.Printf("skipping: could not write response part for subrequest %s %v\n", result.ContentID, err) | |
return | |
} | |
} | |
}() | |
// make all the requests concurrently | |
wg2 := &sync.WaitGroup{} | |
for _, op := range ops { | |
wg2.Add(1) | |
go func(operation BatchOperation) { | |
defer wg2.Done() | |
// record the response | |
rr := httptest.NewRecorder() | |
s.Router.ServeHTTP(rr, operation.Request) | |
response := rr.Result() | |
response.ContentLength = int64(rr.Body.Len()) | |
// dispatch the response | |
resChan <- BatchResult{ | |
ContentID: operation.ContentID, | |
Response: response, | |
} | |
}(op) | |
} | |
// wait for all requests to finish | |
wg2.Wait() | |
// then close the channel | |
close(resChan) | |
// and wait for the part writter goroutine to finish | |
wg1.Wait() | |
// finally write the multipart response to the client | |
boundary := writer.Boundary() | |
w.Header().Set("Content-Type", "multipart/mixed; boundary="+boundary) | |
_, err = w.Write(body.Bytes()) | |
if err != nil { | |
log.Printf("could not write multipart response %v\n", err) | |
} | |
w.Write([]byte("\r\n--" + boundary + "--\r\n")) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment