2024-07-11 02:32:31 +04:00

254 lines
7.7 KiB
Go

package main
import (
"context"
"crypto/tls"
"encoding/json"
"log"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/IBM/sarama"
"github.com/opensearch-project/opensearch-go"
)
// Connection settings. Each one can be overridden through the environment
// variable named next to it; an unset or empty variable keeps the built-in
// default. List-valued settings take a comma-separated list.
var (
	kafkaBootstrapServers = envListOrDefault("KAFKA_BOOTSTRAP_SERVERS", []string{"localhost:9092"})
	kafkaTopic            = envOrDefault("KAFKA_TOPIC", "wikimedia.recentchange")
	opensearchAddresses   = envListOrDefault("OPENSEARCH_ADDRESSES", []string{"https://localhost:9200"})
	opensearchUsername    = envOrDefault("OPENSEARCH_USERNAME", "admin")
	// NOTE(review): a real password should not live in source control. The
	// default is kept only so the local demo still runs without configuration;
	// set OPENSEARCH_PASSWORD anywhere else.
	opensearchPassword = envOrDefault("OPENSEARCH_PASSWORD", "Toor1234_")
	opensearchIndex    = envOrDefault("OPENSEARCH_INDEX", "wikimedia")
)

// envOrDefault returns the value of the environment variable key, or def when
// the variable is unset or set to the empty string.
func envOrDefault(key, def string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return def
}

// envListOrDefault reads the environment variable key as a comma-separated
// list, trimming surrounding whitespace and dropping empty entries. It returns
// def when the variable is unset or yields no non-empty entries.
func envListOrDefault(key string, def []string) []string {
	var out []string
	for _, part := range strings.Split(envOrDefault(key, ""), ",") {
		if p := strings.TrimSpace(part); p != "" {
			out = append(out, p)
		}
	}
	if len(out) == 0 {
		return def
	}
	return out
}
// OpensearchMessage pairs a raw document with the ID it should be indexed
// under; it is what the Kafka consumer hands to an IMessageHandler.
type OpensearchMessage struct {
	// Message is the document body, as raw bytes.
	Message []byte
	// ID is the document ID to store Message under.
	ID string
}
// WikiData is the JSON shape of one recent-change event read from kafkaTopic.
// In this file only Meta.ID is consumed (it becomes the OpenSearch document
// ID); the remaining fields describe the payload for readers and future use.
//
// NOTE(review): the int fields are 32-bit on 32-bit platforms, where large
// IDs or offsets would fail to decode — consider int64 if that matters.
type WikiData struct {
	// Top-level event fields.
	Schema           string `json:"$schema"`
	ID               int    `json:"id"`
	Type             string `json:"type"`
	Namespace        int    `json:"namespace"`
	Title            string `json:"title"`
	TitleURL         string `json:"title_url"`
	Comment          string `json:"comment"`
	Timestamp        int    `json:"timestamp"`
	User             string `json:"user"`
	Bot              bool   `json:"bot"`
	NotifyURL        string `json:"notify_url"`
	Minor            bool   `json:"minor"`
	Patrolled        bool   `json:"patrolled"`
	ServerURL        string `json:"server_url"`
	ServerName       string `json:"server_name"`
	ServerScriptPath string `json:"server_script_path"`
	Wiki             string `json:"wiki"`
	ParsedComment    string `json:"parsedcomment"`

	// Meta is the event envelope; Meta.ID is used as the document ID.
	Meta struct {
		URI       string `json:"uri"`
		RequestID string `json:"request_id"`
		ID        string `json:"id"`
		DT        string `json:"dt"`
		Domain    string `json:"domain"`
		Stream    string `json:"stream"`
		Topic     string `json:"topic"`
		Partition int    `json:"partition"`
		Offset    int    `json:"offset"`
	} `json:"meta"`

	// Length holds the "old"/"new" pair under "length" (presumably page
	// sizes before and after the change — confirm against the stream schema).
	Length struct {
		Old int `json:"old"`
		New int `json:"new"`
	} `json:"length"`

	// Revision holds the "old"/"new" pair under "revision" (presumably
	// revision IDs — confirm against the stream schema).
	Revision struct {
		Old int `json:"old"`
		New int `json:"new"`
	} `json:"revision"`
}
// IMessageHandler is the sink the Kafka consumer feeds. Setup is called
// before consumption starts, OnMessage for messages consumed from Kafka, and
// Cleanup after consumption ends.
type IMessageHandler interface {
	// Setup prepares the handler (for example, connecting to OpenSearch)
	// before any message is delivered.
	Setup()
	// OnMessage handles one message from Kafka, already paired with the ID
	// it should be stored under.
	OnMessage(message OpensearchMessage)
	// Cleanup releases whatever Setup acquired once consumption is over.
	Cleanup()
}
// opensearchHandler is the IMessageHandler that writes every message into
// OpenSearch. Its client field stays nil until Setup has run successfully.
type opensearchHandler struct {
	client *opensearch.Client // set by Setup; used by OnMessage
}
// Setup creates the OpenSearch client, verifies the cluster answers, and
// creates opensearchIndex if it does not exist yet. Any failure is fatal
// (log.Fatal / log.Fatalf), since the consumer cannot index without it.
//
// Every response body is closed, and every response status is checked, so a
// non-2xx reply (for example 401 from bad credentials) is reported instead of
// being mistaken for success.
func (h *opensearchHandler) Setup() {
	log.Println("Setting up OpenSearch client")
	client := newOpensearchClient()

	info, err := client.Info()
	if err != nil {
		log.Fatal(err)
	}
	// The body is not needed, but it must be closed to release the connection.
	info.Body.Close()
	if info.StatusCode >= 300 {
		log.Fatalf("OpenSearch info request failed: status %d", info.StatusCode)
	}

	exists, err := client.Indices.Exists([]string{opensearchIndex})
	if err != nil {
		log.Fatal(err)
	}
	exists.Body.Close()
	switch exists.StatusCode {
	case http.StatusOK:
		// Index already present; nothing to create.
	case http.StatusNotFound:
		created, err := client.Indices.Create(opensearchIndex)
		if err != nil {
			log.Fatal(err)
		}
		created.Body.Close()
		if created.StatusCode >= 300 {
			log.Fatalf("Creating index %q failed: status %d", opensearchIndex, created.StatusCode)
		}
	default:
		// Anything else (auth failure, server error, ...) means we could not
		// determine whether the index exists.
		log.Fatalf("Checking index %q failed: status %d", opensearchIndex, exists.StatusCode)
	}

	h.client = client
}
// Cleanup only logs a shutdown message. Despite the wording of that message,
// nothing is closed here: h.client is left as-is.
func (h *opensearchHandler) Cleanup() {
	const shutdownMsg = "Closing OpenSearch client"
	log.Println(shutdownMsg)
}
// indexResponseSummary holds the parts of an OpenSearch index-API response
// body that OnMessage reports: the outcome on success and the raw error object
// on failure.
type indexResponseSummary struct {
	Result string          `json:"result"`
	Error  json.RawMessage `json:"error"`
}

