見出し画像

TWLogAIAN:GO言語の全文検索エンジンBlugeを中心としたソフトウェア構造が見えてきた

今朝は4時から開発開始です。先週末の贅沢なテレワークで温泉につかりならが考えたTWLogAIANのソフトウェア構造を絵にしてみました。

TWLogAIANの構造

おおまかな流れは

  •  いろいろな方法(File/Folder/HTTP/SCP/SFTP)でログを集める

  • ログから(timegrinderで)時刻や(GROKで)その他の情報を抽出する

  • 全文検索エンジン(Bluge)のインデックスを作成する

  • インデックスから検索した結果を分析のための表示する

  • TensorFlow.jsを使ったAIが分析をアシストする

のような感じです。
このソフトの中心にあるのがBluge

です。

評価のためのログファイルを読み込んで時刻をキーにしたインデクスを作成するサンプルを作ってみました。

package main

import (
	"bufio"
	"context"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"time"

	"github.com/blugelabs/bluge"
	"github.com/gravwell/gravwell/v3/timegrinder"
)

func main() {
                 // メモリ上にインデックス作成
	config := bluge.InMemoryOnlyConfig()

	// インデック書き込み機能を開く
	writer, err := bluge.OpenWriter(config)
	if err != nil {
		log.Fatalf("error opening writer: %v", err)
	}
	defer func() {
		err = writer.Close()
		if err != nil {
			log.Fatalf("error closing writer: %v", err)
		}
	}()
                //ログ・ファイルをインデクスの追加
	loadLog(os.Args[1], writer)
	reader, err := writer.Reader()
	if err != nil {
		log.Fatalf("error getting index reader: %v", err)
	}
	defer func() {
		err = reader.Close()
		if err != nil {
			log.Fatalf("error closing reader: %v", err)
		}
	}()

	// 時刻の検索範囲を指定
	et := time.Now().Add(+1 * time.Hour * 24 * 365)
	st := time.Now().Add(-1 * time.Hour * 24 * 365)

	// 検索を作成
	query := bluge.NewDateRangeQuery(st, et).SetField("time")

	request := bluge.NewTopNSearch(10, query).
		WithStandardAggregations()
	fmt.Printf("searching for time after %s and before %s\n",
		st.Format(time.RFC3339), et.Format(time.RFC3339))

	// 検索を実行
	documentMatchIterator, err := reader.Search(context.Background(), request)
	if err != nil {
		log.Fatalf("error executing search: %v", err)
	}
	// 検索結果を表示
	match, err := documentMatchIterator.Next()
	for err == nil && match != nil {

		err = match.VisitStoredFields(func(field string, value []byte) bool {
			fmt.Printf("%s: %s\n", field, string(value))
			return true
		})
		if err != nil {
			log.Fatalf("error loading stored fields: %v", err)
		}
		match, err = documentMatchIterator.Next()
	}
	if err != nil {
		log.Fatalf("error iterator document matches: %v", err)
	}
}

//ログ・ファイルを読み込んでインデックスに追加
func loadLog(path string, writer *bluge.Writer) {
//ログから時刻を抽出するパッケージを初期化
	cfg := timegrinder.Config{
		EnableLeftMostSeed: true,
	}
	tg, err := timegrinder.New(cfg)
	if err != nil {
		log.Fatal("failed to create new timegrinder", err)
	}
                //ログファイルを開く
	file, err := os.Open(path)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	scanner := bufio.NewScanner(file)
	ln := 0
	st := time.Now()
	batch := bluge.NewBatch()
	var lts time.Time
	for scanner.Scan() {
		l := scanner.Text()
                                //ログから時刻を抽出
		ts, ok, err := tg.Extract([]byte(l))
		if err != nil {
			fmt.Printf("err =%v", err)
		} else if ok {
                                     //時刻が取得できたものだけ追加する
			lts = ts
			doc := bluge.NewDocument(fmt.Sprintf("%d", ln)).AddField(bluge.NewTextField("line", l).StoreValue()).AddField(bluge.NewDateTimeField("time", ts).StoreValue())
			ln++
			batch.Insert(doc)
			if ln%10000 == 0 {
				log.Printf("ln=%d dur=%v", ln, time.Since(st))
			}
		} else {
			fmt.Println(l)
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	err = writer.Batch(batch)
	if err != nil {
		log.Fatalf("error executing batch: %v", err)
	}
	log.Printf("end ln=%d dur=%v lts=%s", ln, time.Since(st), lts)
}

大体200MBのログファイル(125万件)で1分ぐらいでインデックスを作成できます。1GBで5分です。
ログの行から時刻を抽出するのは

を使っています。
作るものが整理できたので操作する画面を考えていますが、データの読み込み元の追加や削除できるタイミングやインデクスを再作成するタイミングで悩んでいます。今朝は時間切れにになりそうです。
TwitterにTWSNMP FCが

上のDocker環境で動いたという書き込み見つけて、私も買いたくなりました。

明日に続く


開発のための諸経費(機材、Appleの開発者、サーバー運用)に利用します。 ソフトウェアのマニュアルをnoteの記事で提供しています。 サポートによりnoteの運営にも貢献できるのでよろしくお願います。