Refactor request generation to use local random number generator (fix #26, #27)

This commit is contained in:
Aykhan Shahsuvarov 2024-09-14 19:42:40 +04:00
parent 8ad0bb5697
commit 891f1f1333
3 changed files with 179 additions and 215 deletions

View File

@ -3,34 +3,22 @@ package requests
import ( import (
"context" "context"
"fmt" "fmt"
"math/rand"
"net/url" "net/url"
"sync" "sync"
"time" "time"
"github.com/aykhans/dodo/config" "github.com/aykhans/dodo/config"
customerrors "github.com/aykhans/dodo/custom_errors"
"github.com/aykhans/dodo/readers" "github.com/aykhans/dodo/readers"
"github.com/aykhans/dodo/utils" "github.com/aykhans/dodo/utils"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"github.com/valyala/fasthttp/fasthttpproxy" "github.com/valyala/fasthttp/fasthttpproxy"
) )
type ClientDoFunc func(ctx context.Context, request *fasthttp.Request) (*fasthttp.Response, error) type ClientGeneratorFunc func() *fasthttp.HostClient
// getClientDoFunc returns a ClientDoFunc function that can be used to make HTTP requests. // getClients initializes and returns a slice of fasthttp.HostClient based on the provided parameters.
// // It can either return clients with proxies or a single client without proxies.
// The function first checks if there are any proxies available. If there are, it retrieves the active proxy clients func getClients(
// using the getActiveProxyClients function. If the context is canceled during this process, it returns nil.
// It then checks the number of active proxy clients and prompts the user to continue if there are none.
// If the user chooses to continue, it creates a fasthttp.HostClient with the appropriate settings and returns
// a ClientDoFunc function using the getSharedClientDoFunc function.
// If there is only one active proxy client, it uses that client to create the ClientDoFunc function.
// If there are multiple active proxy clients, it uses the getSharedRandomClientDoFunc function to create the ClientDoFunc function.
//
// If there are no proxies available, it creates a fasthttp.HostClient with the appropriate settings and returns
// a ClientDoFunc function using the getSharedClientDoFunc function.
func getClientDoFunc(
ctx context.Context, ctx context.Context,
timeout time.Duration, timeout time.Duration,
proxies []config.Proxy, proxies []config.Proxy,
@ -38,7 +26,7 @@ func getClientDoFunc(
maxConns uint, maxConns uint,
yes bool, yes bool,
URL *url.URL, URL *url.URL,
) ClientDoFunc { ) []*fasthttp.HostClient {
isTLS := URL.Scheme == "https" isTLS := URL.Scheme == "https"
if len(proxies) > 0 { if len(proxies) > 0 {
@ -73,26 +61,9 @@ func getClientDoFunc(
} }
} }
fmt.Println() fmt.Println()
if activeProxyClientsCount == 0 { if activeProxyClientsCount > 0 {
client := &fasthttp.HostClient{ return activeProxyClients
MaxConns: int(maxConns),
IsTLS: isTLS,
Addr: URL.Host,
MaxIdleConnDuration: timeout,
MaxConnDuration: timeout,
WriteTimeout: timeout,
ReadTimeout: timeout,
} }
return getSharedClientDoFunc(client, timeout)
} else if activeProxyClientsCount == 1 {
client := activeProxyClients[0]
return getSharedClientDoFunc(client, timeout)
}
return getSharedRandomClientDoFunc(
activeProxyClients,
activeProxyClientsCount,
timeout,
)
} }
client := &fasthttp.HostClient{ client := &fasthttp.HostClient{
@ -104,7 +75,7 @@ func getClientDoFunc(
WriteTimeout: timeout, WriteTimeout: timeout,
ReadTimeout: timeout, ReadTimeout: timeout,
} }
return getSharedClientDoFunc(client, timeout) return []*fasthttp.HostClient{client}
} }
// getActiveProxyClients divides the proxies into slices based on the number of dodos and // getActiveProxyClients divides the proxies into slices based on the number of dodos and
@ -305,75 +276,34 @@ func getDialFunc(proxy *config.Proxy, timeout time.Duration) (fasthttp.DialFunc,
return dialer, nil return dialer, nil
} }
// getSharedRandomClientDoFunc is equivalent to getSharedClientDoFunc but uses a random client from the provided slice. // getSharedClientFuncMultiple returns a ClientGeneratorFunc that cycles through
func getSharedRandomClientDoFunc( // a provided list of fasthttp.HostClient instances. Each call to the returned
clients []*fasthttp.HostClient, // function will return the next client in the list, cycling back to the first
clientsCount uint, // client after reaching the end of the slice.
timeout time.Duration, //
) ClientDoFunc { // The returned function isn't thread-safe and should be used in a single-threaded context.
clientsCountInt := int(clientsCount) func getSharedClientFuncMultiple(clients []*fasthttp.HostClient) ClientGeneratorFunc {
return func(ctx context.Context, request *fasthttp.Request) (*fasthttp.Response, error) { var (
client := clients[rand.Intn(clientsCountInt)] currentIndex int = 0
defer client.CloseIdleConnections() clientsCount int = len(clients)
response := fasthttp.AcquireResponse() )
ch := make(chan error)
go func() { return func() *fasthttp.HostClient {
err := client.DoTimeout(request, response, timeout) client := clients[currentIndex]
ch <- err if currentIndex == clientsCount-1 {
}() currentIndex = 0
select { } else {
case err := <-ch: currentIndex++
if err != nil {
fasthttp.ReleaseResponse(response)
return nil, err
}
return response, nil
case <-time.After(timeout):
fasthttp.ReleaseResponse(response)
return nil, customerrors.ErrTimeout
case <-ctx.Done():
return nil, customerrors.ErrInterrupt
} }
return client
} }
} }
// getSharedClientDoFunc is a function that returns a ClientDoFunc, which is a function type used for making HTTP requests using a shared client. // getSharedClientFuncSingle returns a ClientGeneratorFunc that always returns the provided fasthttp.HostClient instance.
// It takes a client of type *fasthttp.HostClient and a timeout of type time.Duration as input parameters. // This can be useful for sharing a single client instance across multiple requests.
// The returned ClientDoFunc function can be used to make an HTTP request with the given client and timeout. func getSharedClientFuncSingle(client *fasthttp.HostClient) ClientGeneratorFunc {
// It takes a context.Context and a *fasthttp.Request as input parameters and returns a *fasthttp.Response and an error. return func() *fasthttp.HostClient {
// The function internally creates a new response using fasthttp.AcquireResponse() and a channel to handle errors. return client
// It then spawns a goroutine to execute the client.DoTimeout() method with the given request, response, and timeout.
// The function uses a select statement to handle three cases:
// - If an error is received from the channel, it checks if the error is not nil. If it's not nil, it releases the response and returns nil and the error.
// Otherwise, it returns the response and nil.
// - If the timeout duration is reached, it releases the response and returns nil and a custom timeout error.
// - If the context is canceled, it returns nil and a custom interrupt error.
//
// The function ensures that idle connections are closed by calling client.CloseIdleConnections() using a defer statement.
func getSharedClientDoFunc(
client *fasthttp.HostClient,
timeout time.Duration,
) ClientDoFunc {
return func(ctx context.Context, request *fasthttp.Request) (*fasthttp.Response, error) {
defer client.CloseIdleConnections()
response := fasthttp.AcquireResponse()
ch := make(chan error)
go func() {
err := client.DoTimeout(request, response, timeout)
ch <- err
}()
select {
case err := <-ch:
if err != nil {
fasthttp.ReleaseResponse(response)
return nil, err
}
return response, nil
case <-time.After(timeout):
fasthttp.ReleaseResponse(response)
return nil, customerrors.ErrTimeout
case <-ctx.Done():
return nil, customerrors.ErrInterrupt
}
} }
} }

