运维开发实战
2026/3/20 · 大约 8 分钟
运维开发实战
一、CLI 工具开发
1.1 使用 Cobra 框架
package main
import (
"fmt"
"os"
"github.com/spf13/cobra"
)
// verbose enables detailed output; it is bound to the persistent --verbose/-v flag in init.
var verbose bool

// config holds the config-file path; it is bound to the persistent --config/-c flag in init.
var config string

// rootCmd is the top-level "ops-tool" command that every subcommand is attached to.
var rootCmd = &cobra.Command{
	Use:   "ops-tool",
	Short: "运维工具集",
	Long:  `一个功能强大的运维命令行工具集,提供服务器管理、部署、监控等功能。`,
}

// serverCmd groups the server-management subcommands ("list" and "check").
// It has no Run function of its own.
var serverCmd = &cobra.Command{
	Use:   "server",
	Short: "服务器管理",
}
// serverListCmd prints a fixed, hard-coded server inventory (demo data; no
// real lookup is performed).
var serverListCmd = &cobra.Command{
	Use:   "list",
	Short: "列出所有服务器",
	Run: func(_ *cobra.Command, _ []string) {
		rows := []string{
			" web-01 10.0.0.1 running",
			" web-02 10.0.0.2 running",
			" db-01 10.0.0.10 running",
		}
		fmt.Println("服务器列表:")
		for _, row := range rows {
			fmt.Println(row)
		}
	},
}
// serverCheckCmd reports the status of every server named on the command line.
//
// At least one name is required. Each name is checked in the order given;
// with --verbose a per-server breakdown is printed. The CPU/Memory/Disk
// figures are placeholder demo values, not real measurements.
var serverCheckCmd = &cobra.Command{
	Use:   "check [server-name...]",
	Short: "检查服务器状态",
	// MinimumNArgs allows several names, so all of them must be processed,
	// not just args[0].
	Args: cobra.MinimumNArgs(1),
	Run: func(cmd *cobra.Command, args []string) {
		for _, serverName := range args {
			if !verbose {
				fmt.Printf("服务器 %s: 正常\n", serverName)
				continue
			}
			fmt.Printf("详细检查服务器: %s\n", serverName)
			fmt.Println(" CPU: 45%")
			fmt.Println(" Memory: 60%")
			fmt.Println(" Disk: 35%")
		}
	},
}
// deployCmd echoes the deployment parameters for exactly one service: the
// service name, the --env and --version flags, and the global --config path.
// No actual deployment is performed.
var deployCmd = &cobra.Command{
	Use:   "deploy [service]",
	Short: "部署服务",
	Args:  cobra.ExactArgs(1),
	Run: func(cmd *cobra.Command, args []string) {
		flags := cmd.Flags()
		// Both flags are registered in init, so the lookup errors are ignored.
		env, _ := flags.GetString("env")
		version, _ := flags.GetString("version")

		fmt.Printf("部署服务: %s\n", args[0])
		fmt.Printf(" 环境: %s\n", env)
		fmt.Printf(" 版本: %s\n", version)
		fmt.Printf(" 配置: %s\n", config)
	},
}
// init registers the flags and assembles the command tree:
//
//	ops-tool
//	  server
//	    list
//	    check
//	  deploy
func init() {
	// Global flags, inherited by every subcommand.
	persistent := rootCmd.PersistentFlags()
	persistent.BoolVarP(&verbose, "verbose", "v", false, "详细输出")
	persistent.StringVarP(&config, "config", "c", "", "配置文件路径")

	// Flags that only the deploy command accepts.
	local := deployCmd.Flags()
	local.StringP("env", "e", "production", "部署环境")
	local.StringP("version", "V", "latest", "版本号")

	// AddCommand is variadic and adds the commands in argument order.
	serverCmd.AddCommand(serverListCmd, serverCheckCmd)
	rootCmd.AddCommand(serverCmd, deployCmd)
}
// main runs the command tree and exits with status 1 if it fails.
//
// Cobra's Execute already prints the error to stderr because rootCmd does not
// set SilenceErrors. Printing err again here would duplicate the message and
// send the copy to stdout, so main only sets the exit status.
func main() {
	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}
1.2 使用 Viper 配置管理
package main
import (
	"fmt"
	"log"
	"strings"

	"github.com/spf13/viper"
)
// Config is the root of the application configuration. LoadConfig fills it
// via viper; each mapstructure tag names the top-level key that section is
// read from.
type Config struct {
	Server   ServerConfig   `mapstructure:"server"`
	Database DatabaseConfig `mapstructure:"database"`
	Redis    RedisConfig    `mapstructure:"redis"`
	Log      LogConfig      `mapstructure:"log"`
}

