🔨 Split the requests package

This commit is contained in:
Aykhan Shahsuvarov 2024-09-10 03:49:43 +04:00
parent 82bf31182f
commit 1ec4254468
5 changed files with 403 additions and 350 deletions

View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net/url" "net/url"
"os"
"sync" "sync"
"time" "time"
@ -13,231 +12,10 @@ import (
customerrors "github.com/aykhans/dodo/custom_errors" customerrors "github.com/aykhans/dodo/custom_errors"
"github.com/aykhans/dodo/readers" "github.com/aykhans/dodo/readers"
"github.com/aykhans/dodo/utils" "github.com/aykhans/dodo/utils"
"github.com/jedib0t/go-pretty/v6/progress"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"github.com/valyala/fasthttp/fasthttpproxy" "github.com/valyala/fasthttp/fasthttpproxy"
) )
type Response struct {
StatusCode int
Error error
Time time.Duration
}
type Responses []Response
type ClientDoFunc func(ctx context.Context, request *fasthttp.Request) (*fasthttp.Response, error)
// Print prints the responses in a tabular format, including information such as
// response count, minimum time, maximum time, and average time.
func (respones *Responses) Print() {
var (
totalMinDuration time.Duration = (*respones)[0].Time
totalMaxDuration time.Duration = (*respones)[0].Time
totalDuration time.Duration
totalCount int = len(*respones)
)
mergedResponses := make(map[string][]time.Duration)
for _, response := range *respones {
if response.Time < totalMinDuration {
totalMinDuration = response.Time
}
if response.Time > totalMaxDuration {
totalMaxDuration = response.Time
}
totalDuration += response.Time
if response.Error != nil {
mergedResponses[response.Error.Error()] = append(
mergedResponses[response.Error.Error()],
response.Time,
)
} else {
mergedResponses[fmt.Sprintf("%d", response.StatusCode)] = append(
mergedResponses[fmt.Sprintf("%d", response.StatusCode)],
response.Time,
)
}
}
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.SetStyle(table.StyleLight)
t.SetAllowedRowLength(125)
t.AppendHeader(table.Row{
"Response",
"Count",
"Min Time",
"Max Time",
"Average Time",
})
for key, durations := range mergedResponses {
t.AppendRow(table.Row{
key,
len(durations),
utils.MinDuration(durations...),
utils.MaxDuration(durations...),
utils.AvgDuration(durations...),
})
t.AppendSeparator()
}
t.AppendRow(table.Row{
"Total",
totalCount,
totalMinDuration,
totalMaxDuration,
totalDuration / time.Duration(totalCount),
})
t.Render()
}
// Run executes the HTTP requests based on the provided request configuration.
// It checks for internet connection and returns an error if there is no connection.
// If the context is canceled while checking proxies, it returns the ErrInterrupt.
// If the context is canceled while sending requests, it returns the response objects obtained so far.
func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, error) {
checkConnectionCtx, checkConnectionCtxCancel := context.WithTimeout(ctx, 8*time.Second)
if !checkConnection(checkConnectionCtx) {
checkConnectionCtxCancel()
return nil, customerrors.ErrNoInternet
}
checkConnectionCtxCancel()
clientDoFunc := getClientDoFunc(
ctx,
requestConfig.Timeout,
requestConfig.Proxies,
requestConfig.GetValidDodosCountForProxies(),
requestConfig.Yes,
requestConfig.URL,
)
if clientDoFunc == nil {
return nil, customerrors.ErrInterrupt
}
request := newRequest(
requestConfig.URL,
requestConfig.Headers,
requestConfig.Cookies,
requestConfig.Params,
requestConfig.Method,
requestConfig.Body,
)
defer fasthttp.ReleaseRequest(request)
responses := releaseDodos(
ctx,
request,
clientDoFunc,
requestConfig.GetValidDodosCountForRequests(),
requestConfig.RequestCount,
)
if ctx.Err() != nil && len(responses) == 0 {
return nil, customerrors.ErrInterrupt
}
return responses, nil
}
// releaseDodos sends multiple HTTP requests concurrently using multiple "dodos" (goroutines).
// It takes a mainRequest as the base request, timeout duration for each request, clientDoFunc for customizing the client behavior,
// dodosCount as the number of goroutines to be used, and requestCount as the total number of requests to be sent.
// It returns the responses received from all the requests.
func releaseDodos(
ctx context.Context,
mainRequest *fasthttp.Request,
clientDoFunc ClientDoFunc,
dodosCount int,
requestCount int,
) Responses {
var (
wg sync.WaitGroup
streamWG sync.WaitGroup
requestCountPerDodo int
)
wg.Add(dodosCount)
streamWG.Add(1)
responses := make([][]Response, dodosCount)
increase := make(chan int64, requestCount)
streamCtx, streamCtxCancel := context.WithCancel(context.Background())
go streamProgress(streamCtx, &streamWG, int64(requestCount), "Dodos Working🔥", increase)
for i := 0; i < dodosCount; i++ {
if i+1 == dodosCount {
requestCountPerDodo = requestCount -
(i * requestCount / dodosCount)
} else {
requestCountPerDodo = ((i + 1) * requestCount / dodosCount) -
(i * requestCount / dodosCount)
}
dodoSpecificRequest := &fasthttp.Request{}
mainRequest.CopyTo(dodoSpecificRequest)
go sendRequest(
ctx,
dodoSpecificRequest,
&responses[i],
increase,
requestCountPerDodo,
clientDoFunc,
&wg,
)
}
wg.Wait()
streamCtxCancel()
streamWG.Wait()
return utils.Flatten(responses)
}
func sendRequest(
ctx context.Context,
request *fasthttp.Request,
responseData *[]Response,
increase chan<- int64,
requestCount int,
clientDo ClientDoFunc,
wg *sync.WaitGroup,
) {
defer fasthttp.ReleaseRequest(request)
defer wg.Done()
for range requestCount {
if ctx.Err() != nil {
return
}
func() {
startTime := time.Now()
response, err := clientDo(ctx, request)
completedTime := time.Since(startTime)
if err != nil {
if err == customerrors.ErrInterrupt {
return
}
*responseData = append(*responseData, Response{
StatusCode: 0,
Error: err,
Time: completedTime,
})
increase <- 1
return
}
defer fasthttp.ReleaseResponse(response)
*responseData = append(*responseData, Response{
StatusCode: response.StatusCode(),
Error: nil,
Time: completedTime,
})
increase <- 1
}()
}
}
// getClientDoFunc returns a ClientDoFunc function that can be used to make HTTP requests. // getClientDoFunc returns a ClientDoFunc function that can be used to make HTTP requests.
// //
// The function first checks if there are any proxies available. If there are, it retrieves the active proxy clients // The function first checks if there are any proxies available. If there are, it retrieves the active proxy clients
@ -587,131 +365,3 @@ func getSharedClientDoFunc(
} }
} }
} }
// newRequest creates a new fasthttp.Request object with the provided parameters.
// It sets the request URI, host header, headers, cookies, params, method, and body.
func newRequest(
URL *url.URL,
Headers map[string]string,
Cookies map[string]string,
Params map[string]string,
Method string,
Body string,
) *fasthttp.Request {
request := fasthttp.AcquireRequest()
request.SetRequestURI(URL.Path)
// Set the host of the request to the host header
// If the host header is not set, the request will fail
// If there is host header in the headers, it will be overwritten
request.Header.Set("Host", URL.Host)
setRequestHeaders(request, Headers)
setRequestCookies(request, Cookies)
setRequestParams(request, Params)
setRequestMethod(request, Method)
setRequestBody(request, Body)
if URL.Scheme == "https" {
request.URI().SetScheme("https")
}
return request
}
// setRequestHeaders sets the headers of the given request with the provided key-value pairs.
func setRequestHeaders(req *fasthttp.Request, headers map[string]string) {
req.Header.Set("User-Agent", config.DefaultUserAgent)
for key, value := range headers {
req.Header.Set(key, value)
}
}
// setRequestCookies sets the cookies in the given request.
func setRequestCookies(req *fasthttp.Request, cookies map[string]string) {
for key, value := range cookies {
req.Header.SetCookie(key, value)
}
}
// setRequestParams sets the query parameters of the given request based on the provided map of key-value pairs.
func setRequestParams(req *fasthttp.Request, params map[string]string) {
urlParams := url.Values{}
for key, value := range params {
urlParams.Add(key, value)
}
req.URI().SetQueryString(urlParams.Encode())
}
// setRequestMethod sets the HTTP request method for the given request.
func setRequestMethod(req *fasthttp.Request, method string) {
req.Header.SetMethod(method)
}
// setRequestBody sets the request body of the given fasthttp.Request object.
// The body parameter is a string that will be converted to a byte slice and set as the request body.
func setRequestBody(req *fasthttp.Request, body string) {
req.SetBody([]byte(body))
}
// streamProgress streams the progress of a task to the console using a progress bar.
// It listens for increments on the provided channel and updates the progress bar accordingly.
//
// The function will stop and mark the progress as errored if the context is cancelled.
// It will also stop and mark the progress as done when the total number of increments is reached.
func streamProgress(
ctx context.Context,
wg *sync.WaitGroup,
total int64,
message string,
increase <-chan int64,
) {
defer wg.Done()
pw := progress.NewWriter()
pw.SetTrackerPosition(progress.PositionRight)
pw.SetStyle(progress.StyleBlocks)
pw.SetTrackerLength(40)
pw.SetUpdateFrequency(time.Millisecond * 250)
go pw.Render()
dodosTracker := progress.Tracker{
Message: message,
Total: total,
}
pw.AppendTracker(&dodosTracker)
for {
select {
case <-ctx.Done():
fmt.Printf("\r")
dodosTracker.MarkAsErrored()
time.Sleep(time.Millisecond * 300)
pw.Stop()
return
case value := <-increase:
dodosTracker.Increment(value)
}
}
}
// checkConnection checks the internet connection by making requests to different websites.
// It returns true if the connection is successful, otherwise false.
func checkConnection(ctx context.Context) bool {
ch := make(chan bool)
go func() {
_, _, err := fasthttp.Get(nil, "https://www.google.com")
if err != nil {
_, _, err = fasthttp.Get(nil, "https://www.bing.com")
if err != nil {
_, _, err = fasthttp.Get(nil, "https://www.yahoo.com")
ch <- err == nil
}
ch <- true
}
ch <- true
}()
select {
case <-ctx.Done():
return false
case res := <-ch:
return res
}
}

