Mirror of https://github.com/aykhans/kafka-wikimedia-go.git
Synced 2025-04-16 23:33:12 +00:00
228 lines · 7.2 KiB · Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/IBM/sarama"
|
|
)
|
|
|
|
var (
	// kafkaBootstrapServers lists the Kafka brokers used for the initial
	// cluster connection.
	kafkaBootstrapServers = []string{"localhost:9092"}
	// kafkaTopic is the topic every Wikimedia event is produced to.
	kafkaTopic = "wikimedia.recentchange"
	// wikimediaStreamURL is the Wikimedia SSE endpoint that streams
	// recent-change events.
	wikimediaStreamURL = "https://stream.wikimedia.org/v2/stream/recentchange"
)
|
|
|
|
// WikiData represents the structure of the data received from Wikimedia.
// Field names map one-to-one onto the JSON keys of a "recentchange" SSE
// event via the struct tags; Marshal re-serializes the value for Kafka.
type WikiData struct {
	Schema           string `json:"$schema"`
	ID               int    `json:"id"`
	Type             string `json:"type"`
	Namespace        int    `json:"namespace"`
	Title            string `json:"title"`
	TitleURL         string `json:"title_url"`
	Comment          string `json:"comment"`
	Timestamp        int    `json:"timestamp"` // presumably a Unix timestamp — confirm against the Wikimedia schema
	User             string `json:"user"`
	Bot              bool   `json:"bot"`
	NotifyURL        string `json:"notify_url"`
	Minor            bool   `json:"minor"`
	Patrolled        bool   `json:"patrolled"`
	ServerURL        string `json:"server_url"`
	ServerName       string `json:"server_name"`
	ServerScriptPath string `json:"server_script_path"`
	Wiki             string `json:"wiki"`
	ParsedComment    string `json:"parsedcomment"`
	// Meta carries the event-envelope metadata supplied by the stream.
	Meta struct {
		URI       string `json:"uri"`
		RequestID string `json:"request_id"`
		ID        string `json:"id"`
		DT        string `json:"dt"`
		Domain    string `json:"domain"`
		Stream    string `json:"stream"`
		Topic     string `json:"topic"`
		Partition int    `json:"partition"`
		Offset    int    `json:"offset"`
	} `json:"meta"`
	// Length holds old/new values for the change — presumably page length
	// before/after the edit; verify against the Wikimedia schema.
	Length struct {
		Old int `json:"old"`
		New int `json:"new"`
	} `json:"length"`
	// Revision holds old/new revision identifiers for the change.
	Revision struct {
		Old int `json:"old"`
		New int `json:"new"`
	} `json:"revision"`
}
|
|
|
|
func (wikiData *WikiData) Marshal() []byte {
|
|
data, _ := json.Marshal(wikiData)
|
|
return data
|
|
}
|
|
|
|
// IMessageHandler is an interface that defines the methods for handling messages.
// The expected lifecycle is: Setup once, then OnMessage for every event,
// then Cleanup once when the stream ends.
type IMessageHandler interface {
	// Setup is called to set up any necessary resources before starting message handling.
	Setup()

	// Cleanup is called to clean up any resources after message handling is complete.
	Cleanup()

	// OnMessage is called when a new message is received.
	// It takes a pointer to a WikiData object as a parameter.
	OnMessage(wikiData *WikiData)
}
|
|
|
|
// kafkaMessageHandler is an IMessageHandler implementation that forwards
// every WikiData event to Kafka through a synchronous producer.
type kafkaMessageHandler struct {
	producer sarama.SyncProducer // created in Setup, closed in Cleanup
}
|
|
|
|
// Setup initializes the Kafka message handler by creating a new producer.
// Note: newProducer exits the process via log.Fatalf if the producer
// cannot be created, so Setup itself never observes a failure.
func (h *kafkaMessageHandler) Setup() {
	h.producer = newProducer()
}
|
|
|
|
// Cleanup closes the Kafka producer and performs any necessary cleanup operations.
|
|
func (h *kafkaMessageHandler) Cleanup() {
|
|
if err := h.producer.Close(); err != nil {
|
|
log.Fatalf("Failed to close Kafka producer: %v", err)
|
|
}
|
|
}
|
|
|
|
// OnMessage is a method that handles incoming WikiData messages.
|
|
// It sends the message to a Kafka topic and logs the result.
|
|
func (h *kafkaMessageHandler) OnMessage(wikiData *WikiData) {
|
|
message := &sarama.ProducerMessage{
|
|
Topic: kafkaTopic,
|
|
Value: sarama.StringEncoder(wikiData.Marshal()),
|
|
}
|
|
partition, offset, err := h.producer.SendMessage(message)
|
|
if err != nil {
|
|
log.Fatalf("Failed to send message: %v", err)
|
|
}
|
|
log.Printf("Message is stored in: topic(%s) - partition(%d) - offset(%d)\n", kafkaTopic, partition, offset)
|
|
}
|
|
|
|
// newProducer creates a new instance of a Kafka producer with the specified configuration.
|
|
// It returns a sarama.SyncProducer that can be used to send messages to Kafka topics.
|
|
func newProducer() sarama.SyncProducer {
|
|
config := sarama.NewConfig()
|
|
config.Producer.Return.Successes = true // enable message delivery reports
|
|
config.Producer.RequiredAcks = sarama.WaitForAll // require all in-sync replicas to acknowledge the message
|
|
config.Producer.Retry.Max = 5 // number of retries before giving up on sending a message to a partition
|
|
config.Producer.Retry.Backoff = time.Second * 60 // time to wait between retries
|
|
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // walks through the available partitions one at a time
|
|
config.Producer.Compression = sarama.CompressionSnappy // compress messages using Snappy
|
|
config.Producer.Idempotent = true // producer will ensure that messages are successfully sent and acknowledged
|
|
// linger.ms
|
|
config.Producer.Flush.Frequency = time.Millisecond * 20 // time to wait before sending a batch of messages
|
|
// batch.size
|
|
config.Producer.Flush.Bytes = 32 * 1024 // number of bytes to trigger a batch of messages
|
|
config.Net.MaxOpenRequests = 1
|
|
|
|
producer, err := sarama.NewSyncProducer(kafkaBootstrapServers, config)
|
|
if err != nil {
|
|
log.Fatalf("Failed to start Kafka producer: %v", err)
|
|
}
|
|
return producer
|
|
}
|
|
|
|
// WikimediaEventHandler connects to the Wikimedia stream and handles incoming events.
|
|
// It takes an IMessageHandler as a parameter, which is responsible for setting up and cleaning up the message handling logic.
|
|
// The function reads events from the stream, parses the JSON data, and passes it to the message handler.
|
|
// It also counts the number of messages processed and prints the total count at the end.
|
|
func WikimediaEventHandler(messageHandler IMessageHandler) {
|
|
// Connect to the Wikimedia stream
|
|
resp, err := http.Get(wikimediaStreamURL)
|
|
if err != nil {
|
|
log.Fatalf("Failed to connect to SSE endpoint: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
log.Fatalf("Failed to connect to SSE endpoint: %s", resp.Status)
|
|
}
|
|
|
|
reader := bufio.NewReader(resp.Body)
|
|
// Read the initial response from the stream to confirm the connection
|
|
line, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
log.Fatalf("Failed to read from SSE endpoint: %v", err)
|
|
}
|
|
line = strings.TrimSpace(line)
|
|
if line != ":ok" {
|
|
log.Fatalf("Failed to connect to SSE endpoint: %s", line)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Handle SIGINT and SIGTERM signals to gracefully shut down the producer
|
|
go func() {
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
<-sigterm
|
|
cancel()
|
|
}()
|
|
|
|
messageHandler.Setup()
|
|
defer messageHandler.Cleanup()
|
|
messageCnt := 0
|
|
wg := &sync.WaitGroup{}
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
fmt.Println("Producer is shutting down")
|
|
return
|
|
default:
|
|
line, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
log.Fatalf("Failed to read from SSE endpoint: %v", err)
|
|
}
|
|
// Trim leading and trailing whitespace from the line and ignore empty lines
|
|
line = strings.TrimSpace(line)
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
|
|
wikiData := &WikiData{}
|
|
switch {
|
|
// Check for the event type and ignore any other events except "message"
|
|
case strings.HasPrefix(line, "event: "):
|
|
if line != "event: message" {
|
|
log.Fatalf("Failed to read from SSE endpoint: %s", line)
|
|
}
|
|
// Parse the JSON data and pass it to the message handler
|
|
case strings.HasPrefix(line, "data: "):
|
|
err = json.Unmarshal([]byte(line[6:]), &wikiData)
|
|
if err != nil {
|
|
log.Fatalf("Failed to unmarshal JSON: %v", err)
|
|
}
|
|
messageHandler.OnMessage(wikiData)
|
|
messageCnt++
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
wg.Wait()
|
|
fmt.Printf("Total messages: %d\n", messageCnt)
|
|
}
|
|
|
|
// main wires the Wikimedia SSE stream to the Kafka-backed message handler
// and blocks until the stream ends or the process is signalled.
func main() {
	WikimediaEventHandler(&kafkaMessageHandler{})
}
|