// ServerConfig is the "server" section. LoadConfig defaults host to "0.0.0.0"
// and port to 8080.
type ServerConfig struct {
	Host string `mapstructure:"host"`
	Port int    `mapstructure:"port"`
}

// DatabaseConfig is the "database" section. main prints it as
// user@host:port/dbname.
type DatabaseConfig struct {
	Host     string `mapstructure:"host"`
	Port     int    `mapstructure:"port"`
	User     string `mapstructure:"user"`
	Password string `mapstructure:"password"` // NOTE(review): plaintext secret when kept in the config file
	DBName   string `mapstructure:"dbname"`
}

// RedisConfig is the "redis" section. It is not read anywhere else in this
// file and has no defaults.
type RedisConfig struct {
	Host     string `mapstructure:"host"`
	Port     int    `mapstructure:"port"`
	Password string `mapstructure:"password"`
	DB       int    `mapstructure:"db"` // presumably the Redis logical DB index — confirm
}

// LogConfig is the "log" section. LoadConfig defaults level to "info" and
// format to "json"; output has no default.
type LogConfig struct {
	Level  string `mapstructure:"level"`
	Format string `mapstructure:"format"`
	Output string `mapstructure:"output"` // destination semantics are not defined in this file — confirm
}
// LoadConfig reads the config file at path into a Config, using the global
// viper instance.
//
// Precedence, highest first: environment variables, then the config file,
// then the defaults set below. Environment variables use the APP_ prefix with
// the key's dots replaced by underscores. For example, server.port maps to
// APP_SERVER_PORT and database.password maps to APP_DATABASE_PASSWORD.
//
// It returns a wrapped error if the file cannot be read or its contents
// cannot be decoded into Config.
func LoadConfig(path string) (*Config, error) {
	viper.SetConfigFile(path)

	// Defaults, used when neither the file nor the environment sets a key.
	viper.SetDefault("server.host", "0.0.0.0")
	viper.SetDefault("server.port", 8080)
	viper.SetDefault("log.level", "info")
	viper.SetDefault("log.format", "json")

	// Environment overrides. Without the key replacer, AutomaticEnv would look
	// up "APP_SERVER.PORT" for server.port. A shell cannot normally set that
	// name, so nested keys could never be overridden.
	// NOTE: Unmarshal only sees env values for keys viper already knows about
	// (from the file or a default).
	viper.SetEnvPrefix("APP")
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()

	if err := viper.ReadInConfig(); err != nil {
		return nil, fmt.Errorf("读取配置失败: %w", err)
	}

	var config Config
	if err := viper.Unmarshal(&config); err != nil {
		return nil, fmt.Errorf("解析配置失败: %w", err)
	}
	return &config, nil
}
// main loads config.yaml from the working directory and prints the server
// and database settings. It exits via log.Fatal if loading fails.
//
// Example config.yaml:
//
//	server:
//	  host: 0.0.0.0
//	  port: 8080
//	database:
//	  host: localhost
//	  port: 3306
//	  user: root
//	  password: secret
//	  dbname: myapp
func main() {
	cfg, err := LoadConfig("config.yaml")
	if err != nil {
		log.Fatal(err)
	}

	srv, db := cfg.Server, cfg.Database
	fmt.Printf("服务器配置: %s:%d\n", srv.Host, srv.Port)
	fmt.Printf("数据库配置: %s@%s:%d/%s\n", db.User, db.Host, db.Port, db.DBName)
}
二、系统监控采集
2.1 系统信息采集
package main
import (
"fmt"
"os"
"runtime"
"time"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/host"
"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/mem"
"github.com/shirou/gopsutil/v3/net"
)
// SystemMetrics is one snapshot produced by CollectMetrics.
type SystemMetrics struct {
	Hostname    string        `json:"hostname"`
	Platform    string        `json:"platform"`
	Uptime      uint64        `json:"uptime"`       // seconds, as printed by main
	CollectTime time.Time     `json:"collect_time"` // taken when CollectMetrics starts
	CPU         CPUMetrics    `json:"cpu"`
	Memory      MemoryMetrics `json:"memory"`
	Disk        []DiskMetrics `json:"disk"`    // one entry per mount point whose usage could be read
	Network     []NetMetrics  `json:"network"` // per-interface counters; CollectMetrics skips loopback
	Load        LoadMetrics   `json:"load"`
}

