博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于Nginx1.9+LuaJIT+Kafka的点播监控系统实战
阅读量:4030 次
发布时间:2019-05-24

本文共 7611 字,大约阅读时间需要 25 分钟。

http://m.blog.csdn.net/article/details?id=51168573

基于Nginx1.9+LuaJIT+Kafka的点播监控系统实战(上海卓越智慧树网点播监控系统)

发表于2016/4/16 16:06:45  1310人阅读

分类: Kafka Lua Nginx

最近在做点监控系统,先后采用了两套方案:

方案一:Nginx记录日志 --> tail语句扫描日志到Kafka --> Java站点消费Kafka消息后存储到Mysql数据库 --> Mysql数据库 --> 数据库定时汇聚数据 --> 界面呈现

方案一遇到的问题:

1.1 
面对海量数据,日志文件增长很快,磁盘占用大

1.2 
Java站点消费Kafka消息后存储到Mysql数据库 太慢



为了解决上述两个问题,采用方案二:

方案二:Nginx+Lua直接向Kafa发送数据  --> Java站点消费Kafka消息后存储到Mysql数据库 --> Mysql数据库 --> 数据库定时汇聚数据 --> 界面呈现

2.1 
Nginx+Lua直接向Kafa发送数据: Nginx不记录日志,直接发送到Kafka,不占用磁盘空间

2.2 
Java站点消费Kafka消息后存储到Mysql数据库: 合并更新操作,如对同一个sessionid的更新操作有100条,合并后只有一条更新语句

方案二遇到的问题:

2.3 Nginx可以直接用完整的
OpenResty,也可以在现有的
Nginx基础上添加插件

    
我采用的是在现有的Nginx基础上添加插件



方案一具体如下:

Nginx1.9.9

kafka_2.11-0.9.0.1



1.1、Nginx配置文件里的配置:

log_format ht-video  '$request|$msec|$http_x_forwarded_for|$http_user_agent|$request_body';