// OnMessage indexes message.Message into opensearchIndex under document ID
// message.ID and logs the outcome.
//
// A transport-level error (no response at all) is fatal. An HTTP error status,
// such as a mapping conflict, is logged together with the error object from
// the response body rather than being reported like a success. Consumption
// continues either way.
func (h *opensearchHandler) OnMessage(message OpensearchMessage) {
	response, err := h.client.Index(
		opensearchIndex,
		strings.NewReader(string(message.Message)),
		h.client.Index.WithDocumentID(message.ID),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer response.Body.Close()

	// Decoding consumes the (small) JSON body. That gives us the error details,
	// and also gives net/http a chance to reuse the connection instead of
	// dropping it because the body was closed unread.
	var summary indexResponseSummary
	// A decode failure only loses detail from the log line, so it is ignored.
	_ = json.NewDecoder(response.Body).Decode(&summary)

	if response.StatusCode >= 300 {
		log.Printf("Index document %q failed: status %d, error: %s", message.ID, response.StatusCode, summary.Error)
		return
	}
	log.Println("Index document:", response.StatusCode, summary.Result)
}
// newOpensearchClient builds an opensearch.Client for opensearchAddresses,
// authenticating with opensearchUsername / opensearchPassword.
//
// The HTTP transport is a clone of http.DefaultTransport. It therefore keeps
// the standard library's proxy-from-environment handling, dial and
// TLS-handshake timeouts, and idle-connection limits, none of which a bare
// &http.Transport{} has. Only the TLS config is replaced.
//
// Certificate verification is disabled, presumably so the client accepts
// the local cluster's self-signed certificate.
//
// Any error creating the client is fatal.
func newOpensearchClient() *opensearch.Client {
	transport := http.DefaultTransport.(*http.Transport).Clone()
	// NOTE(review): InsecureSkipVerify exposes the connection, including the
	// basic-auth credentials, to man-in-the-middle attacks. Configure RootCAs
	// with the cluster's CA for anything beyond a local demo.
	transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}

	client, err := opensearch.NewClient(opensearch.Config{
		Transport: transport,
		Addresses: opensearchAddresses,
		Username:  opensearchUsername,
		Password:  opensearchPassword,
	})
	if err != nil {
		log.Fatal(err)
	}
	return client
}
// kafkaConsumerGroupHandler is the handler Consume passes to sarama's
// consumer-group Consume. It forwards each decoded message to messageHandler.
type kafkaConsumerGroupHandler struct {
	messageHandler IMessageHandler // receives each message via OnMessage
}
// Setup is invoked by sarama at the start of every consumer-group session,
// before ConsumeClaim. It is intentionally a no-op: the message handler's own
// Setup is called once in Consume rather than on every session. It always
// returns nil.
func (h *kafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is invoked by sarama at the end of every consumer-group session.
// It is intentionally a no-op: the message handler's own Cleanup is deferred
// once in Consume rather than run per session. It always returns nil.
func (h *kafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim processes the messages of one claimed partition. It stops when
// either of these happens:
//   - the claim's message channel is closed;
//   - the session's context is canceled, since sarama asks handlers to
//     return promptly when a rebalance starts.
//
// Each message is processed in three steps:
//   - decode it as WikiData to obtain Meta.ID;
//   - forward it to the message handler as an OpensearchMessage (raw bytes
//     plus that ID);
//   - mark it and commit synchronously. Auto-commit is disabled in Consume,
//     so this commit is what advances the group's offset.
//
// A message that is not valid WikiData JSON is logged and skipped, but it is
// still marked and committed. Returning an error would end the session with
// the message unmarked. Consume restarts the session in a loop, so the same
// message would be redelivered forever and the partition would stall.
//
// It always returns nil.
func (h *kafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			wikiData := &WikiData{}
			if err := json.Unmarshal(message.Value, wikiData); err != nil {
				log.Printf("Skipping undecodable message at %s/%d offset %d: %v",
					message.Topic, message.Partition, message.Offset, err)
			} else {
				h.messageHandler.OnMessage(OpensearchMessage{Message: message.Value, ID: wikiData.Meta.ID})
			}
			sess.MarkMessage(message, "")
			sess.Commit()
		case <-sess.Context().Done():
			return nil
		}
	}
}
// Consume joins the "consumer-opensearch-demo" consumer group, subscribes to
// kafkaTopic, and forwards every consumed message to messageHandler. It runs
// until the process receives SIGINT or SIGTERM.
//
// messageHandler.Setup runs once before consumption starts, and
// messageHandler.Cleanup runs once after it ends. A group with no committed
// offset starts from the newest message. Auto-commit is disabled because
// ConsumeClaim commits explicitly.
//
// Errors creating or closing the consumer group, and errors returned by the
// consume loop, are fatal (log.Fatalf). A consume-loop error is reported only
// after the deferred cleanup has run. Calling log.Fatalf inside the loop
// would exit via os.Exit and skip closing the group and cleaning up the
// handler.
func Consume(messageHandler IMessageHandler) {
	// Registered first so it runs last, after every other deferred call below.
	var consumeErr error
	defer func() {
		if consumeErr != nil {
			log.Fatalf("Error consuming messages: %v", consumeErr)
		}
	}()

	groupID := "consumer-opensearch-demo"
	config := sarama.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
	config.Consumer.Offsets.AutoCommit.Enable = false

	consumer, err := sarama.NewConsumerGroup(kafkaBootstrapServers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalf("Error closing consumer group: %v", err)
		}
	}()

	// ctx is canceled on the first SIGINT or SIGTERM, or when stop runs.
	// Cancellation makes consumer.Consume return, which ends the loop below.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	handler := kafkaConsumerGroupHandler{messageHandler: messageHandler}
	handler.messageHandler.Setup()
	defer handler.messageHandler.Cleanup()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Each Consume call lasts one group session. It has to be called
			// again after every rebalance, hence the loop.
			if err := consumer.Consume(ctx, []string{kafkaTopic}, &handler); err != nil {
				consumeErr = err // read after wg.Wait, so no race
				return
			}
			if ctx.Err() != nil {
				log.Println("Consumer closed")
				return
			}
		}
	}()
	wg.Wait()
}
func main() {
Consume(&opensearchHandler{})
}