// CPUMetrics holds the logical CPU count (runtime.NumCPU) and the aggregate
// usage in percent (0-100).
type CPUMetrics struct {
	Cores      int     `json:"cores"`
	UsageTotal float64 `json:"usage_total"`
}

// MemoryMetrics holds virtual-memory figures in bytes (main divides by 1024^3
// to print GB) plus the used share in percent.
type MemoryMetrics struct {
	Total       uint64  `json:"total"`
	Used        uint64  `json:"used"`
	Free        uint64  `json:"free"`
	UsedPercent float64 `json:"used_percent"`
}

// DiskMetrics describes one mounted filesystem; sizes are in bytes.
type DiskMetrics struct {
	Path        string  `json:"path"` // mount point
	Total       uint64  `json:"total"`
	Used        uint64  `json:"used"`
	Free        uint64  `json:"free"`
	UsedPercent float64 `json:"used_percent"`
}

// NetMetrics holds the I/O counters gopsutil reports for one interface.
// NOTE(review): presumably cumulative since the interface came up, not
// per-interval — confirm against gopsutil net.IOCounters.
type NetMetrics struct {
	Interface   string `json:"interface"`
	BytesSent   uint64 `json:"bytes_sent"`
	BytesRecv   uint64 `json:"bytes_recv"`
	PacketsSent uint64 `json:"packets_sent"`
	PacketsRecv uint64 `json:"packets_recv"`
}

// LoadMetrics holds the 1-, 5- and 15-minute load averages from load.Avg.
type LoadMetrics struct {
	Load1  float64 `json:"load1"`
	Load5  float64 `json:"load5"`
	Load15 float64 `json:"load15"`
}
// CollectMetrics takes a one-shot snapshot of host, CPU, memory, disk,
// network and load metrics using gopsutil.
//
// It blocks for about one second, because CPU usage is sampled over a 1s
// window.
//
// Host, CPU and memory data are required. If any of them cannot be read, it
// returns (nil, err) rather than dereferencing a missing result.
// Disk, network and load data are best-effort:
//   - listing errors are ignored and whatever entries were returned are used;
//   - partitions whose usage cannot be read are skipped;
//   - Load stays zero when the load average is unavailable.
func CollectMetrics() (*SystemMetrics, error) {
	metrics := &SystemMetrics{
		CollectTime: time.Now(),
	}

	// Host information.
	hostInfo, err := host.Info()
	if err != nil {
		return nil, fmt.Errorf("获取主机信息失败: %w", err)
	}
	metrics.Hostname = hostInfo.Hostname
	metrics.Platform = hostInfo.Platform
	metrics.Uptime = hostInfo.Uptime

	// CPU: percpu=false asks for a single aggregate percentage.
	cpuPercent, err := cpu.Percent(time.Second, false)
	if err != nil {
		return nil, fmt.Errorf("获取 CPU 使用率失败: %w", err)
	}
	if len(cpuPercent) == 0 {
		return nil, fmt.Errorf("获取 CPU 使用率失败: 无采样数据")
	}
	metrics.CPU = CPUMetrics{
		Cores:      runtime.NumCPU(), // logical CPUs usable by this process
		UsageTotal: cpuPercent[0],
	}

	// Memory.
	memInfo, err := mem.VirtualMemory()
	if err != nil {
		return nil, fmt.Errorf("获取内存信息失败: %w", err)
	}
	metrics.Memory = MemoryMetrics{
		Total:       memInfo.Total,
		Used:        memInfo.Used,
		Free:        memInfo.Free,
		UsedPercent: memInfo.UsedPercent,
	}

	// Disk (best-effort). The listing error is deliberately ignored: any
	// partitions returned are still used, and ranging over nil is safe.
	partitions, _ := disk.Partitions(false)
	for _, p := range partitions {
		usage, err := disk.Usage(p.Mountpoint)
		if err != nil {
			continue
		}
		metrics.Disk = append(metrics.Disk, DiskMetrics{
			Path:        p.Mountpoint,
			Total:       usage.Total,
			Used:        usage.Used,
			Free:        usage.Free,
			UsedPercent: usage.UsedPercent,
		})
	}

	// Network (best-effort), per interface, excluding loopback.
	netStats, _ := net.IOCounters(true)
	for _, stat := range netStats {
		if stat.Name == "lo" || stat.Name == "lo0" { // Linux / macOS and BSD loopback
			continue
		}
		metrics.Network = append(metrics.Network, NetMetrics{
			Interface:   stat.Name,
			BytesSent:   stat.BytesSent,
			BytesRecv:   stat.BytesRecv,
			PacketsSent: stat.PacketsSent,
			PacketsRecv: stat.PacketsRecv,
		})
	}

	// Load average (best-effort): left at zero if unavailable.
	if loadInfo, err := load.Avg(); err == nil && loadInfo != nil {
		metrics.Load = LoadMetrics{
			Load1:  loadInfo.Load1,
			Load5:  loadInfo.Load5,
			Load15: loadInfo.Load15,
		}
	}

	return metrics, nil
}
// main prints a human-readable summary of one CollectMetrics snapshot and
// exits with status 1 if collection fails. Network counters are collected
// but not printed.
func main() {
	m, err := CollectMetrics()
	if err != nil {
		fmt.Println("采集失败:", err)
		os.Exit(1)
	}

	// Bytes per printed "GB" (1024^3). Dividing by this power of two gives
	// exactly the same float result as dividing by 1024 three times.
	const gib = 1 << 30

	fmt.Printf("主机: %s (%s)\n", m.Hostname, m.Platform)
	fmt.Printf("运行时间: %d 秒\n\n", m.Uptime)
	fmt.Printf("CPU: %d 核心, 使用率 %.1f%%\n", m.CPU.Cores, m.CPU.UsageTotal)
	fmt.Printf("内存: 总计 %.1f GB, 使用 %.1f%% \n", float64(m.Memory.Total)/gib, m.Memory.UsedPercent)
	fmt.Printf("负载: %.2f %.2f %.2f\n\n", m.Load.Load1, m.Load.Load5, m.Load.Load15)

	fmt.Println("磁盘:")
	for i := range m.Disk {
		d := &m.Disk[i]
		fmt.Printf(" %s: %.1f%% (%.1f/%.1f GB)\n",
			d.Path, d.UsedPercent, float64(d.Used)/gib, float64(d.Total)/gib)
	}
}
三、SSH 批量执行
package main
import (
"fmt"
"sync"
"time"
"golang.org/x/crypto/ssh"
)
// SSHConfig describes how to reach and authenticate to one host.
type SSHConfig struct {
	Host       string        // hostname or IP, formatted with Port as "host:port"
	Port       int
	User       string
	Password   string        // password credential; password auth is offered when non-empty
	PrivateKey string        // NOTE(review): format (file path vs. PEM contents) is not established here — confirm with NewSSHClient before relying on it
	Timeout    time.Duration // passed to ssh.ClientConfig.Timeout
}