View File

@ -4,51 +4,117 @@ import (
"context" "context"
"math/rand" "math/rand"
"net/url" "net/url"
"time"
"github.com/aykhans/dodo/config" "github.com/aykhans/dodo/config"
customerrors "github.com/aykhans/dodo/custom_errors" customerrors "github.com/aykhans/dodo/custom_errors"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
) )
// getRequests generates a list of HTTP requests based on the provided parameters. type RequestGeneratorFunc func() *fasthttp.Request
//
// Parameters: // Request represents an HTTP request to be sent using the fasthttp client.
// - ctx: The context to control cancellation and deadlines. // It isn't thread-safe and should be used by a single goroutine.
// - URL: The base URL for the requests. type Request struct {
// - Headers: A map of headers to include in each request. getClient ClientGeneratorFunc
// - Cookies: A map of cookies to include in each request. getRequest RequestGeneratorFunc
// - Params: A map of query parameters to include in each request. }
// - Method: The HTTP method to use for the requests (e.g., GET, POST).
// - Bodies: A list of request bodies to cycle through for each request. // Send sends the HTTP request using the fasthttp client with a specified timeout.
// - RequestCount: The number of requests to generate. // It returns the HTTP response or an error if the request fails or times out.
// func (r *Request) Send(ctx context.Context, timeout time.Duration) (*fasthttp.Response, error) {
// Returns: client := r.getClient()
// - A list of fasthttp.Request objects based on the provided parameters. request := r.getRequest()
// - An error if the context is canceled. defer client.CloseIdleConnections()
func getRequests( defer fasthttp.ReleaseRequest(request)
ctx context.Context,
response := fasthttp.AcquireResponse()
ch := make(chan error)
go func() {
err := client.DoTimeout(request, response, timeout)
ch <- err
}()
select {
case err := <-ch:
if err != nil {
fasthttp.ReleaseResponse(response)
return nil, err
}
return response, nil
case <-time.After(timeout):
fasthttp.ReleaseResponse(response)
return nil, customerrors.ErrTimeout
case <-ctx.Done():
return nil, customerrors.ErrInterrupt
}
}
// newRequest creates a new Request instance based on the provided configuration and clients.
// It initializes a random number generator using the current time and a unique identifier (uid).
// Depending on the number of clients provided, it sets up a function to select the appropriate client.
// It also sets up a function to generate the request based on the provided configuration.
func newRequest(
requestConfig config.RequestConfig,
clients []*fasthttp.HostClient,
uid int64,
) *Request {
localRand := rand.New(rand.NewSource(time.Now().UnixNano() + uid))
clientsCount := len(clients)
if clientsCount < 1 {
panic("no clients")
}
getClient := ClientGeneratorFunc(nil)
if clientsCount == 1 {
getClient = getSharedClientFuncSingle(clients[0])
} else {
getClient = getSharedClientFuncMultiple(clients)
}
getRequest := getRequestGeneratorFunc(
requestConfig.URL,
requestConfig.Headers,
requestConfig.Cookies,
requestConfig.Params,
requestConfig.Method,
requestConfig.Body,
localRand,
)
requests := &Request{
getClient: getClient,
getRequest: getRequest,
}
return requests
}
// getRequestGeneratorFunc returns a RequestGeneratorFunc which generates HTTP requests
// with the specified parameters.
// The function uses a local random number generator to select bodies, headers, cookies, and parameters
// if multiple options are provided.
func getRequestGeneratorFunc(
URL *url.URL, URL *url.URL,
Headers map[string][]string, Headers map[string][]string,
Cookies map[string][]string, Cookies map[string][]string,
Params map[string][]string, Params map[string][]string,
Method string, Method string,
Bodies []string, Bodies []string,
RequestCount uint, localRand *rand.Rand,
) ([]*fasthttp.Request, error) { ) RequestGeneratorFunc {
requests := make([]*fasthttp.Request, 0, RequestCount)
bodiesLen := len(Bodies) bodiesLen := len(Bodies)
getBody := func() string { return "" } getBody := func() string { return "" }
if bodiesLen == 1 { if bodiesLen == 1 {
getBody = func() string { return Bodies[0] } getBody = func() string { return Bodies[0] }
} else if bodiesLen > 1 { } else if bodiesLen > 1 {
currentIndex := 0 currentIndex := localRand.Intn(bodiesLen)
stopIndex := bodiesLen - 1 stopIndex := bodiesLen - 1
getBody = func() string { getBody = func() string {
body := Bodies[currentIndex%bodiesLen] body := Bodies[currentIndex%bodiesLen]
if currentIndex == stopIndex { if currentIndex == stopIndex {
currentIndex = rand.Intn(bodiesLen) currentIndex = localRand.Intn(bodiesLen)
stopIndex = currentIndex - 1 stopIndex = currentIndex - 1
} else { } else {
currentIndex = (currentIndex + 1) % bodiesLen currentIndex = (currentIndex + 1) % bodiesLen
@ -56,15 +122,12 @@ func getRequests(
return body return body
} }
} }
getHeaders := getKeyValueSetFunc(Headers) getHeaders := getKeyValueSetFunc(Headers, localRand)
getCookies := getKeyValueSetFunc(Cookies) getCookies := getKeyValueSetFunc(Cookies, localRand)
getParams := getKeyValueSetFunc(Params) getParams := getKeyValueSetFunc(Params, localRand)
for range RequestCount { return func() *fasthttp.Request {
if ctx.Err() != nil { return newFasthttpRequest(
return nil, customerrors.ErrInterrupt
}
request := newRequest(
URL, URL,
getHeaders(), getHeaders(),
getCookies(), getCookies(),
@ -72,15 +135,12 @@ func getRequests(
Method, Method,
getBody(), getBody(),
) )
requests = append(requests, request)
} }
return requests, nil
} }
// newRequest creates a new fasthttp.Request object with the provided parameters. // newFasthttpRequest creates a new fasthttp.Request object with the provided parameters.
// It sets the request URI, host header, headers, cookies, params, method, and body. // It sets the request URI, host header, headers, cookies, params, method, and body.
func newRequest( func newFasthttpRequest(
URL *url.URL, URL *url.URL,
Headers map[string]string, Headers map[string]string,
Cookies map[string]string, Cookies map[string]string,
@ -153,7 +213,7 @@ func setRequestBody(req *fasthttp.Request, body string) {
func getKeyValueSetFunc[ func getKeyValueSetFunc[
KeyValueSet map[string][]string, KeyValueSet map[string][]string,
KeyValue map[string]string, KeyValue map[string]string,
](keyValueSet KeyValueSet) func() KeyValue { ](keyValueSet KeyValueSet, localRand *rand.Rand) func() KeyValue {
getKeyValueSlice := []map[string]func() string{} getKeyValueSlice := []map[string]func() string{}
isRandom := false isRandom := false
for key, values := range keyValueSet { for key, values := range keyValueSet {
@ -166,13 +226,13 @@ func getKeyValueSetFunc[
if valuesLen == 1 { if valuesLen == 1 {
getKeyValue = func() string { return values[0] } getKeyValue = func() string { return values[0] }
} else if valuesLen > 1 { } else if valuesLen > 1 {
currentIndex := 0 currentIndex := localRand.Intn(valuesLen)
stopIndex := valuesLen - 1 stopIndex := valuesLen - 1
getKeyValue = func() string { getKeyValue = func() string {
value := values[currentIndex%valuesLen] value := values[currentIndex%valuesLen]
if currentIndex == stopIndex { if currentIndex == stopIndex {
currentIndex = rand.Intn(valuesLen) currentIndex = localRand.Intn(valuesLen)
stopIndex = currentIndex - 1 stopIndex = currentIndex - 1
} else { } else {
currentIndex = (currentIndex + 1) % valuesLen currentIndex = (currentIndex + 1) % valuesLen

View File

@ -11,10 +11,18 @@ import (
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
) )
// Run executes the HTTP requests based on the provided request configuration. // Run executes the main logic for processing requests based on the provided configuration.
// It checks for internet connection and returns an error if there is no connection. // It first checks for an internet connection with a timeout context. If no connection is found,
// If the context is canceled while checking proxies, it returns the ErrInterrupt. // it returns an error. Then, it initializes clients based on the request configuration and
// If the context is canceled while sending requests, it returns the response objects obtained so far. // releases the dodos. If the context is canceled and no responses are collected, it returns an interrupt error.
//
// Parameters:
// - ctx: The context for managing request lifecycle and cancellation.
// - requestConfig: The configuration for the request, including timeout, proxies, and other settings.
//
// Returns:
// - Responses: A collection of responses from the executed requests.
// - error: An error if the operation fails, such as no internet connection or an interrupt.
func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, error) { func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, error) {
checkConnectionCtx, checkConnectionCtxCancel := context.WithTimeout(ctx, 8*time.Second) checkConnectionCtx, checkConnectionCtxCancel := context.WithTimeout(ctx, 8*time.Second)
if !checkConnection(checkConnectionCtx) { if !checkConnection(checkConnectionCtx) {
@ -23,7 +31,7 @@ func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, e
} }
checkConnectionCtxCancel() checkConnectionCtxCancel()
clientDoFunc := getClientDoFunc( clients := getClients(
ctx, ctx,
requestConfig.Timeout, requestConfig.Timeout,
requestConfig.Proxies, requestConfig.Proxies,
@ -32,30 +40,8 @@ func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, e
requestConfig.Yes, requestConfig.Yes,
requestConfig.URL, requestConfig.URL,
) )
if clientDoFunc == nil {
return nil, customerrors.ErrInterrupt
}
requests, err := getRequests( responses := releaseDodos(ctx, requestConfig, clients)
ctx,
requestConfig.URL,
requestConfig.Headers,
requestConfig.Cookies,
requestConfig.Params,
requestConfig.Method,
requestConfig.Body,
requestConfig.RequestCount,
)
if err != nil {
return nil, err
}
responses := releaseDodos(
ctx,
requests,
clientDoFunc,
requestConfig.GetValidDodosCountForRequests(),
)
if ctx.Err() != nil && len(responses) == 0 { if ctx.Err() != nil && len(responses) == 0 {
return nil, customerrors.ErrInterrupt return nil, customerrors.ErrInterrupt
} }
@ -63,59 +49,55 @@ func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, e
return responses, nil return responses, nil
} }
// releaseDodos sends HTTP requests concurrently using multiple "dodos" (goroutines). // releaseDodos sends requests concurrently using multiple dodos (goroutines) and returns the aggregated responses.
// //
// Parameters: // The function performs the following steps:
// - ctx: The context to control the lifecycle of the requests. // 1. Initializes wait groups and other necessary variables.
// - requests: A slice of HTTP requests to be sent. // 2. Starts a goroutine to stream progress updates.
// - clientDoFunc: A function to execute the HTTP requests. // 3. Distributes the total request count among the dodos.
// - dodosCount: The number of dodos (goroutines) to use for sending the requests. // 4. Starts a goroutine for each dodo to send requests concurrently.
// // 5. Waits for all dodos to complete their requests.
// Returns: // 6. Cancels the progress streaming context and waits for the progress goroutine to finish.
// - A slice of Response objects containing the results of the requests. // 7. Flattens and returns the aggregated responses.
//
// The function divides the requests into equal parts based on the number of dodos.
// It then sends each part concurrently using a separate goroutine.
func releaseDodos( func releaseDodos(
ctx context.Context, ctx context.Context,
requests []*fasthttp.Request, requestConfig *config.RequestConfig,
clientDoFunc ClientDoFunc, clients []*fasthttp.HostClient,
dodosCount uint,
) Responses { ) Responses {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
streamWG sync.WaitGroup streamWG sync.WaitGroup
requestCountPerDodo uint requestCountPerDodo uint
dodosCount uint = requestConfig.GetValidDodosCountForRequests()
dodosCountInt int = int(dodosCount) dodosCountInt int = int(dodosCount)
totalRequestCount uint = uint(len(requests)) requestCount uint = uint(requestConfig.RequestCount)
requestCount uint = 0
responses = make([][]*Response, dodosCount) responses = make([][]*Response, dodosCount)
increase = make(chan int64, totalRequestCount) increase = make(chan int64, requestCount)
) )
wg.Add(dodosCountInt) wg.Add(dodosCountInt)
streamWG.Add(1) streamWG.Add(1)
streamCtx, streamCtxCancel := context.WithCancel(context.Background()) streamCtx, streamCtxCancel := context.WithCancel(context.Background())
go streamProgress(streamCtx, &streamWG, int64(totalRequestCount), "Dodos Working🔥", increase) go streamProgress(streamCtx, &streamWG, int64(requestCount), "Dodos Working🔥", increase)
for i := range dodosCount { for i := range dodosCount {
if i+1 == dodosCount { if i+1 == dodosCount {
requestCountPerDodo = totalRequestCount - (i * totalRequestCount / dodosCount) requestCountPerDodo = requestCount - (i * requestCount / dodosCount)
} else { } else {
requestCountPerDodo = ((i + 1) * totalRequestCount / dodosCount) - requestCountPerDodo = ((i + 1) * requestCount / dodosCount) -
(i * totalRequestCount / dodosCount) (i * requestCount / dodosCount)
} }
go sendRequest( go sendRequest(
ctx, ctx,
requests[requestCount:requestCount+requestCountPerDodo], newRequest(*requestConfig, clients, int64(i)),
requestConfig.Timeout,
requestCountPerDodo,
&responses[i], &responses[i],
increase, increase,
clientDoFunc,
&wg, &wg,
) )
requestCount += requestCountPerDodo
} }
wg.Wait() wg.Wait()
streamCtxCancel() streamCtxCancel()
@ -123,40 +105,33 @@ func releaseDodos(
return utils.Flatten(responses) return utils.Flatten(responses)
} }
// sendRequest sends multiple HTTP requests concurrently and collects their responses. // sendRequest sends a specified number of HTTP requests concurrently with a given timeout.
// // It appends the responses to the provided responseData slice and sends the count of completed requests
// Parameters: // to the increase channel. The function terminates early if the context is canceled or if a custom
// - ctx: The context to control cancellation and timeout. // interrupt error is encountered.
// - requests: A slice of pointers to fasthttp.Request objects to be sent.
// - responseData: A pointer to a slice of *Response objects to store the results.
// - increase: A channel to signal the completion of each request.
// - clientDo: A function to execute the HTTP request.
// - wg: A wait group to synchronize the completion of the requests.
//
// The function iterates over the provided requests, sending each one using the clientDo function.
// It measures the time taken for each request and appends the response data to responseData.
// If an error occurs, it appends an error response. The function signals completion through the increase channel
// and ensures proper resource cleanup by releasing requests and responses.
func sendRequest( func sendRequest(
ctx context.Context, ctx context.Context,
requests []*fasthttp.Request, request *Request,
timeout time.Duration,
requestCount uint,
responseData *[]*Response, responseData *[]*Response,
increase chan<- int64, increase chan<- int64,
clientDo ClientDoFunc,
wg *sync.WaitGroup, wg *sync.WaitGroup,
) { ) {
defer wg.Done() defer wg.Done()
for _, request := range requests { for range requestCount {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }
func() { func() {
defer fasthttp.ReleaseRequest(request)
startTime := time.Now() startTime := time.Now()
response, err := clientDo(ctx, request) response, err := request.Send(ctx, timeout)
completedTime := time.Since(startTime) completedTime := time.Since(startTime)
if response != nil {
defer fasthttp.ReleaseResponse(response)
}
if err != nil { if err != nil {
if err == customerrors.ErrInterrupt { if err == customerrors.ErrInterrupt {
@ -170,7 +145,6 @@ func sendRequest(
increase <- 1 increase <- 1
return return
} }
defer fasthttp.ReleaseResponse(response)
*responseData = append(*responseData, &Response{ *responseData = append(*responseData, &Response{
StatusCode: response.StatusCode(), StatusCode: response.StatusCode(),