package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/IBM/sarama"
)

var (
	kafkaBootstrapServers = []string{"localhost:9092"}
	kafkaTopic            = "wikimedia.recentchange"
	wikimediaStreamURL    = "https://stream.wikimedia.org/v2/stream/recentchange"
)

// WikiData represents the structure of the data received from Wikimedia.
type WikiData struct {
	Schema           string `json:"$schema"`
	ID               int    `json:"id"`
	Type             string `json:"type"`
	Namespace        int    `json:"namespace"`
	Title            string `json:"title"`
	TitleURL         string `json:"title_url"`
	Comment          string `json:"comment"`
	Timestamp        int    `json:"timestamp"`
	User             string `json:"user"`
	Bot              bool   `json:"bot"`
	NotifyURL        string `json:"notify_url"`
	Minor            bool   `json:"minor"`
	Patrolled        bool   `json:"patrolled"`
	ServerURL        string `json:"server_url"`
	ServerName       string `json:"server_name"`
	ServerScriptPath string `json:"server_script_path"`
	Wiki             string `json:"wiki"`
	ParsedComment    string `json:"parsedcomment"`
	Meta             struct {
		URI       string `json:"uri"`
		RequestID string `json:"request_id"`
		ID        string `json:"id"`
		DT        string `json:"dt"`
		Domain    string `json:"domain"`
		Stream    string `json:"stream"`
		Topic     string `json:"topic"`
		Partition int    `json:"partition"`
		Offset    int    `json:"offset"`
	} `json:"meta"`
	Length struct {
		Old int `json:"old"`
		New int `json:"new"`
	} `json:"length"`
	Revision struct {
		Old int `json:"old"`
		New int `json:"new"`
	} `json:"revision"`
}
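
// For orientation, an abridged "data:" payload from the stream maps onto the
// struct above roughly as follows. The field values here are illustrative,
// not taken from a real event:
//
//	{
//	  "$schema": "/mediawiki/recentchange/1.0.0",
//	  "type": "edit",
//	  "title": "Main Page",
//	  "user": "ExampleUser",
//	  "bot": false,
//	  "server_name": "en.wikipedia.org",
//	  "wiki": "enwiki",
//	  "length": {"old": 100, "new": 120}
//	}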

// Marshal serializes the WikiData into JSON; a failure is fatal rather than silently ignored.
func (wikiData *WikiData) Marshal() []byte {
	data, err := json.Marshal(wikiData)
	if err != nil {
		log.Fatalf("Failed to marshal WikiData: %v", err)
	}
	return data
}

// IMessageHandler is an interface that defines the methods for handling messages.
type IMessageHandler interface {
	// Setup is called to set up any necessary resources before starting message handling.
	Setup()
	// Cleanup is called to clean up any resources after message handling is complete.
	Cleanup()
	// OnMessage is called when a new message is received.
	// It takes a pointer to a WikiData object as a parameter.
	OnMessage(wikiData *WikiData)
}
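
// logMessageHandler is a minimal illustrative implementation of IMessageHandler
// (not part of the original pipeline): it only logs each event to stdout, which
// is handy for exercising the stream without a running Kafka broker, e.g.
// WikimediaEventHandler(&logMessageHandler{}).
type logMessageHandler struct{}

func (h *logMessageHandler) Setup()   {}
func (h *logMessageHandler) Cleanup() {}

// OnMessage prints a one-line summary of the change event.
func (h *logMessageHandler) OnMessage(wikiData *WikiData) {
	log.Printf("[%s] %s edited %q", wikiData.Wiki, wikiData.User, wikiData.Title)
}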

type kafkaMessageHandler struct {
	producer sarama.SyncProducer
}

// Setup initializes the Kafka message handler by creating a new producer.
func (h *kafkaMessageHandler) Setup() {
	h.producer = newProducer()
}

// Cleanup closes the Kafka producer and performs any necessary cleanup operations.
func (h *kafkaMessageHandler) Cleanup() {
	if err := h.producer.Close(); err != nil {
		log.Fatalf("Failed to close Kafka producer: %v", err)
	}
}

// OnMessage is a method that handles incoming WikiData messages.
// It sends the message to the Kafka topic and logs the result.
func (h *kafkaMessageHandler) OnMessage(wikiData *WikiData) {
	message := &sarama.ProducerMessage{
		Topic: kafkaTopic,
		Value: sarama.StringEncoder(wikiData.Marshal()),
	}
	partition, offset, err := h.producer.SendMessage(message)
	if err != nil {
		log.Fatalf("Failed to send message: %v", err)
	}
	log.Printf("Message is stored in: topic(%s) - partition(%d) - offset(%d)\n", kafkaTopic, partition, offset)
}

// newProducer creates a new instance of a Kafka producer with the specified configuration.
// It returns a sarama.SyncProducer that can be used to send messages to Kafka topics.
func newProducer() sarama.SyncProducer {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true                       // enable message delivery reports (required by the SyncProducer)
	config.Producer.RequiredAcks = sarama.WaitForAll              // require all in-sync replicas to acknowledge the message
	config.Producer.Retry.Max = 5                                 // number of retries before giving up on sending a message to a partition
	config.Producer.Retry.Backoff = time.Second * 60              // time to wait between retries
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // walks through the available partitions one at a time
	config.Producer.Compression = sarama.CompressionSnappy        // compress messages using Snappy
	config.Producer.Idempotent = true                             // producer ensures each message is written exactly once
	// linger.ms
	config.Producer.Flush.Frequency = time.Millisecond * 20 // time to wait before sending a batch of messages
	// batch.size
	config.Producer.Flush.Bytes = 32 * 1024 // number of bytes that triggers a batch of messages
	config.Net.MaxOpenRequests = 1          // at most one in-flight request; sarama requires this when Idempotent is true
	producer, err := sarama.NewSyncProducer(kafkaBootstrapServers, config)
	if err != nil {
		log.Fatalf("Failed to start Kafka producer: %v", err)
	}
	return producer
}

// WikimediaEventHandler connects to the Wikimedia stream and handles incoming events.
// It takes an IMessageHandler as a parameter, which is responsible for setting up and cleaning up the message handling logic.
// The function reads events from the stream, parses the JSON data, and passes it to the message handler.
// It also counts the number of messages processed and prints the total count at the end.
func WikimediaEventHandler(messageHandler IMessageHandler) {
	// Connect to the Wikimedia SSE stream
	resp, err := http.Get(wikimediaStreamURL)
	if err != nil {
		log.Fatalf("Failed to connect to SSE endpoint: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Fatalf("Failed to connect to SSE endpoint: %s", resp.Status)
	}
	reader := bufio.NewReader(resp.Body)
	// Read the initial line from the stream to confirm the connection;
	// the endpoint is expected to open with the SSE comment ":ok".
	line, err := reader.ReadString('\n')
	if err != nil {
		log.Fatalf("Failed to read from SSE endpoint: %v", err)
	}
	line = strings.TrimSpace(line)
	if line != ":ok" {
		log.Fatalf("Failed to connect to SSE endpoint: %s", line)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Handle SIGINT and SIGTERM signals to gracefully shut down the producer
	go func() {
		sigterm := make(chan os.Signal, 1)
		signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
		<-sigterm
		cancel()
	}()
	messageHandler.Setup()
	defer messageHandler.Cleanup()
	messageCnt := 0
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Producer is shutting down")
				return
			default:
				line, err := reader.ReadString('\n')
				if err != nil {
					log.Fatalf("Failed to read from SSE endpoint: %v", err)
				}
				// Trim leading and trailing whitespace from the line and ignore empty lines
				line = strings.TrimSpace(line)
				if len(line) == 0 {
					continue
				}
				wikiData := &WikiData{}
				switch {
				// Check the event type and reject anything other than "message"
				case strings.HasPrefix(line, "event: "):
					if line != "event: message" {
						log.Fatalf("Failed to read from SSE endpoint: %s", line)
					}
				// Parse the JSON payload and pass it to the message handler
				case strings.HasPrefix(line, "data: "):
					err = json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), wikiData)
					if err != nil {
						log.Fatalf("Failed to unmarshal JSON: %v", err)
					}
					messageHandler.OnMessage(wikiData)
					messageCnt++
					time.Sleep(1 * time.Second) // throttle to roughly one event per second
				}
			}
		}
	}()
	wg.Wait()
	fmt.Printf("Total messages: %d\n", messageCnt)
}

func main() {
	WikimediaEventHandler(&kafkaMessageHandler{})
}