// SSHClient wraps one established SSH connection and the config used to open
// it. Create it with NewSSHClient and release it with Close.
type SSHClient struct {
	config *SSHConfig
	client *ssh.Client
}
// NewSSHClient opens an SSH connection to config.Host:config.Port and returns
// a client wrapping it.
//
// Authentication methods are offered in this order:
//   - public key, when config.PrivateKey is set. It must contain the
//     PEM-encoded key itself (not a file path) and must not be
//     passphrase-protected.
//   - password, when config.Password is set.
//
// If neither is set, an error is returned without dialing. A zero Port
// defaults to 22. config.Timeout becomes ssh.ClientConfig.Timeout, which
// bounds establishing the TCP connection (zero means no timeout).
//
// SECURITY: host keys are not verified (ssh.InsecureIgnoreHostKey), which
// permits man-in-the-middle attacks. Production code should use a
// known_hosts-based HostKeyCallback.
// NOTE(review): IPv6 literal hosts need brackets; net.JoinHostPort would
// handle that.
func NewSSHClient(config *SSHConfig) (*SSHClient, error) {
	var authMethods []ssh.AuthMethod
	if config.PrivateKey != "" {
		signer, err := ssh.ParsePrivateKey([]byte(config.PrivateKey))
		if err != nil {
			return nil, fmt.Errorf("解析私钥失败: %w", err)
		}
		authMethods = append(authMethods, ssh.PublicKeys(signer))
	}
	if config.Password != "" {
		authMethods = append(authMethods, ssh.Password(config.Password))
	}
	if len(authMethods) == 0 {
		// Fail fast instead of dialing and getting an opaque handshake error.
		return nil, fmt.Errorf("SSH 连接失败: 未配置密码或私钥 (host %s)", config.Host)
	}

	port := config.Port
	if port == 0 {
		port = 22
	}

	sshConfig := &ssh.ClientConfig{
		User:            config.User,
		Auth:            authMethods,
		HostKeyCallback: ssh.InsecureIgnoreHostKey(), // demo only; verify host keys in production
		Timeout:         config.Timeout,
	}

	addr := fmt.Sprintf("%s:%d", config.Host, port)
	client, err := ssh.Dial("tcp", addr, sshConfig)
	if err != nil {
		return nil, fmt.Errorf("SSH 连接失败: %w", err)
	}
	return &SSHClient{config: config, client: client}, nil
}
// Run executes command in a fresh session on the existing connection and
// returns its combined stdout and stderr. If the command fails, the output
// captured so far is still returned alongside the wrapped error.
func (c *SSHClient) Run(command string) (string, error) {
	sess, err := c.client.NewSession()
	if err != nil {
		return "", fmt.Errorf("创建会话失败: %w", err)
	}
	defer sess.Close()

	out, runErr := sess.CombinedOutput(command)
	if runErr == nil {
		return string(out), nil
	}
	return string(out), fmt.Errorf("执行命令失败: %w", runErr)
}