75
requests/helper.go Normal file
View File

@ -0,0 +1,75 @@
package requests
import (
"context"
"fmt"
"sync"
"time"
"github.com/jedib0t/go-pretty/v6/progress"
"github.com/valyala/fasthttp"
)
// streamProgress streams the progress of a task to the console using a progress bar.
// It listens for increments on the provided channel and updates the progress bar accordingly.
//
// The function will stop and mark the progress as errored if the context is cancelled.
// It will also stop and mark the progress as done when the total number of increments is reached.
func streamProgress(
ctx context.Context,
wg *sync.WaitGroup,
total int64,
message string,
increase <-chan int64,
) {
defer wg.Done()
pw := progress.NewWriter()
pw.SetTrackerPosition(progress.PositionRight)
pw.SetStyle(progress.StyleBlocks)
pw.SetTrackerLength(40)
pw.SetUpdateFrequency(time.Millisecond * 250)
go pw.Render()
dodosTracker := progress.Tracker{
Message: message,
Total: total,
}
pw.AppendTracker(&dodosTracker)
for {
select {
case <-ctx.Done():
fmt.Printf("\r")
dodosTracker.MarkAsErrored()
time.Sleep(time.Millisecond * 300)
pw.Stop()
return
case value := <-increase:
dodosTracker.Increment(value)
}
}
}
// checkConnection checks the internet connection by making requests to different websites.
// It returns true if the connection is successful, otherwise false.
func checkConnection(ctx context.Context) bool {
ch := make(chan bool)
go func() {
_, _, err := fasthttp.Get(nil, "https://www.google.com")
if err != nil {
_, _, err = fasthttp.Get(nil, "https://www.bing.com")
if err != nil {
_, _, err = fasthttp.Get(nil, "https://www.yahoo.com")
ch <- err == nil
}
ch <- true
}
ch <- true
}()
select {
case <-ctx.Done():
return false
case res := <-ch:
return res
}
}

