该功能要实现检测日志输出内容,实时摘取错误日志片段,存放数据库中~
后续可进一步开发实现,错误信息分析,邮件预警等功能
一、需求
从服务器上检测程序运行输出的日志内容,将日志内容中Exception错误的内容块收取
将收取的错误信息存放到MySQL数据库中
二、选用技术点 1. FileBeat 采用fileBeat用于检测日志信息,获取日志中错误信息内容块,作为生产者传递给KafKa
2. Kafka 用于错误信息的收取,fileBeat传递内容后,通过kafka进行错误信息的传递
3. SpringBoot+Mybatis-plus+MySQL+MAVEN 开启kafka消费者监听,实时接受kafka获取的错误日志信息,格式化后入库
三、实现步骤 1、使用FileBeat实现日志的正则抓取
所在Linux服务器执行以下命令下载安装fielBeat
1 2 3 4 获取filebeat文件 curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.15.2-linux-x86_64.tar.gz 解压filebeat压缩包 tar -zxf kafka_2.12-3.0.0.tgz -C /opt/
修改配置文件
cd filebeat-7.15.2-linux-x86_64/ 进入目录
修改filebeat.yml文件,关注以下配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 # 限制一个cpu核心,避免在日志较频繁时可能导致CPU满载 max_procs: 1 # 监测的日志路径 paths: - /home/weblogic/base_domain/logs/nohup/nohup*.log # 匹配日志的行信息,例如找到以Excepton和Error开头的错误 include_lines: ['ERROR','^(\w{1,}\.){1,}(.*Exception|.*Error){1}'] ### Multiline options # 根据实际日志情况进行多行匹配,获取的是错误多行信息 multiline.type: pattern multiline.pattern: '^(\w{1,}\.){1,}(.*Exception|.*Error){1}|^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:' multiline.negate: false multiline.match: after # -------------------------------kafka Output----------------------------------- output.kafka: # initial brokers for reading cluster metadata hosts: ["101.132.XXX.XX:9092"] # message topic selection + partitioning topic: 'error_log_collect_topic' partition.round_robin: reachable_only: false version: 2.0.0 required_acks: 1 compression: gzip max_message_bytes: 1000000
filebeat启动
1 ./filebeat -e -c filebeat配置文件
也可以后台启动
1 nohup ./filebeat -e -c filebeat.yml -d "publish" > /dev/null >2& >1 &
关于nohup具体用法,查看章节四
2、使用kafka获取错误日志信息 kafka中文教程
kafka的使用,要求先要安装好jdk以及zookeeper,kafka3.0.0自带zookeeper,也可以自行单独安装zookee。
所在Linux服务器执行以下命令下载安装Kafka
1 2 3 4 获取kafka文件包 wget http://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz 解压 tar kafka_2.12-3.0.0.tgz
配置环境变量
vim /etc/profile
添加以下内容
1 2 export KAFKA_HOME=/opt/kafka_2.12-3.0.0 export PATH=$PATH:$KAFKA_HOME/bin
修改配置文件
vim /opt/kafka_2.12-3.0.0/config/server.properties
在内容中修改或者添加以下信息:
1 2 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://101.132.140.20:9092
同时需要关注zookeeper.connect
属性值是否是正确的,如果是本机的zookeeper一般是localhost:2181
启动
启动参数 -daemon 指定后台启动
1 2 3 4 5 6 7 8 9 --启动zookeeper zookeeper-server-start.sh -daemon /opt/kafka_2.12-3.0.0/config/zookeeper.properties --启动kafka kafka-server-start.sh -daemon /opt/kafka_2.12-3.0.0/config/server.properties --生产者 kafka-console-producer.sh --broker-list localhost:9092 --topic error_log_collect_topic --消费者 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic error_log_collect_topic --from-beginning
创建和查看主题
创建一个名为“test”的Topic,只有一个分区和一个备份:
1 kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
创建好之后,可以通过运行以下命令,查看已创建的topic信息:
1 2 3 kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092 Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3、SpringBoot+MySQL+Mybatis-Plus (—)SpingBoot相关 新增Module模块tool-log处理kafka获取的信息
修改pom.xml,注意以下内容
是否配置了<parent></parent>属性
<dependency>是否依赖了需要的模块
这个Module另外需要的依赖是否引入
parent的pom.xml中module是否有这个新module
SpringBoot主类Module的pom.xml是否依赖了这个新的Module
1. application.yml配置文件添加kafka信息 在SpringBoot入口配置Module的resources目录下的application.yml添加以下信息
1 2 3 4 5 6 7 spring: kafka: consumer: enable-auto-commit: true auto-offset-reset: earliest bootstrap-servers: 101.132.140.20:9092 group-id: test-consumer-group
2. application.yml配置文件添加日志数据源 主要是配置 log数据源,作为后面持久层入库的数据源连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 spring: datasource: druid: stat-view-servlet: loginUsername: admin loginPassword: 123456 dynamic: druid: #以下是全局默认值,可以全局更改 minIdle: 5 maxActive: 20 filters: stat,wall # 注意这个值和druid原生不一致,默认启动了stat,wall stat: merge-sql: true log-slow-sql: true datasource: log: url: jdbc:mysql://101.132.XXX.XX:3306/XXX username: root password: XXX driverClassName: com.mysql.jdbc.Driver
3. 使用 mybatis-generator
逆向工程生成文件 将生成的文件转移到新的Module中去,注意MyBatis-plus配置在SpringBoot启动类上的Mapper的扫描路径,那么在转移文件的时候,新的Module也要新建这样一层包路径,将相关的mapper文件放进去
4. 业务代码编写 1. 编写kafka消费者监听类 1 2 3 4 5 6 7 8 9 10 11 12 @Component public class ErrorMsgCollect { @Autowired ParseErrorInfoMessage parseErrorInfoMessage; @KafkaListener(topics = "error_log_collect_topic") public void onMessage(ConsumerRecord<Integer,String> record){ ErrorLogInfo errorLogInfo = new ErrorLogInfo(); System.out.println("消费者收到的消息=="+record.topic()+"\t"+record.partition()+"\t"+record.offset()+"\t"+record.key()+"\t"+record.value()); parseErrorInfoMessage.parseMessage(record.value(), errorLogInfo); } }
2. 编写数据解析类,和数据存库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Component public class ParseErrorInfoMessage { @Autowired ErrorLogInfoService errorLogInfoService; /** * 解析错误信息 * @param errorInfo * @param errorLogInfo * @return */ public ErrorLogInfo parseMessage(String errorInfo, ErrorLogInfo errorLogInfo){ //textMp(); JSONObject errorInfoObject = JSONObject.parseObject(errorInfo); //获取错误日志时间 String errDate = errorInfoObject.get("@timestamp").toString(); errorLogInfo.setErrordate(DateUtil.parse(errDate)); //获取日志位置 JSONObject logInfoObject = (JSONObject)errorInfoObject.get("log"); JSONObject fileObject = (JSONObject)logInfoObject.get("file"); errorLogInfo.setLogpath(fileObject.get("path").toString()); //获取日志所在服务器ip JSONObject hostObject = (JSONObject)errorInfoObject.get("host"); JSONArray ipArr = hostObject.getJSONArray("ip"); errorLogInfo.setIpaddr(ipArr.get(0).toString()); //获取错误日志信息 Object errMessage = errorInfoObject.get("message"); errorLogInfo.setMessage(errMessage.toString()); errorLogInfo.setAdddate(new Date()); saveErrorLogInfo(errorLogInfo); return errorLogInfo; } //入库 public void saveErrorLogInfo(ErrorLogInfo errorLogInfo){ boolean save = errorLogInfoService.save(errorLogInfo); System.out.println("插入提示:" + save ); } }
3. 注意: 在对应的mapper文件上方提交@DS(“XX”),指定使用的数据源
1 2 3 4 5 @Mapper @DS("log") public interface ErrorLogInfoMapper extends BaseMapper<ErrorLogInfo> { }
至此,完成代码编写开发,通过SpringBoot启动类即可启动
(二)MySQL mysql相关下载安装参考文档:点击下载 密码:lzlz
建立数据库,确定表结构
1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE `error_log_info` ( `id` int (11 ) NOT NULL AUTO_INCREMENT COMMENT '自增逐渐ID' , `ipaddr` varchar (50 ) DEFAULT NULL COMMENT 'IP地址' , `logpath` varchar (255 ) DEFAULT NULL COMMENT '日志地址' , `message` text COMMENT '错误信息' , `errordate` datetime DEFAULT NULL COMMENT '错误日志时间' , `adddate` datetime DEFAULT NULL COMMENT '日志入库时间' , PRIMARY KEY (`id`), KEY `error_date_index` (`errordate`), FULLTEXT KEY `error_info_index` (`message`) ) ENGINE= InnoDB AUTO_INCREMENT= 50426 DEFAULT CHARSET= utf8
(三)项目部署 1、安装配置maven 这一步骤有无皆可,可以本地打包上传
将maven二进制文件包上传到服务器解压后,添加到环境变量
1 2 3 MAVEN_HOME=/usr/local/src/apache-maven-3.6.3 PATH=$PATH:$MAVEN_HOME/bin export PATH JAVA_HOME CLASSPATH
然后输入 source /etc/profile 使得环境变量生效
输入 mvn -version 测试,配置是否成功
配置 maven的setting.xml文件
本地仓库地址
阿里云镜像
1 2 3 4 5 6 7 8 <localRepository>/home/b2b/repository</localRepository> <mirror> <id>aliyunmaven</id> <mirrorOf>*</mirrorOf> <name>阿里云公共仓库</name> <url>https://maven.aliyun.com/repository/public</url> </mirror>
2、上传源码 将源码上传到服务器上
3、使用maven打包 将目录切到源码的parent层级下,执行
进行先clean再打包的操作
打包完成,在SpringBoot启动类模块,target文件夹下,找到生成的jar文件,在该层级执行
1 nohup java -jar b2btool-web-0.0.1-SNAPSHOT.jar &
nohup 会将输出的日志后台输出到 nohub.out文件中
至此项目部署完成
四、具体细节 1、mabits-generator逆向工程使用 pom.xml 依赖导入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <!--mp代码生成器--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>3.4.1</version> </dependency> <!--为实体类自动添加getter、setter、toString等方法--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--数据库连接驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.46</version> </dependency>
编写generator代码生成器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 import com.baomidou.mybatisplus.annotation.DbType;import com.baomidou.mybatisplus.annotation.IdType;import com.baomidou.mybatisplus.core.toolkit.StringPool;import com.baomidou.mybatisplus.generator.AutoGenerator;import com.baomidou.mybatisplus.generator.InjectionConfig;import com.baomidou.mybatisplus.generator.config.*;import com.baomidou.mybatisplus.generator.config.po.TableInfo;import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;import com.baomidou.mybatisplus.generator.engine.FreemarkerTemplateEngine;import com.baomidou.mybatisplus.generator.engine.VelocityTemplateEngine; import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map; public class MyBatisPlusGenerator { private static final String AUTHOR = "kevin" ; private static final String PREFIX = "t_" ; private static final String MODULE_NAME = "test" ; private static final String[] TABLES= {"想生成代码的表名" }; private static final String JDBC_URL = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8" + "&useSSL=false&zeroDateTimeBehavior=convertToNull&" ; private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver" ; private static final String JDBC_USERNAME = "数据库用户名" ; private static final String JDBC_PASSWORD = "数据库密码" ; private static final String BASE_PACKAGE = "com.liu.test.mybatis.demo" ; public static void main (String[] args) { String projectPath = System.getProperty("user.dir" ); AutoGenerator generator = new AutoGenerator(); configDataSource(generator); configGlobal(generator, projectPath); configPackage(generator); configStrategy(generator); cofnigCustom(generator, projectPath); configTemplate(generator); generator.execute(); } private static void configDataSource (AutoGenerator generator) { DataSourceConfig dataSourceConfig = new DataSourceConfig(); dataSourceConfig.setUrl(JDBC_URL); dataSourceConfig.setDbType(DbType.MYSQL); dataSourceConfig.setDriverName(JDBC_DRIVER); dataSourceConfig.setUsername(JDBC_USERNAME); dataSourceConfig.setPassword(JDBC_PASSWORD); generator.setDataSource(dataSourceConfig); } private static void configGlobal (AutoGenerator generator, String projectPath) { GlobalConfig globalConfig = new GlobalConfig(); String fileOutputPatch = projectPath.concat("/src/main/java" ); globalConfig.setOutputDir(fileOutputPatch); globalConfig.setAuthor(AUTHOR); globalConfig.setOpen(false ); globalConfig.setFileOverride(true ); globalConfig.setActiveRecord(true ); globalConfig.setEnableCache(false ); globalConfig.setBaseResultMap(true ); globalConfig.setBaseColumnList(true ); globalConfig.setIdType(IdType.INPUT); globalConfig.setMapperName("%sMapper" ); globalConfig.setXmlName("%sMapper" ); globalConfig.setEntityName("%s" ); globalConfig.setServiceName("%sService" ); globalConfig.setServiceImplName("%sServiceImpl" ); globalConfig.setControllerName("%sController" ); globalConfig.setSwagger2(true ); generator.setGlobalConfig(globalConfig); } private static void configPackage (AutoGenerator generator) { PackageConfig packageConfig = new PackageConfig(); packageConfig.setModuleName(MODULE_NAME); packageConfig.setParent(BASE_PACKAGE); packageConfig.setController("controller" ); packageConfig.setService("service" ); packageConfig.setServiceImpl("service.impl" ); packageConfig.setEntity("entity" ); packageConfig.setMapper("mapper" ); packageConfig.setXml("mapper" ); generator.setPackageInfo(packageConfig); } private static void configStrategy (AutoGenerator generator) { StrategyConfig strategy = new StrategyConfig(); strategy.setTablePrefix(PREFIX); strategy.setNaming(NamingStrategy.underline_to_camel); strategy.setColumnNaming(NamingStrategy.underline_to_camel); strategy.setInclude(TABLES); strategy.setControllerMappingHyphenStyle(true ); generator.setTemplateEngine(new FreemarkerTemplateEngine()); strategy.setEntityLombokModel(true ); strategy.setRestControllerStyle(true ); strategy.setEntityBooleanColumnRemoveIsPrefix(true ); generator.setStrategy(strategy); } private static void cofnigCustom (AutoGenerator generator, String projectPath) { InjectionConfig cfg = new InjectionConfig() { @Override public void initMap () { } }; String templatePath = "/templates/mapper.xml.ftl" ; List<FileOutConfig> focList = new ArrayList<>(); focList.add(new FileOutConfig(templatePath) { @Override public String outputFile (TableInfo tableInfo) { return projectPath.concat("/src/main/resources/mapper/" ).concat(MODULE_NAME).concat("/" ) .concat(tableInfo.getEntityName()).concat("Mapper" ).concat(StringPool.DOT_XML); } }); cfg.setFileOutConfigList(focList); generator.setCfg(cfg); } private static void configTemplate (AutoGenerator generator) { TemplateConfig templateConfig = new TemplateConfig(); templateConfig.setXml(null ); generator.setTemplate(templateConfig); } }
其实这样配置后就可右键运行生成代码了,只不过是最原始的代码,模版是官方的默认模版。
根据自己使用的模版引擎来编写模版。模版在项目中存放到resources目录下的templates目录中,如图:
2、FastJson解析数据 将获取的数据转化为JSON对象
1 JSONObject jsonObject = JSONObject.parseObject(jsonStr);
对于暴漏的键值对可以直接通过get方法获取,例如:
1 2 //获取错误日志时间 String errDate = errorInfoObject.get("@timestamp").toString();
对于键值对,值仍是JSON对象的,获取后仍将类型转换为JSONObject,例如:
1 2 3 4 //获取日志位置 JSONObject logInfoObject = (JSONObject)errorInfoObject.get("log"); JSONObject fileObject = (JSONObject)logInfoObject.get("file"); errorLogInfo.setLogpath(fileObject.get("path").toString());
对于值是数组的,获取其数组对象,例如:
1 2 3 4 //获取日志所在服务器ip JSONObject hostObject = (JSONObject)errorInfoObject.get("host"); JSONArray ipArr = hostObject.getJSONArray("ip"); errorLogInfo.setIpaddr(ipArr.get(0).toString());
3、nohup (一)作用 nohup命令用于不挂断地运行命令(关闭当前session不会中断改程序,只能通过kill等命令删除)。 使用nohup命令提交作业,如果使用nohup命令提交作业,那么在缺省情况下该作业的所有输出都被重定向到一个名为nohup.out的文件中,除非另外指定了输出文件。
示例:
1 nohup command > myout.file 2>&1 & echo $! > command.pid
&用于后台执行程序,但是关闭当前session程序也会结束
(二)2>&1 &详解 bash中:
0 代表STDIN_FILENO 标准输入(一般是键盘),
1 代表STDOUT_FILENO 标准输出(一般是显示屏,准确的说是用户终端控制台),
2 三代表STDERR_FILENO (标准错误(出错信息输出)。
> 直接把内容生成到指定文件,会覆盖原来文件中的内容[ls > test.txt], >> 尾部追加,不会覆盖原有内容 [ls >> test.txt], < 将指定文件的内容作为前面命令的参数[cat < text.sh]
2>&1就是用来将标准错误2重定向到标准输出1中的。此处1前面的&就是为了让bash将1解释成标准输出而不是文件1。至于最后一个&,则是让bash在后台执行。
(三)/dev/null 2>&1 可以把/dev/null 可以看作”黑洞”. 它等价于一个只写文件. 所有写入它的内容都会永远丢失. 而尝试从它那儿读取内容则什么也读不到. /dev/null 2>&1则表示吧标准输出和错误输出都放到这个“黑洞”,表示什么也不输出。