// Close closes the underlying SSH connection.
func (c *SSHClient) Close() error {
	conn := c.client
	return conn.Close()
}
// ExecResult is the outcome of running one command on one host.
type ExecResult struct {
	Host    string
	Command string
	Output  string        // combined stdout/stderr returned by SSHClient.Run
	Error   error         // connection or execution error; nil on success
	Latency time.Duration // wall-clock time measured from before connecting
}

// BatchExecutor runs one command across many hosts in parallel.
type BatchExecutor struct {
	Concurrency int // maximum number of hosts handled at the same time; set it to a positive value
}
// Execute runs command on every host concurrently and waits for all of them.
// At most be.Concurrency hosts are handled at once.
//
// The result has one entry per host, in the same order as hosts. Each entry
// records the output, any connect or run error, and the latency including
// connection setup. Errors are reported per host; Execute itself never fails.
//
// A Concurrency <= 0 (including a zero-value BatchExecutor) means no limit.
func (be *BatchExecutor) Execute(hosts []SSHConfig, command string) []ExecResult {
	results := make([]ExecResult, len(hosts))
	if len(hosts) == 0 {
		return results
	}

	limit := be.Concurrency
	if limit <= 0 || limit > len(hosts) {
		// A non-positive limit would make the semaphore unbuffered. Every
		// worker's send would then block forever, because the only receive
		// happens after that send.
		limit = len(hosts)
	}
	sem := make(chan struct{}, limit)

	var wg sync.WaitGroup
	for i := range hosts {
		wg.Add(1)
		go func(idx int, h SSHConfig) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			// Each goroutine writes only its own index, so no lock is needed.
			results[idx] = runOnHost(&h, command)
		}(i, hosts[i])
	}
	wg.Wait()
	return results
}

// runOnHost connects to one host, runs command, closes the connection, and
// times the whole exchange.
func runOnHost(h *SSHConfig, command string) ExecResult {
	start := time.Now()
	result := ExecResult{Host: h.Host, Command: command}

	client, err := NewSSHClient(h)
	if err != nil {
		result.Error = err
		result.Latency = time.Since(start)
		return result
	}
	defer client.Close()

	output, err := client.Run(command)
	result.Output = output
	result.Error = err
	result.Latency = time.Since(start)
	return result
}
// main runs "uptime" on three demo hosts (root/password auth, 10s timeout,
// at most 5 hosts at once). It prints each host's output or error in input
// order.
func main() {
	var hosts []SSHConfig
	for _, ip := range []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"} {
		hosts = append(hosts, SSHConfig{
			Host:     ip,
			Port:     22,
			User:     "root",
			Password: "xxx",
			Timeout:  10 * time.Second,
		})
	}

	results := (&BatchExecutor{Concurrency: 5}).Execute(hosts, "uptime")

	fmt.Println("执行结果:")
	for _, r := range results {
		if r.Error == nil {
			fmt.Printf("[%s] (%v)\n%s\n", r.Host, r.Latency, r.Output)
			continue
		}
		fmt.Printf("[%s] 错误: %v\n", r.Host, r.Error)
	}
}
四、日志收集与分析
package main
import (
"bufio"
"fmt"
"os"
"regexp"
"sort"
"strings"
"time"
)
// NginxLogEntry is one parsed line of an nginx access log.
// Fields that the parser does not extract stay at their zero value.
// NOTE(review): Latency is never populated anywhere in this file.
type NginxLogEntry struct {
	IP        string    // client address (first field of the line)
	Time      time.Time
	Method    string    // HTTP method from the request line
	Path      string    // request target from the request line
	Status    int       // HTTP status code
	Size      int       // response size field (presumably $body_bytes_sent) — confirm
	Referer   string
	UserAgent string
	Latency   float64
}