72
requests/request.go Normal file
View File

@ -0,0 +1,72 @@
package requests
import (
"net/url"
"github.com/aykhans/dodo/config"
"github.com/valyala/fasthttp"
)
// newRequest creates a new fasthttp.Request object with the provided parameters.
// It sets the request URI, host header, headers, cookies, params, method, and body.
func newRequest(
URL *url.URL,
Headers map[string]string,
Cookies map[string]string,
Params map[string]string,
Method string,
Body string,
) *fasthttp.Request {
request := fasthttp.AcquireRequest()
request.SetRequestURI(URL.Path)
// Set the host of the request to the host header
// If the host header is not set, the request will fail
// If there is host header in the headers, it will be overwritten
request.Header.Set("Host", URL.Host)
setRequestHeaders(request, Headers)
setRequestCookies(request, Cookies)
setRequestParams(request, Params)
setRequestMethod(request, Method)
setRequestBody(request, Body)
if URL.Scheme == "https" {
request.URI().SetScheme("https")
}
return request
}
// setRequestHeaders sets the headers of the given request with the provided key-value pairs.
func setRequestHeaders(req *fasthttp.Request, headers map[string]string) {
req.Header.Set("User-Agent", config.DefaultUserAgent)
for key, value := range headers {
req.Header.Set(key, value)
}
}
// setRequestCookies sets the cookies in the given request.
func setRequestCookies(req *fasthttp.Request, cookies map[string]string) {
for key, value := range cookies {
req.Header.SetCookie(key, value)
}
}
// setRequestParams sets the query parameters of the given request based on the provided map of key-value pairs.
func setRequestParams(req *fasthttp.Request, params map[string]string) {
urlParams := url.Values{}
for key, value := range params {
urlParams.Add(key, value)
}
req.URI().SetQueryString(urlParams.Encode())
}
// setRequestMethod sets the HTTP request method for the given request.
func setRequestMethod(req *fasthttp.Request, method string) {
req.Header.SetMethod(method)
}
// setRequestBody sets the request body of the given fasthttp.Request object.
// The body parameter is a string that will be converted to a byte slice and set as the request body.
func setRequestBody(req *fasthttp.Request, body string) {
req.SetBody([]byte(body))
}

