dodo/requests/run.go
Aykhan Shahsuvarov 00f0bcb2de 🔨 Restructure entire project logic
- Moved readers to the config package
- Added an option to read remote config files
- Moved the validation package to the config package and removed the validator dependency
- Moved the customerrors package to the config package
- Replaced fatih/color with jedib0t/go-pretty/v6/text
- Removed proxy check functionality
- Added param, header, cookie, body, and proxy flags to the CLI
- Allowed multiple values for the same key in params, headers, and cookies
2025-03-16 21:20:33 +04:00

146 lines
4.1 KiB
Go

package requests
import (
"context"
"strconv"
"sync"
"time"
"github.com/aykhans/dodo/config"
"github.com/aykhans/dodo/types"
"github.com/aykhans/dodo/utils"
"github.com/valyala/fasthttp"
)
// Run executes the main logic for processing requests based on the provided configuration.
// It initializes clients based on the request configuration and releases the dodos.
// If the context is canceled and no responses are collected, it returns an interrupt error.
//
// Parameters:
// - ctx: The context for managing request lifecycle and cancellation.
// - requestConfig: The configuration for the request, including timeout, proxies, and other settings.
func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, error) {
clients := getClients(
ctx,
requestConfig.Timeout,
requestConfig.Proxies,
requestConfig.GetMaxConns(fasthttp.DefaultMaxConnsPerHost),
requestConfig.URL,
)
if clients == nil {
return nil, types.ErrInterrupt
}
responses := releaseDodos(ctx, requestConfig, clients)
if ctx.Err() != nil && len(responses) == 0 {
return nil, types.ErrInterrupt
}
return responses, nil
}
// releaseDodos sends requests concurrently using multiple dodos (goroutines) and returns the aggregated responses.
//
// The function performs the following steps:
// 1. Initializes wait groups and other necessary variables.
// 2. Starts a goroutine to stream progress updates.
// 3. Distributes the total request count among the dodos.
// 4. Starts a goroutine for each dodo to send requests concurrently.
// 5. Waits for all dodos to complete their requests.
// 6. Cancels the progress streaming context and waits for the progress goroutine to finish.
// 7. Flattens and returns the aggregated responses.
func releaseDodos(
ctx context.Context,
requestConfig *config.RequestConfig,
clients []*fasthttp.HostClient,
) Responses {
var (
wg sync.WaitGroup
streamWG sync.WaitGroup
requestCountPerDodo uint
dodosCount uint = requestConfig.GetValidDodosCountForRequests()
dodosCountInt int = int(dodosCount)
requestCount uint = requestConfig.RequestCount
responses = make([][]*Response, dodosCount)
increase = make(chan int64, requestCount)
)
wg.Add(dodosCountInt)
streamWG.Add(1)
streamCtx, streamCtxCancel := context.WithCancel(context.Background())
go streamProgress(streamCtx, &streamWG, int64(requestCount), "Dodos Working🔥", increase)
for i := range dodosCount {
if i+1 == dodosCount {
requestCountPerDodo = requestCount - (i * requestCount / dodosCount)
} else {
requestCountPerDodo = ((i + 1) * requestCount / dodosCount) -
(i * requestCount / dodosCount)
}
go sendRequest(
ctx,
newRequest(*requestConfig, clients, int64(i)),
requestConfig.Timeout,
requestCountPerDodo,
&responses[i],
increase,
&wg,
)
}
wg.Wait()
streamCtxCancel()
streamWG.Wait()
return utils.Flatten(responses)
}
// sendRequest sends a specified number of HTTP requests concurrently with a given timeout.
// It appends the responses to the provided responseData slice and sends the count of completed requests
// to the increase channel. The function terminates early if the context is canceled or if a custom
// interrupt error is encountered.
func sendRequest(
ctx context.Context,
request *Request,
timeout time.Duration,
requestCount uint,
responseData *[]*Response,
increase chan<- int64,
wg *sync.WaitGroup,
) {
defer wg.Done()
for range requestCount {
if ctx.Err() != nil {
return
}
func() {
startTime := time.Now()
response, err := request.Send(ctx, timeout)
completedTime := time.Since(startTime)
if response != nil {
defer fasthttp.ReleaseResponse(response)
}
if err != nil {
if err == types.ErrInterrupt {
return
}
*responseData = append(*responseData, &Response{
Response: err.Error(),
Time: completedTime,
})
increase <- 1
return
}
*responseData = append(*responseData, &Response{
Response: strconv.Itoa(response.StatusCode()),
Time: completedTime,
})
increase <- 1
}()
}
}