// LogAnalyzer accumulates parsed entries, either via LoadFile or by
// assigning entries directly, and summarizes them with Analyze.
type LogAnalyzer struct {
	entries []NginxLogEntry
}
// nginxLogPattern matches nginx "combined"-format access-log lines:
//
//	$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"
//
// The trailing referer/user-agent pair is optional, so "common"-format lines
// also match, and the size may be "-". It is compiled once at package level
// rather than on every call.
var nginxLogPattern = regexp.MustCompile(
	`^(\S+) .*? \[([^\]]+)\] "(\S+) ([^\s"]+)[^"]*" (\d+) (\d+|-)(?: "([^"]*)" "([^"]*)")?`)

// nginxTimeLayout is the layout of $time_local, e.g. 10/Oct/2023:13:55:36 +0800.
const nginxTimeLayout = "02/Jan/2006:15:04:05 -0700"

// ParseNginxLog parses one access-log line into an NginxLogEntry.
//
// It fills IP, Time, Method, Path, Status, Size, Referer and UserAgent.
// Latency is not present in these formats and stays zero. A "-" size is
// read as 0. A timestamp that does not match nginxTimeLayout leaves Time
// zero rather than rejecting the line.
// It returns an error if the line (after trimming surrounding whitespace,
// including a CRLF '\r') does not match nginxLogPattern.
func ParseNginxLog(line string) (*NginxLogEntry, error) {
	line = strings.TrimSpace(line)
	m := nginxLogPattern.FindStringSubmatch(line)
	if m == nil {
		return nil, fmt.Errorf("无法解析日志行")
	}

	entry := &NginxLogEntry{
		IP:        m[1],
		Method:    m[3],
		Path:      m[4],
		Referer:   m[7], // "" when the optional referer/UA group is absent
		UserAgent: m[8],
	}
	if t, err := time.Parse(nginxTimeLayout, m[2]); err == nil {
		entry.Time = t
	}
	// Both fields are guaranteed digits by the pattern ("-" excepted for size).
	fmt.Sscanf(m[5], "%d", &entry.Status)
	if m[6] != "-" {
		fmt.Sscanf(m[6], "%d", &entry.Size)
	}
	return entry, nil
}
// maxLogLineBytes is the longest single log line LoadFile accepts.
// bufio.Scanner's default limit (64 KiB) is easily exceeded by long URLs or
// user agents. One such line would otherwise stop the whole load with
// bufio.ErrTooLong.
const maxLogLineBytes = 1024 * 1024

// LoadFile reads the access log at path line by line. Every line that
// ParseNginxLog accepts is appended to la.entries; unparseable lines are
// skipped.
//
// It returns the error from opening the file, or the scanner's error (a read
// failure or a line longer than maxLogLineBytes). Entries parsed before a
// scanner error are kept.
func (la *LogAnalyzer) LoadFile(path string) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
	for scanner.Scan() {
		entry, err := ParseNginxLog(scanner.Text())
		if err != nil {
			continue // not a line the parser understands
		}
		la.entries = append(la.entries, *entry)
	}
	return scanner.Err()
}
// Stats is the summary produced by LogAnalyzer.Analyze.
type Stats struct {
	TotalRequests int            // number of entries analyzed
	StatusCounts  map[int]int    // HTTP status code -> request count
	TopIPs        []IPCount      // most frequent client IPs, highest count first, at most 10
	TopPaths      []PathCount    // most frequent request paths, highest count first, at most 10
	MethodCounts  map[string]int // HTTP method -> request count
}

// IPCount pairs a client IP with its request count.
type IPCount struct {
	IP    string
	Count int
}

// PathCount pairs a request path with its request count.
type PathCount struct {
	Path  string
	Count int
}
// topN is how many entries Analyze keeps in Stats.TopIPs and Stats.TopPaths.
const topN = 10

