🔨 refactor request timeout logic

- Move the responsibility of the response object to the client function.
- Add timeout functionality to client function
This commit is contained in:
Aykhan Shahsuvarov 2024-08-30 00:12:37 +04:00
parent b91c32d990
commit 8539acf5c9

View File

@ -27,7 +27,7 @@ type Response struct {
type Responses []Response type Responses []Response
type ClientFunc func() *fasthttp.HostClient type ClientDoFunc func(ctx context.Context, request *fasthttp.Request) (*fasthttp.Response, error)
// Print prints the responses in a tabular format, including information such as // Print prints the responses in a tabular format, including information such as
// response count, minimum time, maximum time, and average time. // response count, minimum time, maximum time, and average time.
@ -105,14 +105,14 @@ func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, e
} }
checkConnectionCtxCancel() checkConnectionCtxCancel()
clientFunc := getClientFunc( clientDoFunc := getClientDoFunc(
ctx, ctx,
requestConfig.Timeout, requestConfig.Timeout,
requestConfig.Proxies, requestConfig.Proxies,
requestConfig.GetValidDodosCountForProxies(), requestConfig.GetValidDodosCountForProxies(),
requestConfig.URL, requestConfig.URL,
) )
if clientFunc == nil { if clientDoFunc == nil {
return nil, customerrors.ErrInterrupt return nil, customerrors.ErrInterrupt
} }
@ -128,8 +128,7 @@ func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, e
responses := releaseDodos( responses := releaseDodos(
ctx, ctx,
request, request,
requestConfig.Timeout, clientDoFunc,
clientFunc,
requestConfig.GetValidDodosCountForRequests(), requestConfig.GetValidDodosCountForRequests(),
requestConfig.RequestCount, requestConfig.RequestCount,
) )
@ -141,14 +140,13 @@ func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, e
} }
// releaseDodos sends multiple HTTP requests concurrently using multiple "dodos" (goroutines). // releaseDodos sends multiple HTTP requests concurrently using multiple "dodos" (goroutines).
// It takes a mainRequest as the base request, timeout duration for each request, clientFunc for customizing the client behavior, // It takes a mainRequest as the base request, timeout duration for each request, clientDoFunc for customizing the client behavior,
// dodosCount as the number of goroutines to be used, and requestCount as the total number of requests to be sent. // dodosCount as the number of goroutines to be used, and requestCount as the total number of requests to be sent.
// It returns the responses received from all the requests. // It returns the responses received from all the requests.
func releaseDodos( func releaseDodos(
ctx context.Context, ctx context.Context,
mainRequest *fasthttp.Request, mainRequest *fasthttp.Request,
timeout time.Duration, clientDoFunc ClientDoFunc,
clientFunc ClientFunc,
dodosCount int, dodosCount int,
requestCount int, requestCount int,
) Responses { ) Responses {
@ -180,11 +178,10 @@ func releaseDodos(
go sendRequest( go sendRequest(
ctx, ctx,
dodoSpecificRequest, dodoSpecificRequest,
timeout,
&responses[i], &responses[i],
&countSlice[i], &countSlice[i],
requestCountPerDodo, requestCountPerDodo,
clientFunc, clientDoFunc,
&wg, &wg,
) )
} }
@ -194,18 +191,22 @@ func releaseDodos(
return utils.Flatten(responses) return utils.Flatten(responses)
} }
// sendRequest sends multiple requests concurrently using the provided parameters. // sendRequest sends multiple HTTP requests concurrently using the provided clientDo function.
// It releases the request and response object and marks the completion of the wait group after each request. // It takes a context, a request, a slice to store the response data, a counter to keep track of the number of requests,
// For each request, it acquires a response object, gets a client, and measures the time taken to complete the request. // the total number of requests to be sent, a clientDo function to execute the requests,
// If an error occurs during the request, the error is recorded in the responseData slice. // and a wait group to synchronize the goroutines.
// It releases the request and decrements the wait group counter when done.
// For each request, it checks if the context has been canceled and returns if so.
// It measures the time it takes to complete each request and appends the response data to the responseData slice.
// If an error occurs during the request, it appends a response with a status code of 0 and the error to the responseData slice.
// Otherwise, it appends a response with the actual status code and nil error to the responseData slice.
func sendRequest( func sendRequest(
ctx context.Context, ctx context.Context,
request *fasthttp.Request, request *fasthttp.Request,
timeout time.Duration,
responseData *[]Response, responseData *[]Response,
counter *int, counter *int,
requestCount int, requestCount int,
getClient ClientFunc, clientDo ClientDoFunc,
wg *sync.WaitGroup, wg *sync.WaitGroup,
) { ) {
defer fasthttp.ReleaseRequest(request) defer fasthttp.ReleaseRequest(request)
@ -219,11 +220,8 @@ func sendRequest(
func() { func() {
defer func() { *counter++ }() defer func() { *counter++ }()
response := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(response)
client := getClient()
startTime := time.Now() startTime := time.Now()
err := client.DoTimeout(request, response, timeout) response, err := clientDo(ctx, request)
completedTime := time.Since(startTime) completedTime := time.Since(startTime)
if err != nil { if err != nil {
@ -234,7 +232,8 @@ func sendRequest(
}) })
return return
} }
defer fasthttp.ReleaseResponse(response)
fmt.Println(string(response.Body()))
*responseData = append(*responseData, Response{ *responseData = append(*responseData, Response{
StatusCode: response.StatusCode(), StatusCode: response.StatusCode(),
Error: nil, Error: nil,
@ -244,18 +243,25 @@ func sendRequest(
} }
} }
// getClientFunc returns a ClientFunc based on the provided parameters. // getClientDoFunc returns a ClientDoFunc function that can be used to make HTTP requests.
// If there are proxies available, it checks for active proxies and prompts the user to continue. //
// If there are no active proxies, it asks the user if they want to continue. // The function first checks if there are any proxies available. If there are, it retrieves the active proxy clients
// If the user chooses to continue, it returns a ClientFunc with a shared client or a randomized client. // using the getActiveProxyClients function. If the context is canceled during this process, it returns nil.
// If there are no proxies available, it returns a ClientFunc with a shared client. // It then checks the number of active proxy clients and prompts the user to continue if there are none.
func getClientFunc( // If the user chooses to continue, it creates a fasthttp.HostClient with the appropriate settings and returns
// a ClientDoFunc function using the getSharedClientDoFunc function.
// If there is only one active proxy client, it uses that client to create the ClientDoFunc function.
// If there are multiple active proxy clients, it uses the getSharedRandomClientDoFunc function to create the ClientDoFunc function.
//
// If there are no proxies available, it creates a fasthttp.HostClient with the appropriate settings and returns
// a ClientDoFunc function using the getSharedClientDoFunc function.
func getClientDoFunc(
ctx context.Context, ctx context.Context,
timeout time.Duration, timeout time.Duration,
proxies []config.Proxy, proxies []config.Proxy,
dodosCount int, dodosCount int,
URL *url.URL, URL *url.URL,
) ClientFunc { ) ClientDoFunc {
isTLS := URL.Scheme == "https" isTLS := URL.Scheme == "https"
if len(proxies) > 0 { if len(proxies) > 0 {
activeProxyClients := getActiveProxyClients( activeProxyClients := getActiveProxyClients(
@ -295,12 +301,16 @@ func getClientFunc(
WriteTimeout: timeout, WriteTimeout: timeout,
ReadTimeout: timeout, ReadTimeout: timeout,
} }
return getSharedClientFunc(client) return getSharedClientDoFunc(client, timeout)
} else if activeProxyClientsCount == 1 { } else if activeProxyClientsCount == 1 {
client := &activeProxyClients[0] client := &activeProxyClients[0]
return getSharedClientFunc(client) return getSharedClientDoFunc(client, timeout)
} }
return getRandomizedClientFunc(activeProxyClients, activeProxyClientsCount) return getSharedRandomClientDoFunc(
activeProxyClients,
activeProxyClientsCount,
timeout,
)
} }
client := &fasthttp.HostClient{ client := &fasthttp.HostClient{
@ -311,7 +321,7 @@ func getClientFunc(
WriteTimeout: timeout, WriteTimeout: timeout,
ReadTimeout: timeout, ReadTimeout: timeout,
} }
return getSharedClientFunc(client) return getSharedClientDoFunc(client, timeout)
} }
// getActiveProxyClients divides the proxies into slices based on the number of dodos and // getActiveProxyClients divides the proxies into slices based on the number of dodos and
@ -363,10 +373,14 @@ func getActiveProxyClients(
return utils.Flatten(activeProxyClientsArray) return utils.Flatten(activeProxyClientsArray)
} }
// findActiveProxyClients finds the active proxy clients by sending a GET request to each proxy in the given slice. // findActiveProxyClients is a function that finds active proxy clients by sending HTTP GET requests to a list of proxies.
// The function runs each request in a separate goroutine and updates the activeProxyClients slice with the active proxy clients. // It takes a context.Context, a slice of config.Proxy, a time.Duration for the timeout, a pointer to a slice of fasthttp.HostClient to store the active proxy clients,
// It also increments the count for each successful request. // a pointer to an int to keep track of the count, a pointer to a url.URL for the URL to send the requests to, and a pointer to a sync.WaitGroup to synchronize the goroutines.
// The function is designed to be used as a concurrent operation, and it uses the WaitGroup to wait for all goroutines to finish. // It sends GET requests to each proxy in the given list and checks if the response status code is 200.
// If the context is canceled, the function returns immediately.
// The active proxy clients that pass the check are added to the provided slice of fasthttp.HostClient.
// The function is designed to be run concurrently using goroutines and the sync.WaitGroup is used to wait for all goroutines to finish.
// The function is responsible for releasing acquired resources and closing idle connections.
func findActiveProxyClients( func findActiveProxyClients(
ctx context.Context, ctx context.Context,
proxies []config.Proxy, proxies []config.Proxy,
@ -401,10 +415,29 @@ func findActiveProxyClients(
client := &fasthttp.Client{ client := &fasthttp.Client{
Dial: dialFunc, Dial: dialFunc,
} }
err = client.DoTimeout(request, response, timeout) defer client.CloseIdleConnections()
startTime := time.Now()
ch := make(chan error)
go func() {
err := client.DoTimeout(request, response, timeout)
ch <- err
}()
select {
case err := <-ch:
if err != nil { if err != nil {
fmt.Println(time.Since(startTime))
return return
} }
break
case <-time.After(timeout):
fmt.Println(time.Since(startTime))
return
case <-ctx.Done():
fmt.Println(time.Since(startTime))
return
}
fmt.Println(time.Since(startTime))
isTLS := URL.Scheme == "https" isTLS := URL.Scheme == "https"
addr := URL.Host addr := URL.Host
@ -482,23 +515,74 @@ func getDialFunc(proxy *config.Proxy, timeout time.Duration) (fasthttp.DialFunc,
return dialer, nil return dialer, nil
} }
// getRandomizedClientFunc returns a ClientFunc that randomly selects a HostClient from the given list of clients. // getSharedRandomClientDoFunc is equivalent to getSharedClientDoFunc but uses a random client from the provided slice.
// The clientsCount parameter specifies the number of clients in the slice. func getSharedRandomClientDoFunc(
// The returned ClientFunc can be used to share the same client instance across multiple goroutines.
func getRandomizedClientFunc(
clients []fasthttp.HostClient, clients []fasthttp.HostClient,
clientsCount int, clientsCount int,
) ClientFunc { timeout time.Duration,
return func() *fasthttp.HostClient { ) ClientDoFunc {
return &clients[rand.Intn(clientsCount)] return func (ctx context.Context, request *fasthttp.Request) (*fasthttp.Response, error) {
client := &clients[rand.Intn(clientsCount)]
defer client.CloseIdleConnections()
response := fasthttp.AcquireResponse()
ch := make(chan error)
go func() {
err := client.DoTimeout(request, response, timeout)
ch <- err
}()
select {
case err := <-ch:
if err != nil {
fasthttp.ReleaseResponse(response)
return nil, err
}
return response, nil
case <-time.After(timeout):
fasthttp.ReleaseResponse(response)
return nil, customerrors.ErrTimeout
case <-ctx.Done():
return nil, customerrors.ErrInterrupt
}
} }
} }
// getSharedClientFunc returns a ClientFunc that returns the provided client. // getSharedClientDoFunc is a function that returns a ClientDoFunc, which is a function type used for making HTTP requests using a shared client.
// The returned ClientFunc can be used to share the same client instance across multiple goroutines. // It takes a client of type *fasthttp.HostClient and a timeout of type time.Duration as input parameters.
func getSharedClientFunc(client *fasthttp.HostClient) ClientFunc { // The returned ClientDoFunc function can be used to make an HTTP request with the given client and timeout.
return func() *fasthttp.HostClient { // It takes a context.Context and a *fasthttp.Request as input parameters and returns a *fasthttp.Response and an error.
return client // The function internally creates a new response using fasthttp.AcquireResponse() and a channel to handle errors.
// It then spawns a goroutine to execute the client.DoTimeout() method with the given request, response, and timeout.
// The function uses a select statement to handle three cases:
// - If an error is received from the channel, it checks if the error is not nil. If it's not nil, it releases the response and returns nil and the error.
// Otherwise, it returns the response and nil.
// - If the timeout duration is reached, it releases the response and returns nil and a custom timeout error.
// - If the context is canceled, it returns nil and a custom interrupt error.
// The function ensures that idle connections are closed by calling client.CloseIdleConnections() using a defer statement.
func getSharedClientDoFunc(
client *fasthttp.HostClient,
timeout time.Duration,
) ClientDoFunc {
return func (ctx context.Context, request *fasthttp.Request) (*fasthttp.Response, error) {
defer client.CloseIdleConnections()
response := fasthttp.AcquireResponse()
ch := make(chan error)
go func() {
err := client.DoTimeout(request, response, timeout)
ch <- err
}()
select {
case err := <-ch:
if err != nil {
fasthttp.ReleaseResponse(response)
return nil, err
}
return response, nil
case <-time.After(timeout):
fasthttp.ReleaseResponse(response)
return nil, customerrors.ErrTimeout
case <-ctx.Done():
return nil, customerrors.ErrInterrupt
}
} }
} }