見出し画像

FluentbitとTWSNMP FCの連携開発5日目:FluentbitのOutputプラグインができたので、おまけでCPU使用率などを送信できるInputプラグインも作ってみた

今朝は4時から開発開始です。助手の猫さんも一緒におきました。
昨日も月をコンパクトデジカメで撮影できました。タイトルの写真です。

昨日、おしいところまでできたTWSNMP FCへログを送信するFluentbitのOutputプラグインの開発の続きです。ログが送信できなかったのは、sshでコマンドを実行する時のメッソドが間違っていたためでした。
 

//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	// Type assert context back into the original type for the Go variable
	id := output.FLBPluginGetContext(ctx).(string)
	conf, ok := outputConfMap[id]
	if !ok {
		log.Printf("[out_twsnmp] Flush called for inkown id: %s", id)
		return output.FLB_ERROR
	}

	c, s, err := connectSSH(conf)
	if err != nil {
		log.Printf("[out_twsnmp] connectSSH err: %v", err)
		return output.FLB_ERROR
	}
	defer s.Close()
	defer c.Close()
	cmd := fmt.Sprintf("put syslog %d %d", conf.Facility, conf.Severity)
	if conf.Host != "" {
		cmd += " " + conf.Host
	}
	out, err := s.StdinPipe()
	if err != nil {
		log.Printf("[out_twsnmp] ssh.StdinPipe err: %v", err)
		return output.FLB_ERROR
	}
	err = s.Start(cmd)
	if err != nil {
		log.Printf("[out_twsnmp] ssh.Start err: %v", err)
		return output.FLB_ERROR
	}

	dec := output.NewDecoder(data, int(length))
	count := 0
	for {
		ret, ts, record := output.GetRecord(dec)
		if ret != 0 {
			break
		}

		var timestamp time.Time
		switch t := ts.(type) {
		case output.FLBTime:
			timestamp = ts.(output.FLBTime).Time
		case uint64:
			timestamp = time.Unix(int64(t), 0)
		default:
			timestamp = time.Now()
		}
		msg := []string{}
		for k, v := range record {
			switch vv := v.(type) {
			case string:
				msg = append(msg, fmt.Sprintf(`"%v": "%s"`, k, vv))
			case []uint8:
				msg = append(msg, fmt.Sprintf(`"%v": "%s"`, k, string(vv)))
			default:
				msg = append(msg, fmt.Sprintf(`"%v": %v`, k, v))
			}
		}
		io.WriteString(out, fmt.Sprintf("%d\t%s\t{%s}\r\n", timestamp.UnixNano(), C.GoString(tag), strings.Join(msg, ", ")))
		count++
	}
	return output.FLB_OK
}

func connectSSH(conf *outputConfEnt) (*ssh.Client, *ssh.Session, error) {
	signer, err := ssh.ParsePrivateKey(conf.PrivateKey)
	if err != nil {
		return nil, nil, err
	}
	sshConfig := &ssh.ClientConfig{
		User:    "twsnmp",
		Auth:    []ssh.AuthMethod{},
		Timeout: time.Second * 1,
	}
	sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(signer))
	if len(conf.HostKey) > 0 {
		pubkey, _, _, _, err := ssh.ParseAuthorizedKey(conf.HostKey)
		if err != nil {
			return nil, nil, err
		}
		sshConfig.HostKeyCallback = ssh.FixedHostKey(pubkey)
	} else {
		sshConfig.HostKeyCallback =
			func(hostname string, remote net.Addr, key ssh.PublicKey) error {
				newKey := strings.TrimSpace(string(ssh.MarshalAuthorizedKey(key)))
				if conf.LastHostKey == "" {
					conf.LastHostKey = newKey
				}
				if conf.LastHostKey != newKey {
					return fmt.Errorf("host key changed")
				}
				return nil
			}
	}
	client, err := ssh.Dial("tcp", conf.Twsnmp, sshConfig)
	if err != nil {
		return nil, nil, err
	}

	session, err := client.NewSession()
	if err != nil {
		client.Close()
		return nil, nil, err
	}

	return client, session, nil
}