// Analyze aggregates la.entries into a Stats summary: the request total,
// per-status and per-method counts, and the topN most frequent client IPs and
// request paths, highest count first.
//
// Equal counts are ordered by ascending IP or path. The tie-break is needed
// because map iteration order is random. Without it, tied entries would come
// out in arbitrary order and the topN cut-off could keep a different subset
// on each run.
func (la *LogAnalyzer) Analyze() *Stats {
	stats := &Stats{
		TotalRequests: len(la.entries),
		StatusCounts:  make(map[int]int),
		MethodCounts:  make(map[string]int),
	}
	ipCounts := make(map[string]int)
	pathCounts := make(map[string]int)
	for _, entry := range la.entries {
		stats.StatusCounts[entry.Status]++
		stats.MethodCounts[entry.Method]++
		ipCounts[entry.IP]++
		pathCounts[entry.Path]++
	}

	// Top IPs.
	for ip, count := range ipCounts {
		stats.TopIPs = append(stats.TopIPs, IPCount{IP: ip, Count: count})
	}
	sort.Slice(stats.TopIPs, func(i, j int) bool {
		a, b := stats.TopIPs[i], stats.TopIPs[j]
		if a.Count != b.Count {
			return a.Count > b.Count
		}
		return a.IP < b.IP
	})
	if len(stats.TopIPs) > topN {
		stats.TopIPs = stats.TopIPs[:topN]
	}

	// Top paths.
	for path, count := range pathCounts {
		stats.TopPaths = append(stats.TopPaths, PathCount{Path: path, Count: count})
	}
	sort.Slice(stats.TopPaths, func(i, j int) bool {
		a, b := stats.TopPaths[i], stats.TopPaths[j]
		if a.Count != b.Count {
			return a.Count > b.Count
		}
		return a.Path < b.Path
	})
	if len(stats.TopPaths) > topN {
		stats.TopPaths = stats.TopPaths[:topN]
	}

	return stats
}
// Print writes a human-readable report of s to stdout. It shows the request
// total, every status code in ascending order with its share of all
// requests, and the ranked IP and path lists.
func (s *Stats) Print() {
	fmt.Printf("总请求数: %d\n\n", s.TotalRequests)

	fmt.Println("状态码统计:")
	// Ranging over the map directly would print the codes in random order.
	codes := make([]int, 0, len(s.StatusCounts))
	for code := range s.StatusCounts {
		codes = append(codes, code)
	}
	sort.Ints(codes)
	for _, code := range codes {
		count := s.StatusCounts[code]
		share := 0.0
		if s.TotalRequests > 0 { // avoid NaN/Inf for an inconsistent Stats value
			share = float64(count) / float64(s.TotalRequests) * 100
		}
		fmt.Printf(" %d: %d (%.1f%%)\n", code, count, share)
	}

	fmt.Println("\nTop 10 IP:")
	for i, ip := range s.TopIPs {
		fmt.Printf(" %d. %s: %d\n", i+1, ip.IP, ip.Count)
	}

	fmt.Println("\nTop 10 路径:")
	for i, p := range s.TopPaths {
		fmt.Printf(" %d. %s: %d\n", i+1, p.Path, p.Count)
	}
}
func main() {
analyzer := &LogAnalyzer{}
// 示例:从日志文件加载
// analyzer.LoadFile("/var/log/nginx/access.log")
// 模拟数据
analyzer.entries = []NginxLogEntry{
{IP: "10.0.0.1", Method: "GET", Path: "/api/users", Status: 200},
{IP: "10.0.0.1", Method: "GET", Path: "/api/users", Status: 200},
{IP: "10.0.0.2", Method: "POST", Path: "/api/login", Status: 200},
{IP: "10.0.0.1", Method: "GET", Path: "/api/orders", Status: 500},
{IP: "10.0.0.3", Method: "GET", Path: "/api/users", Status: 200},
}
stats := analyzer.Analyze()
stats.Print()
}
五、定时任务调度
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Job is a named task that a Scheduler runs repeatedly at a fixed interval.
type Job struct {
	Name     string                          // label used in log output
	Schedule time.Duration                   // interval between runs, passed to time.NewTicker; must be positive
	Task     func(ctx context.Context) error // receives the scheduler's context, which Stop cancels
}

// Scheduler runs each registered Job in its own goroutine until Stop is
// called. Create it with NewScheduler.
// NOTE(review): storing a context in a struct goes against the context
// package guidance; passing ctx to Start would make the lifetime explicit.
type Scheduler struct {
	jobs   []*Job
	wg     sync.WaitGroup // tracks running job goroutines so Stop can wait for them
	ctx    context.Context
	cancel context.CancelFunc
}
// NewScheduler returns an idle Scheduler with no jobs. Its context is derived
// from context.Background and is cancelled by Stop.
func NewScheduler() *Scheduler {
	s := new(Scheduler)
	s.ctx, s.cancel = context.WithCancel(context.Background())
	return s
}
// AddJob registers job with the scheduler. It performs no locking, so do not
// call it concurrently with Start. Jobs added after Start are not started.
func (s *Scheduler) AddJob(job *Job) {
	s.jobs = append(s.jobs, job)
}