server{

                server_name lc.zhihuishu.com;



                location / {

                        root /usr/local/nginxnew/html/view;

                        index index.html;

                }

                location /ht{

                        #root   html;

                        #index  index.html index.htm;

                        if ( $request ~ "GET" ) {

                                #access_log logs/ht-video.log ht-video;

                                                access_log /htlog/ht-video.log ht-video;

                        }

                        proxy_set_header Host $http_host;

                        proxy_pass http://localhost:6099;

                        client_max_body_size 8000M;

                        #error_page 405 =200 $1;

                }

        }



        server {

                listen       6099;

                server_name  localhost;

                location /ht {

                root   /usr/local/nginxnew/html/hightower;

                index  index.html index.htm;

                client_max_body_size 8000M;

                error_page 405 =200 $1;

        }



1.2、修改本机Hosts文件

vi /etc/hosts



10.252.126.242  kafka1.livecourse.com

10.252.126.242  zookeeper1.livecourse.com







1.3、修改Kafka文件

vi /usr/local/kafka_2.11-0.9.0.1/config/server.properties 

   这个文件里要改下面5点:

broker.id=0

port=9092

host.name=10.252.126.242

log.dirs=/able4/kafka-logs

zookeeper.connect=zookeeper1.livecourse.com:2181





vi /usr/local/kafka_2.11-0.9.0.1/bin/kafka-run-class.sh 

  添加两行:

export JAVA_HOME=/usr/local/java/jdk1.7.0_79

export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre





1.4、启动zookeeper和kafka

/usr/local/kafka_2.11-0.9.0.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/zookeeper.properties

/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties





1.5、扫描Nginx日志发送到kafka

tail -n 0 -f  /usr/local/nginxnew/logs/ht-video.log | /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --broker-list kafka1.livecourse.com:9092 --topic HT-VIDEO







方案二具体如下:








参考的这几篇文章方成此方案




1 安装LuaJIT

下载http://luajit.org/download.html

http://luajit.org/install.html

  命令如下:

tar zxf LuaJIT-2.0.4.tar.gz

cd LuaJIT-2.0.4

make PREFIX=/usr/local/LuaJIT

make install PREFIX=/usr/local/LuaJIT

echo "/usr/local/LuaJIT/lib" > /etc/ld.so.conf.d/usr_local_luajit_lib.conf

ldconfig

#注意环境变量!

export LUAJIT_LIB=/usr/local/LuaJIT/lib

export LUAJIT_INC=/usr/local/LuaJIT/include/luajit-2.0





2 安装lua-nginx-module

https://github.com/openresty/lua-nginx-module/tags

cd /usr/local

tar zxvf lua-nginx-module-0.10.2.tar.gz 





3 安装ngx_devel_kit

https://github.com/simpl/ngx_devel_kit/tags

http://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtml

cd /usr/local

tar zxvf ngx_devel_kit-0.3.0rc1.tar.gz 





4 安装编译Nginx

http://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtml

给Nginx添加下面的参数,如果已经安装了Nginx则用

  nginx -V

   --add-module=/usr/local/lua-nginx-module-0.10.2 --add-module=/usr/local/ngx_devel_kit-0.3.0rc1

如:

cd /usr/local/nginx-1.9.9

./configure  --prefix=/usr/local/nginxnew/ --with-http_stub_status_module --with-http_ssl_module --with-http_realip_module --add-module=/root/install/ngx_log_if-master --add-module=/usr/local/lua-nginx-module-0.10.2 --add-module=/usr/local/ngx_devel_kit-0.3.0rc1



5 lua插件lua-resty-kafka

https://github.com/doujiang24/lua-resty-kafka

mkdir /usr/local/lua

上传lua-cjson-2.1.0.3.tar.gz到/usr/local/lua

上传lua-resty-kafka到/usr/local/lua

这里遇到些问题,我改写了其中的client.lua文件的两个方法:

方法1:

function _M.new(self, broker_list, client_config)

    local opts = client_config or {}

    local socket_config = {

        socket_timeout = opts.socket_timeout or 3000,

        keepalive_timeout = opts.keepalive_timeout or 600 * 1000,   -- 10 min

        keepalive_size = opts.keepalive_size or 2,

socket_config

    }

    --start 0 == wangsihong 2016-4-16

    
local broker_list_host_ip = opts.broker_list_host_ip or {}



    local cli = setmetatable({

        broker_list = broker_list,

broker_list_host_ip = broker_list_host_ip,

        topic_partitions = {},

        brokers = {},

        client_id = "worker" .. pid(),

        socket_config = socket_config,

    }, mt)

    --end 0 == wangsihong 2016-4-16



    if opts.refresh_interval then

        meta_refresh(nil, cli, opts.refresh_interval / 1000) -- in ms

    end



    return cli

end



方法2:

function _M.choose_broker(self, topic, partition_id)

    local brokers, partitions = self:fetch_metadata(topic)



    if not brokers then

        return nil, partitions

    end



    local partition = partitions[partition_id]

    if not partition then

        return nil, "not found partition"

    end



    local config = brokers[partition.leader]

    if not config then

        return nil, "not found broker"

    end



    --start 1 == wangsihong 2016-4-16

    local broker_list_host_ip = self.broker_list_host_ip
    for k = 1, #broker_list_host_ip do
         local hostip = broker_list_host_ip[k]
         if config ~= nil and hostip ~= nil and config.host == hostip.host then
             config.host = broker_list_host_ip[k].ip
         end
     end

    --end  1 == wangsihong 2016-4-16



    return config

end



6 lua插件cjson

http://www.kyne.com.au/~mark/software/lua-cjson-manual.html

cd /usr/local/lua

tar zxvf lua-cjson-2.1.0.3.tar.gz 





7 安装Nginx1.9.9好后修改nginx.conf



http{

#    lua_package_path "/usr/local/lua/lua-resty-kafka/lib/?.lua;/usr/local/lua/lua-cjson-2.1.0.3/lua/?.lua;;";

#    lua_package_cpath '/usr/local/LuaJIT/lib/lua/5.1/?.so;';

    lua_package_path "/usr/local/lua/lua-resty-kafka/lib/?.lua;;";


    server{

#为了方便调试,关闭了lua_code_cache,如果是生产环境,应该开启它。

lua_code_cache off;



listen 80;

server_name localhost;

location = /lua-v {

            content_by_lua '

                ngx.header.content_type = "text/plain";

                if jit then

                        ngx.say(jit.version)

                else

                        ngx.say(_VERSION)

                end

             ';

        }



location /ht3/video/p {

            content_by_lua '

                ngx.header.content_type = "text/plain";

                --local cjson = require "cjson"

                --local client = require "resty.kafka.client"

                local producer = require "resty.kafka.producer"



                local broker_list = {

                    { host = "10.252.126.242", port = 9092 },

                    { host = "10.252.126.242", port = 9093 },

                    { host = "10.252.126.242", port = 9094 },

                    { host = "10.252.126.242", port = 9095 },

                    { host = "10.252.126.242", port = 9096 },

                }

local broker_list_host_ip = {

   { host = "kafka1.livecourse.com", ip = "10.252.126.242" },
}



                local key = "key"

                --local message = $request|$msec|$remote_addrx|$http_user_agent|$request_body



                local myIP = ngx.req.get_headers()["X-Real-IP"]

                if myIP == nil then

                        myIP = ngx.req.get_headers()["x_forwarded_for"]

                end

                if myIP == nil then

                        myIP = ngx.var.remote_addr

                end

                local h = ngx.req.get_headers()



local message = ngx.req.get_method() .. " " .. ngx.var.uri

if ngx.var.args  ~= nil then

    message = message .. "?" .. ngx.var.args .. "|"

end

message = message .. ngx.now() .. "|"

message = message .. myIP .. "|"

message = message .. h["user-agent"] .. "|"

local bodyData = ngx.req.get_body_data()

if bodyData  == nil then

bodyData = "-"

end

message = message .. bodyData





               -- 定义kafka异步生产者

                -- 发送日志消息,send第二个参数key,用于kafka路由控制:

                -- key为nill(空)时,一段时间向同一partition写入数据

                -- 指定key,按照key的hash写入到对应的partition

               -- 指定key,按照key的hash写入到对应的partition

                local key = nil


                -- this is async producer_type and bp will be reused in the whole nginx worker

                local bp = producer:new(broker_list, { producer_type = "async" ,
 broker_list_host_ip = broker_list_host_ip,refresh_interval = 3000})


                local ok, err = bp:send("HT-VIDEO", key, message)

                if not ok then

                    --ngx.say("send err:", err)

   ngx.say("void(1);");

                    return

                end

ngx.say("void(0);");

                --ngx.say("send success, ok:", ok)

            ';

        }

       

    }



}



8 完成

请求Url:http://localhost/ht3/video/p?a=bbbb 就能将日志内容发送到kafka

你可能感兴趣的文章
openstack网络总结
查看>>
excel 查找一个表的数据在另一个表中是否存在
查看>>
centos 7 上配置dnsmasq 同时支持ipv4和ipv6的DHCP服务
查看>>
AsyncTask、View.post(Runnable)、ViewTreeObserver三种方式总结frame animation自动启动
查看>>
Android中AsyncTask的简单用法
查看>>
S3C6410启动模式介绍
查看>>
Jlink + ADS调试 S3C2440
查看>>
2440初始化存储器原理(接上一篇)
查看>>
S3C2440 USB 设备控制器(转)
查看>>
Linux usb 设备驱动 (1)
查看>>
解决跨网场景下,CAS重定向无法登录的问题(无需修改现有代码)
查看>>
java反编译命令
查看>>
activemq依赖包获取
查看>>
概念区别
查看>>
关于静态块、静态属性、构造块、构造方法的执行顺序
查看>>
final 的作用
查看>>
在Idea中使用Eclipse编译器
查看>>
idea讲web项目部署到tomcat,热部署
查看>>
优化IDEA启动速度,快了好多。后面有什么优化点,会继续往里面添加
查看>>
JMeter 保持sessionId
查看>>