Category: devops

A real-time log analysis system with Filebeat, Kafka, and Apache Doris

1. Nginx access log in JSON format

# escape=json makes nginx escape quotes and control characters in variable
# values, so each log line stays valid JSON.
log_format json_log escape=json '{'
'"remote_addr":"$remote_addr",'
'"remote_user":"$remote_user",'
'"body_bytes_sent":"$body_bytes_sent",'
'"request_time":"$request_time",'
'"status":"$status",'
'"host":"$host",'
'"uri":"$uri",'
'"server":"$server_name",'
'"port":"$server_port",'
'"protocol":"$server_protocol",'
'"request_uri":"$request_uri",'
'"request_body":"$request_body",'
'"request_method":"$request_method",'
'"http_x_forwarded_for":"$http_x_forwarded_for",'
'"http_user_agent":"$http_user_agent",'
'"time_iso8601":"$time_iso8601",'
'"time_local":"$time_local",'
'"request":"$request",'
'"http_referer":"$http_referer",'
'"upstream_response_time":"$upstream_response_time"'
'}';

access_log  logs/access.log  main;
access_log  logs/access_json.log  json_log;
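
To sanity-check the format, validate and reload nginx, then confirm the newest log line parses as JSON. A minimal check, assuming python3 is available and the prefix-relative log path above:

nginx -t && nginx -s reload
# The pipe fails if the latest line is not valid JSON.
tail -n 1 logs/access_json.log | python3 -m json.tool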

2. Create the Kafka topic

./bin/kafka-topics.sh \
--create \
--bootstrap-server 192.168.76.153:9092 \
--replication-factor 3 \
--partitions 16 \
--topic nginx-access-log
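
After creating the topic, confirm its partition and replica layout:

./bin/kafka-topics.sh \
--describe \
--bootstrap-server 192.168.76.153:9092 \
--topic nginx-access-log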

3. Filebeat

# https://www.elastic.co/docs/reference/beats/filebeat/running-on-docker
mkdir -p /data/filebeat/data

# Quote the heredoc delimiter so the shell does not expand anything inside the config.
cat > /data/filebeat/filebeat.docker.yml <<'EOF'

fields_under_root: true
fields:
  instance_id: i-10a64379
  region: us-east-1
  log_topic: nginx-access-log

filebeat.inputs:
- type: filestream
  id: my-filestream-id
  tags: ["nginx-access"]
  fields_under_root: true
  fields:
    user_id: 123
    nginx: true
  paths:
    - /data/nginx/logs/access_json.log
    # - /var/log/nginx/access.log
    # - /data/nginx/logs/access.log

# - type: filestream
#   id: apache-filestream-id
#   paths:
#     - "/var/log/apache2/*"
#   fields:
#     apache: true
#   fields_under_root: true


# Timestamp precision for all events. Default: millisecond.
# Available options: millisecond, microsecond, nanosecond.
timestamp.precision: microsecond

processors:
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_id: ~
  - add_locale: ~

output.console:
  enabled: false
  pretty: true

output.kafka:
  enabled: true
  # initial brokers for reading cluster metadata
  hosts: ["192.168.76.152:9092", "192.168.76.153:9092", "192.168.76.154:9092"]

  # message topic selection + partitioning
  #topic: '%{[fields.log_topic]}'
  topic: nginx-access-log
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

EOF
docker rm -f filebeat >/dev/null 2>&1 || true
docker run \
-d \
--name=filebeat \
--restart always \
--user=root \
-e TZ=Asia/Shanghai \
-v /etc/timezone:/etc/timezone:ro \
--volume="/data/filebeat/filebeat.docker.yml:/usr/share/filebeat/filebeat.yml:ro" \
--volume="/var/lib/docker/containers:/var/lib/docker/containers:ro" \
--volume="/var/run/docker.sock:/var/run/docker.sock:ro" \
--volume="/data/nginx/logs:/data/nginx/logs:ro" \
--volume="/data/filebeat/data:/usr/share/filebeat/data:rw" \
docker.elastic.co/beats/filebeat:9.1.2 filebeat -e --strict.perms=false
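
To verify the pipeline end to end, watch the Filebeat log and consume a single message from the topic (run the consumer from a Kafka broker host):

docker logs -f filebeat
# In another shell: read one event from the topic.
./bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.76.153:9092 \
--topic nginx-access-log \
--from-beginning \
--max-messages 1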

4. Create a Doris ROUTINE LOAD

mysql --host 192.168.76.152 --port=9030 --user=root
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 19
Server version: 5.7.99 Doris version doris-3.0.6.2-rc01-910c4249c5


mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| __internal_schema  |
| information_schema |
| mysql              |
+--------------------+
3 rows in set (0.02 sec)
CREATE DATABASE IF NOT EXISTS testdb;

-- drop table if EXISTS testdb.nginx_access_log;

CREATE TABLE testdb.nginx_access_log(
    id                 VARCHAR(512)         NOT NULL COMMENT "id",
    message            JSON                  COMMENT "message",
    timestamp          VARCHAR(512)              COMMENT "timestamp"
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;


CREATE ROUTINE LOAD testdb.nginx_access_log_routine_load_json ON nginx_access_log
COLUMNS(id,message,timestamp)
PROPERTIES(
    "format"="json",
    "desired_concurrent_number" = "3",
    "timezone" = "Asia/Shanghai",
    "max_batch_interval" = "6",
    "max_batch_size" = "104857600",
    "max_batch_rows" = "200002",
    "jsonpaths"="[\"$.@metadata._id\",\"$.message\",\"$.@timestamp\"]"
)
FROM KAFKA(
    "kafka_broker_list" = "192.168.76.152:9092,192.168.76.153:9092,192.168.76.154:9092",
    "kafka_topic" = "nginx-access-log",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
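
The jsonpaths above assume each Kafka message is a complete Filebeat event: the add_id processor stores its generated id under @metadata._id, the raw nginx line lands in message, and @timestamp is set by Filebeat. An abridged, illustrative event (hypothetical values):

{
  "@timestamp": "2025-08-20T02:15:30.123456Z",
  "@metadata": { "_id": "hFNMyZcB..." },
  "message": "{\"remote_addr\":\"10.0.0.1\",\"status\":\"200\", ...}",
  "instance_id": "i-10a64379",
  "region": "us-east-1",
  "nginx": true
}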


RESUME ROUTINE LOAD FOR testdb.nginx_access_log_routine_load_json;
SHOW ROUTINE LOAD FOR testdb.nginx_access_log_routine_load_json\G

-- STOP ROUTINE LOAD FOR testdb.nginx_access_log_routine_load_json;


select * from testdb.nginx_access_log;

The @timestamp field does not seem to be compatible with DATETIME, so VARCHAR is used for now. //TODO: investigate
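
One possible workaround, as a sketch: strip the ISO8601 'T' and 'Z' markers and parse the rest, assuming Doris's str_to_date accepts MySQL-style specifiers including %f for fractional seconds:

select
    str_to_date(replace(replace(timestamp, 'T', ' '), 'Z', ''),
                '%Y-%m-%d %H:%i:%s.%f') as ts
from testdb.nginx_access_log
limit 5;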

5. Create a Doris materialized view

-- drop MATERIALIZED VIEW testdb.nginx_access_log_view;

-- The refresh interval unit (refreshUnit) can be minute, hour, day, week, etc.
CREATE MATERIALIZED VIEW
IF NOT EXISTS testdb.nginx_access_log_view
BUILD IMMEDIATE
REFRESH AUTO
ON SCHEDULE EVERY 1 minute
DISTRIBUTED BY HASH(remote_addr) BUCKETS 10
PROPERTIES ("replication_num" = "1")
AS
SELECT
    get_json_string(message, '$.remote_addr')        AS remote_addr,
    get_json_string(message, '$.remote_user')        AS remote_user,
    get_json_string(message, '$.body_bytes_sent')    AS body_bytes_sent,
    get_json_string(message, '$.request_time')       AS request_time,
    CAST(get_json_string(message, '$.status') AS INT)    AS status,
    get_json_string(message, '$.host')               AS host,
    get_json_string(message, '$.uri')                AS uri,
    get_json_string(message, '$.server')             AS server,
    get_json_string(message, '$.port')               AS port,
    get_json_string(message, '$.protocol')           AS protocol,
    get_json_string(message, '$.request_uri')        AS request_uri,
    get_json_string(message, '$.request_body')       AS request_body,
    get_json_string(message, '$.request_method')     AS request_method,
    get_json_string(message, '$.http_x_forwarded_for') AS http_x_forwarded_for,
    get_json_string(message, '$.http_user_agent')    AS http_user_agent,
    get_json_string(message, '$.time_iso8601')       AS time_iso8601,
    get_json_string(message, '$.time_local')         AS time_local,
    get_json_string(message, '$.request')            AS request,
    get_json_string(message, '$.http_referer')       AS http_referer,
    get_json_string(message, '$.upstream_response_time') AS upstream_response_time
FROM testdb.nginx_access_log
WHERE message IS NOT NULL ;
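
To confirm the view actually refreshes, trigger one by hand and inspect its state (assuming the mv_infos() table function available in recent Doris releases):

REFRESH MATERIALIZED VIEW testdb.nginx_access_log_view AUTO;
select * from mv_infos("database" = "testdb")\G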

6. Statistics

select status, remote_addr, count(*) as count
from testdb.nginx_access_log_view
where status >= 400
group by status, remote_addr
order by count desc;

select
    remote_addr,
    sum(case when status between 400 and 499 then 1 else 0 end) as status_4xx,
    sum(case when status between 500 and 599 then 1 else 0 end) as status_5xx,
    count(*) as total
from testdb.nginx_access_log_view
where status between 400 and 599
group by remote_addr
order by total desc;
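
The same view makes other ad-hoc reports cheap, for example the most frequently failing URIs:

select uri, status, count(*) as hits
from testdb.nginx_access_log_view
where status >= 400
group by uri, status
order by hits desc
limit 20;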

A materialized view created in Doris does not grow along with the base table? Use an async materialized view and set:

BUILD IMMEDIATE
REFRESH AUTO
ON SCHEDULE EVERY 1 minute

7. Miscellaneous

ReasonOfStateChanged: ErrorReason{code=errCode = 4, msg='errCode = 2, detailMessage = Failed to get all partitions of kafka topic: nginx_access_log error: errCode = 2, detailMessage = Failed to get info may be Kafka properties set in job is error or no partition in this topic that should check Kafka'}

Check whether the topic name exists: the job referenced nginx_access_log (with an underscore), while the topic created in section 2 is nginx-access-log (with a hyphen).
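
A quick way to rule this out is to list the topics on the broker:

./bin/kafka-topics.sh --list --bootstrap-server 192.168.76.153:9092 | grep nginx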

ReasonOfStateChanged: ErrorReason{code=errCode = 102, msg='current error rows is more than max_error_number or the max_filter_ratio is more than the value set'}
OtherMsg: errCode = 2, detailMessage = Failed to get all partitions of kafka topic: nginx-access-log error: errCode = 2, detailMessage = Failed to get info. No alive backends may be Kafka properties set in job is error or no partition in this topic that should check Kafka
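
This second message also reports "No alive backends", so check from the MySQL client that every Doris BE is alive before resuming the job:

-- The Alive column should be true for each backend.
SHOW BACKENDS\G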

Question: does the Filebeat output write the full JSON event to Kafka, or only the message part?

Answer: By default, Filebeat writes the entire log event to Kafka as JSON: not just the message field but the complete structure with many fields (see the sample event in section 4).
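
If only the raw nginx line should reach Kafka, the output codec can be switched from the default json codec to a format string; a sketch using Filebeat's codec.format option (the other output.kafka settings stay as above):

output.kafka:
  hosts: ["192.168.76.152:9092", "192.168.76.153:9092", "192.168.76.154:9092"]
  topic: nginx-access-log
  # Emit only the raw log line instead of the full JSON event.
  codec.format:
    string: '%{[message]}'

Note that the ROUTINE LOAD jsonpaths in section 4 would then have to match the raw nginx JSON instead of the Filebeat envelope.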