のようなコードで動作しましした。昨日は

	err = s.Start(cmd)

のところを

	err = s.Run(cmd)

としていたので、そこで止まっていました。

dummy プラグインからのログを送信すれば、

TWSNMP FCで受信できました。

調子がでてきたので、FluentbitのCPU使用率やDisk使用率のInputプラグインがLinuxでしか動作しないという課題を解決したくなりました。
GO言語には、

という便利なパッケージがあるので、これを利用してInputプラグインを作ってみることにしました。

package main

/*
#include <stdlib.h>
*/
import "C"
import (
	"fmt"
	"log"
	"time"
	"unsafe"

	"github.com/fluent/fluent-bit-go/input"
	"github.com/shirou/gopsutil/v3/cpu"
	"github.com/shirou/gopsutil/v3/disk"
	"github.com/shirou/gopsutil/v3/mem"
)

var collection = ""

//export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer) int {
	log.Printf("[in_gopsutil] Register called")
	return input.FLBPluginRegister(def, "gopsutil", "Input plugin for gopsutil")
}

// (fluentbit will call this)
// plugin (context) pointer to fluentbit context (state/ c code)
//
//export FLBPluginInit
func FLBPluginInit(plugin unsafe.Pointer) int {
	collection = input.FLBPluginConfigKey(plugin, "collection")
	log.Printf("[in_gopsutil] collection= '%s'", collection)
	return input.FLB_OK
}

//export FLBPluginInputCallback
func FLBPluginInputCallback(data *unsafe.Pointer, size *C.size_t) int {
	flb_time := input.FLBTime{time.Now()}
	entry := []interface{}{flb_time}
	switch collection {
	case "cpu.percent.percpu":
		entry = append(entry, getCPUPercent(true))
	case "cpu.percent":
		entry = append(entry, getCPUPercent(false))
	case "cpu.times":
		if d, err := cpu.Times(false); err == nil {
			entry = append(entry, d)
		}
	case "cpu.times.percpu":
		if d, err := cpu.Times(true); err == nil {
			entry = append(entry, d)
		}
	case "mem.vm":
		if d, err := mem.VirtualMemory(); err == nil {
			entry = append(entry, d)
		}
	case "disk.usage":
		if d, err := disk.Usage("/"); err == nil {
			entry = append(entry, d)
		}
	default:
		log.Println("unknown collection")
		return input.FLB_ERROR
	}
	enc := input.NewEncoder()
	packed, err := enc.Encode(entry)
	if err != nil {
		log.Printf("[in_gopsutil] encode err:%v", err)
		return input.FLB_ERROR
	}

	length := len(packed)
	*data = C.CBytes(packed)
	*size = C.size_t(length)
	return input.FLB_OK
}

//export FLBPluginInputCleanupCallback
func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
	return input.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
	log.Printf("[in_gopsutil] Exit called")
	return input.FLB_OK
}

func getCPUPercent(percpu bool) map[string]interface{} {
	ret := make(map[string]interface{})
	if cpus, err := cpu.Percent(0, percpu); err == nil {
		if percpu {
			for i, v := range cpus {
				ret[fmt.Sprintf("cpu%d", i+1)] = v
			}
		} else if len(cpus) > 0 {
			ret["cpu"] = cpus[0]
		}
	}
	return ret
}

func main() {
}

のような感じでできました。ディスクの使用量を

のような感じで測定できました。

GO言語で作るFluentbitのInputプラグインは、複数インタンスに対応していないようです。収集するデータを複数指定する方法を試行錯誤しているうちに時間がたってしまいました。TWSNMP FCから受信するInputプラグインも複数インスタンスを起動するのは、だめみたいですが、深追いするのは諦めました。
おまけのプラグインをもう少し整理したら、今回開発したFluentbitのプラグイン達をTWSNMP FCのリリースに組み込んで配布しようと思います。

明日に続く

開発のための諸経費(機材、Appleの開発者、サーバー運用)に利用します。 ソフトウェアのマニュアルをnoteの記事で提供しています。 サポートによりnoteの運営にも貢献できるのでよろしくお願います。