86
requests/response.go Normal file
View File

@ -0,0 +1,86 @@
package requests
import (
"context"
"fmt"
"os"
"time"
"github.com/aykhans/dodo/utils"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/valyala/fasthttp"
)
type Response struct {
StatusCode int
Error error
Time time.Duration
}
type Responses []Response
type ClientDoFunc func(ctx context.Context, request *fasthttp.Request) (*fasthttp.Response, error)
// Print prints the responses in a tabular format, including information such as
// response count, minimum time, maximum time, and average time.
func (respones *Responses) Print() {
var (
totalMinDuration time.Duration = (*respones)[0].Time
totalMaxDuration time.Duration = (*respones)[0].Time
totalDuration time.Duration
totalCount int = len(*respones)
)
mergedResponses := make(map[string][]time.Duration)
for _, response := range *respones {
if response.Time < totalMinDuration {
totalMinDuration = response.Time
}
if response.Time > totalMaxDuration {
totalMaxDuration = response.Time
}
totalDuration += response.Time
if response.Error != nil {
mergedResponses[response.Error.Error()] = append(
mergedResponses[response.Error.Error()],
response.Time,
)
} else {
mergedResponses[fmt.Sprintf("%d", response.StatusCode)] = append(
mergedResponses[fmt.Sprintf("%d", response.StatusCode)],
response.Time,
)
}
}
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.SetStyle(table.StyleLight)
t.SetAllowedRowLength(125)
t.AppendHeader(table.Row{
"Response",
"Count",
"Min Time",
"Max Time",
"Average Time",
})
for key, durations := range mergedResponses {
t.AppendRow(table.Row{
key,
len(durations),
utils.MinDuration(durations...),
utils.MaxDuration(durations...),
utils.AvgDuration(durations...),
})
t.AppendSeparator()
}
t.AppendRow(table.Row{
"Total",
totalCount,
totalMinDuration,
totalMaxDuration,
totalDuration / time.Duration(totalCount),
})
t.Render()
}

170
requests/run.go Normal file
View File