// Start launches one goroutine per registered job and returns immediately.
// Each goroutine is counted in s.wg so Stop can wait for it.
func (s *Scheduler) Start() {
	s.wg.Add(len(s.jobs))
	for i := range s.jobs {
		go s.runJob(s.jobs[i])
	}
}
// runJob drives one job until the scheduler is stopped. It runs the job once
// immediately, then again on every tick of job.Schedule. It marks itself done
// on s.wg when it returns.
//
// A non-positive Schedule is reported and the job is skipped. Otherwise
// time.NewTicker would panic and, from this goroutine, crash the whole
// process. If the scheduler is already stopped, the job is not run at all.
func (s *Scheduler) runJob(job *Job) {
	defer s.wg.Done()

	if job.Schedule <= 0 {
		fmt.Printf("[%s] 无效的调度间隔: %v\n", job.Name, job.Schedule)
		return
	}
	if s.ctx.Err() != nil {
		fmt.Printf("[%s] 任务停止\n", job.Name)
		return
	}

	ticker := time.NewTicker(job.Schedule)
	defer ticker.Stop()

	// Run once right away instead of waiting a full interval.
	s.executeJob(job)
	for {
		select {
		case <-s.ctx.Done():
			fmt.Printf("[%s] 任务停止\n", job.Name)
			return
		case <-ticker.C:
			// select picks randomly when both channels are ready; re-check so
			// no new run starts after Stop has cancelled the context.
			if s.ctx.Err() != nil {
				fmt.Printf("[%s] 任务停止\n", job.Name)
				return
			}
			s.executeJob(job)
		}
	}
}
// executeJob runs job.Task once with the scheduler's context. It logs the
// start time, then either the error or the elapsed time.
//
// A panic inside the task is recovered and reported as a failure. An
// unrecovered panic in any goroutine terminates the program, so one faulty
// job would otherwise bring down every other job.
func (s *Scheduler) executeJob(job *Job) {
	start := time.Now()
	fmt.Printf("[%s] 开始执行 (%s)\n", job.Name, start.Format("15:04:05"))

	if err := s.runTask(job); err != nil {
		fmt.Printf("[%s] 执行失败: %v\n", job.Name, err)
	} else {
		fmt.Printf("[%s] 执行成功 (耗时: %v)\n", job.Name, time.Since(start))
	}
}

// runTask calls job.Task with the scheduler's context, converting a panic
// (including a nil Task) into an error.
func (s *Scheduler) runTask(job *Job) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return job.Task(s.ctx)
}
// Stop cancels the scheduler's context and blocks until every job goroutine
// started by Start has returned. Calling it again is harmless: cancelling an
// already-cancelled context does nothing, and the wait returns immediately.
func (s *Scheduler) Stop() {
	defer s.wg.Wait()
	s.cancel()
}
// main registers three demo jobs: a health check every 5s, log cleanup every
// 10s and metrics collection every 3s. It lets them run for 30 seconds, then
// stops the scheduler and waits for the jobs to finish.
func main() {
	scheduler := NewScheduler()

	demo := []struct {
		name  string
		every time.Duration
		msg   string
	}{
		{"健康检查", 5 * time.Second, " 检查服务健康状态..."},
		{"日志清理", 10 * time.Second, " 清理过期日志..."},
		{"监控采集", 3 * time.Second, " 采集系统指标..."},
	}
	for _, d := range demo {
		msg := d.msg // per-iteration copy for the closure (needed before Go 1.22)
		scheduler.AddJob(&Job{
			Name:     d.name,
			Schedule: d.every,
			Task: func(context.Context) error {
				fmt.Println(msg)
				return nil
			},
		})
	}

	scheduler.Start()
	time.Sleep(30 * time.Second) // let the jobs run for 30 seconds
	scheduler.Stop()
	fmt.Println("调度器已停止")
}
六、本章小结
| 场景 | 推荐方案 | 核心库 |
|---|---|---|
| CLI 开发 | Cobra + Viper | spf13/cobra, spf13/viper |
| 系统监控 | gopsutil 采集 | shirou/gopsutil |
| SSH 批量执行 | x/crypto/ssh + 信号量并发控制 | golang.org/x/crypto/ssh |
| 日志分析 | bufio + regexp | regexp, bufio |
| 定时任务 | 自定义调度器 / robfig/cron | time.Ticker |
| 配置管理 | Viper 多格式支持 | spf13/viper |
运维开发建议
- CLI 工具使用 Cobra 框架,功能完善易于扩展
- 配置管理使用 Viper,支持多格式和环境变量
- SSH 批量执行注意并发控制和超时处理;生产环境应校验主机密钥(known_hosts),不要使用 InsecureIgnoreHostKey
- 监控采集使用 gopsutil,跨平台兼容性好
- 定时任务注意优雅关闭和错误处理