package requests import ( "context" "strconv" "sync" "time" "github.com/aykhans/dodo/config" "github.com/aykhans/dodo/types" "github.com/aykhans/dodo/utils" "github.com/valyala/fasthttp" ) // Run executes the main logic for processing requests based on the provided configuration. // It initializes clients based on the request configuration and releases the dodos. // If the context is canceled and no responses are collected, it returns an interrupt error. // // Parameters: // - ctx: The context for managing request lifecycle and cancellation. // - requestConfig: The configuration for the request, including timeout, proxies, and other settings. func Run(ctx context.Context, requestConfig *config.RequestConfig) (Responses, error) { clients := getClients( ctx, requestConfig.Timeout, requestConfig.Proxies, requestConfig.GetMaxConns(fasthttp.DefaultMaxConnsPerHost), requestConfig.URL, ) if clients == nil { return nil, types.ErrInterrupt } responses := releaseDodos(ctx, requestConfig, clients) if ctx.Err() != nil && len(responses) == 0 { return nil, types.ErrInterrupt } return responses, nil } // releaseDodos sends requests concurrently using multiple dodos (goroutines) and returns the aggregated responses. // // The function performs the following steps: // 1. Initializes wait groups and other necessary variables. // 2. Starts a goroutine to stream progress updates. // 3. Distributes the total request count among the dodos. // 4. Starts a goroutine for each dodo to send requests concurrently. // 5. Waits for all dodos to complete their requests. // 6. Cancels the progress streaming context and waits for the progress goroutine to finish. // 7. Flattens and returns the aggregated responses. func releaseDodos( ctx context.Context, requestConfig *config.RequestConfig, clients []*fasthttp.HostClient, ) Responses { var ( wg sync.WaitGroup streamWG sync.WaitGroup requestCountPerDodo uint dodosCount uint = requestConfig.GetValidDodosCountForRequests() dodosCountInt int = int(dodosCount) responses = make([][]*Response, dodosCount) increase = make(chan int64, requestConfig.RequestCount) ) wg.Add(dodosCountInt) streamWG.Add(1) streamCtx, streamCtxCancel := context.WithCancel(context.Background()) go streamProgress(streamCtx, &streamWG, int64(requestConfig.RequestCount), "Dodos Working🔥", increase) for i := range dodosCount { if i+1 == dodosCount { requestCountPerDodo = requestConfig.RequestCount - (i * requestConfig.RequestCount / dodosCount) } else { requestCountPerDodo = ((i + 1) * requestConfig.RequestCount / dodosCount) - (i * requestConfig.RequestCount / dodosCount) } go sendRequest( ctx, newRequest(*requestConfig, clients, int64(i)), requestConfig.Timeout, requestCountPerDodo, &responses[i], increase, &wg, ) } wg.Wait() streamCtxCancel() streamWG.Wait() return utils.Flatten(responses) } // sendRequest sends a specified number of HTTP requests concurrently with a given timeout. // It appends the responses to the provided responseData slice and sends the count of completed requests // to the increase channel. The function terminates early if the context is canceled or if a custom // interrupt error is encountered. func sendRequest( ctx context.Context, request *Request, timeout time.Duration, requestCount uint, responseData *[]*Response, increase chan<- int64, wg *sync.WaitGroup, ) { defer wg.Done() for range requestCount { if ctx.Err() != nil { return } func() { startTime := time.Now() response, err := request.Send(ctx, timeout) completedTime := time.Since(startTime) if response != nil { defer fasthttp.ReleaseResponse(response) } if err != nil { if err == types.ErrInterrupt { return } *responseData = append(*responseData, &Response{ Response: err.Error(), Time: completedTime, }) increase <- 1 return } *responseData = append(*responseData, &Response{ Response: strconv.Itoa(response.StatusCode()), Time: completedTime, }) increase <- 1 }() } }