@ -0,0 +1,170 @@
package requests
import (
"context"
"sync"
"time"
"github.com/aykhans/dodo/config"
customerrors "github.com/aykhans/dodo/custom_errors"
"github.com/aykhans/dodo/utils"
"github.com/valyala/fasthttp"
)
// Run executes the HTTP requests based on the provided request configuration.
// It checks for internet connection and returns an error if there is no connection.
// If the context is canceled while checking proxies, it returns the ErrInterrupt.
// If the context is canceled while sending requests, it returns the response objects obtained so far.
func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, error) {
checkConnectionCtx, checkConnectionCtxCancel := context.WithTimeout(ctx, 8*time.Second)
if !checkConnection(checkConnectionCtx) {
checkConnectionCtxCancel()
return nil, customerrors.ErrNoInternet
}
checkConnectionCtxCancel()
clientDoFunc := getClientDoFunc(
ctx,
requestConfig.Timeout,
requestConfig.Proxies,
requestConfig.GetValidDodosCountForProxies(),
requestConfig.Yes,
requestConfig.URL,
)
if clientDoFunc == nil {
return nil, customerrors.ErrInterrupt
}
request := newRequest(
requestConfig.URL,
requestConfig.Headers,
requestConfig.Cookies,
requestConfig.Params,
requestConfig.Method,
requestConfig.Body,
)
defer fasthttp.ReleaseRequest(request)
responses := releaseDodos(
ctx,
request,
clientDoFunc,
requestConfig.GetValidDodosCountForRequests(),
requestConfig.RequestCount,
)
if ctx.Err() != nil && len(responses) == 0 {
return nil, customerrors.ErrInterrupt
}
return responses, nil
}
// releaseDodos sends multiple HTTP requests concurrently using multiple "dodos" (goroutines).
// It takes a mainRequest as the base request, timeout duration for each request, clientDoFunc for customizing the client behavior,
// dodosCount as the number of goroutines to be used, and requestCount as the total number of requests to be sent.
// It returns the responses received from all the requests.
func releaseDodos(
ctx context.Context,
mainRequest *fasthttp.Request,
clientDoFunc ClientDoFunc,
dodosCount int,
requestCount int,
) Responses {
var (
wg sync.WaitGroup
streamWG sync.WaitGroup
requestCountPerDodo int
)
wg.Add(dodosCount)
streamWG.Add(1)
responses := make([][]Response, dodosCount)
increase := make(chan int64, requestCount)
streamCtx, streamCtxCancel := context.WithCancel(context.Background())
go streamProgress(streamCtx, &streamWG, int64(requestCount), "Dodos Working🔥", increase)
for i := 0; i < dodosCount; i++ {
if i+1 == dodosCount {
requestCountPerDodo = requestCount -
(i * requestCount / dodosCount)
} else {
requestCountPerDodo = ((i + 1) * requestCount / dodosCount) -
(i * requestCount / dodosCount)
}
dodoSpecificRequest := &fasthttp.Request{}
mainRequest.CopyTo(dodoSpecificRequest)
go sendRequest(
ctx,
dodoSpecificRequest,
&responses[i],
increase,
requestCountPerDodo,
clientDoFunc,
&wg,
)
}
wg.Wait()
streamCtxCancel()
streamWG.Wait()
return utils.Flatten(responses)
}
// sendRequest sends an HTTP request using the provided clientDo function and handles the response.
//
// Parameters:
// - ctx: The context to control cancellation and timeout.
// - request: The HTTP request to be sent.
// - responseData: A slice to store the response data.
// - increase: A channel to signal the completion of a request.
// - requestCount: The number of requests to be sent.
// - clientDo: A function to execute the HTTP request.
// - wg: A wait group to signal the completion of the function.
//
// The function sends the specified number of requests, handles errors, and appends the response data
// to the responseData slice.
func sendRequest(
ctx context.Context,
request *fasthttp.Request,
responseData *[]Response,
increase chan<- int64,
requestCount int,
clientDo ClientDoFunc,
wg *sync.WaitGroup,
) {
defer fasthttp.ReleaseRequest(request)
defer wg.Done()
for range requestCount {
if ctx.Err() != nil {
return
}
func() {
startTime := time.Now()
response, err := clientDo(ctx, request)
completedTime := time.Since(startTime)
if err != nil {
if err == customerrors.ErrInterrupt {
return
}
*responseData = append(*responseData, Response{
StatusCode: 0,
Error: err,
Time: completedTime,
})
increase <- 1
return
}
defer fasthttp.ReleaseResponse(response)
*responseData = append(*responseData, Response{
StatusCode: response.StatusCode(),
Error: nil,
Time: completedTime,
})
increase <- 1
}()
}
}