Last active
December 20, 2023 08:57
-
-
Save hagen1778/9487f71d9a19e04d521d764b576a0ec2 to your computer and use it in GitHub Desktop.
vmctl trace requests
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/app/vmctl/main.go b/app/vmctl/main.go | |
index 95743081f..151e9115b 100644 | |
--- a/app/vmctl/main.go | |
+++ b/app/vmctl/main.go | |
@@ -230,7 +230,7 @@ func main() { | |
if err != nil { | |
return fmt.Errorf("error initilize auth config for destination: %s", dstAddr) | |
} | |
- dstHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive}} | |
+ dstHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive, ResponseHeaderTimeout: time.Second * 60}} | |
p := vmNativeProcessor{ | |
rateLimit: c.Int64(vmRateLimit), | |
@@ -286,7 +286,7 @@ func main() { | |
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1) | |
} | |
var blocksCount uint64 | |
- if err := stream.Parse(f, isBlockGzipped, func(block *stream.Block) error { | |
+ if err := stream.Parse(f, nil, isBlockGzipped, func(block *stream.Block) error { | |
atomic.AddUint64(&blocksCount, 1) | |
return nil | |
}); err != nil { | |
diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go | |
index 3cbea821d..d26f00d6f 100644 | |
--- a/app/vmctl/vm_native.go | |
+++ b/app/vmctl/vm_native.go | |
@@ -5,8 +5,10 @@ import ( | |
"fmt" | |
"io" | |
"log" | |
+ "net/url" | |
"strings" | |
"sync" | |
+ "sync/atomic" | |
"time" | |
"github.com/cheggaaa/pb/v3" | |
@@ -114,6 +116,8 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst | |
return nil | |
} | |
+var iterationCounter uint64 | |
+ | |
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error { | |
reader, err := p.src.ExportPipe(ctx, srcURL, f) | |
if err != nil { | |
@@ -125,10 +129,18 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU | |
reader = bar.NewProxyReader(reader) | |
} | |
+ iteration := atomic.AddUint64(&iterationCounter, 1) | |
+ dstURL += fmt.Sprintf("?query=%s", url.QueryEscape(f.Match)) | |
+ dstURL += fmt.Sprintf("&iteration=%d", iteration) | |
+ | |
pr, pw := io.Pipe() | |
importCh := make(chan error) | |
go func() { | |
- importCh <- p.dst.ImportPipe(ctx, dstURL, pr) | |
+ importErr := p.dst.ImportPipe(ctx, dstURL, pr) | |
+ if importErr != nil && strings.Contains(importErr.Error(), "timeout awaiting response headers") { | |
+ pr.Close() | |
+ } | |
+ importCh <- importErr | |
close(importCh) | |
}() | |
@@ -152,7 +164,12 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU | |
return err | |
} | |
- return <-importCh | |
+ start := time.Now() | |
+ log.Printf("%q: request finished - waiting for import to finish", dstURL) | |
+ importRes := <-importCh | |
+ log.Printf("%q: request completed in %v with error: %s", dstURL, time.Since(start), importRes) | |
+ return importRes | |
+ | |
} | |
func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time, silent bool) error { | |
diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go | |
index f5e7cd714..801873696 100644 | |
--- a/app/vminsert/native/request_handler.go | |
+++ b/app/vminsert/native/request_handler.go | |
@@ -27,7 +27,7 @@ func InsertHandler(req *http.Request) error { | |
return err | |
} | |
isGzip := req.Header.Get("Content-Encoding") == "gzip" | |
- return stream.Parse(req.Body, isGzip, func(block *stream.Block) error { | |
+ return stream.Parse(req.Body, req, isGzip, func(block *stream.Block) error { | |
return insertRows(block, extraLabels) | |
}) | |
} | |
diff --git a/lib/protoparser/native/stream/streamparser.go b/lib/protoparser/native/stream/streamparser.go | |
index 9cf4bafc9..71be78157 100644 | |
--- a/lib/protoparser/native/stream/streamparser.go | |
+++ b/lib/protoparser/native/stream/streamparser.go | |
@@ -3,8 +3,11 @@ package stream | |
import ( | |
"bufio" | |
"fmt" | |
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" | |
"io" | |
+ "net/http" | |
"sync" | |
+ "time" | |
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" | |
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" | |
@@ -19,7 +22,7 @@ import ( | |
// The callback can be called concurrently multiple times for streamed data from r. | |
// | |
// callback shouldn't hold block after returning. | |
-func Parse(r io.Reader, isGzip bool, callback func(block *Block) error) error { | |
+func Parse(r io.Reader, req *http.Request, isGzip bool, callback func(block *Block) error) error { | |
wcr := writeconcurrencylimiter.GetReader(r) | |
defer writeconcurrencylimiter.PutReader(wcr) | |
r = wcr | |
@@ -48,19 +51,27 @@ func Parse(r io.Reader, isGzip bool, callback func(block *Block) error) error { | |
// Read native blocks and feed workers with work. | |
sizeBuf := make([]byte, 4) | |
+ processingStart := time.Now() | |
+ | |
+ defer func() { | |
+ logger.Infof("%q (%s): whole processing took %v", req.URL.RawQuery, tr.String(), time.Since(processingStart)) | |
+ }() | |
+ | |
ctx := &streamContext{} | |
for { | |
uw := getUnmarshalWork() | |
uw.tr = tr | |
uw.ctx = ctx | |
uw.callback = callback | |
- | |
// Read uw.metricNameBuf | |
if _, err := io.ReadFull(br, sizeBuf); err != nil { | |
if err == io.EOF { | |
+ finishStart := time.Now() | |
+ logger.Infof("%q (%s): EOF received - finishing the work", req.URL.RawQuery, tr.String()) | |
// End of stream | |
putUnmarshalWork(uw) | |
ctx.wg.Wait() | |
+ logger.Infof("%q (%s): last processing is done in %v", req.URL.RawQuery, tr.String(), time.Since(finishStart)) | |
return ctx.err | |
} | |
readErrors.Inc() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment