1. Nginx JSON access log
# escape=json makes nginx JSON-escape variable values, so each log line stays valid JSON.
log_format json_log escape=json '{'
'"remote_addr":"$remote_addr",'
'"remote_user":"$remote_user",'
'"body_bytes_sent":"$body_bytes_sent",'
'"request_time":"$request_time",'
'"status":"$status",'
'"host":"$host",'
'"uri":"$uri",'
'"server":"$server_name",'
'"port":"$server_port",'
'"protocol":"$server_protocol",'
'"request_uri":"$request_uri",'
'"request_body":"$request_body",'
'"request_method":"$request_method",'
'"http_x_forwarded_for":"$http_x_forwarded_for",'
'"http_user_agent":"$http_user_agent",'
'"time_iso8601":"$time_iso8601",'
'"time_local":"$time_local",'
'"request":"$request",'
'"http_referer":"$http_referer",'
'"upstream_response_time":"$upstream_response_time"'
'}';
access_log logs/access.log main;
access_log logs/access_json.log json_log;
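After editing the config, validate and reload nginx, then confirm each request appends one JSON object per line (the log path assumes nginx's default prefix, as in the access_log directive above):
nginx -t && nginx -s reload
tail -n 1 logs/access_json.log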
2. Kafka topic
./bin/kafka-topics.sh \
--create \
--bootstrap-server 192.168.76.153:9092 \
--replication-factor 3 \
--partitions 16 \
--topic nginx-access-log
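To confirm the partition/replica layout, and later to peek at messages once Filebeat is publishing, the stock Kafka CLI tools work:
./bin/kafka-topics.sh --describe \
--bootstrap-server 192.168.76.153:9092 \
--topic nginx-access-log
./bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.76.153:9092 \
--topic nginx-access-log --from-beginning --max-messages 1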
3. Filebeat
# https://www.elastic.co/docs/reference/beats/filebeat/running-on-docker
mkdir -p /data/filebeat/data
cat >/data/filebeat/filebeat.docker.yml<<'EOF'
fields_under_root: true
fields:
  instance_id: i-10a64379
  region: us-east-1
  log_topic: nginx-access-log

filebeat.inputs:
- type: filestream
  id: my-filestream-id
  tags: ["nginx-access"]
  fields_under_root: true
  fields:
    user_id: 123
    nginx: true
  paths:
    - /data/nginx/logs/access_json.log
    # - /var/log/nginx/access.log
    # - /data/nginx/logs/access.log
# - type: filestream
#   id: apache-filestream-id
#   paths:
#     - "/var/log/apache2/*"
#   fields:
#     apache: true
#   fields_under_root: true

# Precision of all timestamps; the default is millisecond.
# Available options: millisecond, microsecond, nanosecond.
timestamp.precision: microsecond

processors:
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_id: ~
  - add_locale: ~

output.console:
  enabled: false
  pretty: true

output.kafka:
  enabled: true
  # Initial brokers for reading cluster metadata.
  hosts: ["192.168.76.152:9092", "192.168.76.153:9092", "192.168.76.154:9092"]
  # Message topic selection + partitioning.
  #topic: '%{[fields.log_topic]}'
  topic: nginx-access-log
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
EOF
docker rm -f filebeat >/dev/null 2>&1 || true
docker run \
-d \
--name=filebeat \
--restart always \
--user=root \
-e TZ=Asia/Shanghai \
-v /etc/timezone:/etc/timezone:ro \
--volume="/data/filebeat/filebeat.docker.yml:/usr/share/filebeat/filebeat.yml:ro" \
--volume="/var/lib/docker/containers:/var/lib/docker/containers:ro" \
--volume="/var/run/docker.sock:/var/run/docker.sock:ro" \
--volume="/data/nginx/logs:/data/nginx/logs:ro" \
--volume="/data/filebeat/data:/usr/share/filebeat/data:rw" \
docker.elastic.co/beats/filebeat:9.1.2 filebeat -e --strict.perms=false
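Once the container is up, the Filebeat log (written to stderr because of -e) should show the Kafka output connecting to the brokers:
docker logs -f filebeat 2>&1 | grep -i -E 'kafka|error'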
4. Create the Doris ROUTINE LOAD job
mysql --host 192.168.76.152 --port=9030 --user=root
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 19
Server version: 5.7.99 Doris version doris-3.0.6.2-rc01-910c4249c5
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| __internal_schema |
| information_schema |
| mysql |
+--------------------+
3 rows in set (0.02 sec)
CREATE DATABASE IF NOT EXISTS testdb;
-- drop table if EXISTS testdb.nginx_access_log;
CREATE TABLE testdb.nginx_access_log(
id VARCHAR(512) NOT NULL COMMENT "id",
message JSON COMMENT "message",
timestamp VARCHAR(512) COMMENT "timestamp"
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;
CREATE ROUTINE LOAD testdb.nginx_access_log_routine_load_json ON nginx_access_log
COLUMNS(id,message,timestamp)
PROPERTIES(
"format"="json",
"desired_concurrent_number" = "3",
"timezone" = "Asia/Shanghai",
"max_batch_interval" = "6",
"max_batch_size" = "104857600",
"max_batch_rows" = "200002",
"jsonpaths"="[\"$.@metadata._id\",\"$.message\",\"$.@timestamp\"]"
)
FROM KAFKA(
"kafka_broker_list" = "192.168.76.152:9092,192.168.76.153:9092,192.168.76.154:9092",
"kafka_topic" = "nginx-access-log",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
RESUME ROUTINE LOAD FOR testdb.nginx_access_log_routine_load_json;
SHOW ROUTINE LOAD FOR testdb.nginx_access_log_routine_load_json\G
-- STOP ROUTINE LOAD FOR testdb.nginx_access_log_routine_load_json;
select * from testdb.nginx_access_log;
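A quick sanity check that rows are flowing in and that the jsonpaths matched (column names as created above):
SELECT COUNT(*) FROM testdb.nginx_access_log;
SELECT id, get_json_string(message, '$.status') AS status, timestamp
FROM testdb.nginx_access_log LIMIT 3;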
The @timestamp field does not seem to map cleanly onto Doris's DATETIME type, so VARCHAR is used for now. // TODO: investigate.
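One possible query-time workaround, sketched on the assumption that @timestamp arrives as an ISO 8601 UTC string such as 2025-01-01T12:00:00.000Z (verify the real format first):
SELECT
    timestamp AS raw_ts,
    -- parse the first 19 chars ("2025-01-01T12:00:00"), then shift UTC to the job timezone
    convert_tz(
        str_to_date(substr(timestamp, 1, 19), '%Y-%m-%dT%H:%i:%s'),
        'UTC', 'Asia/Shanghai'
    ) AS local_dt
FROM testdb.nginx_access_log
LIMIT 3;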
5. Create the Doris materialized view
-- drop MATERIALIZED VIEW testdb.nginx_access_log_view;
-- The refresh interval unit can be minute, hour, day, week, etc.
CREATE MATERIALIZED VIEW
IF NOT EXISTS testdb.nginx_access_log_view
BUILD IMMEDIATE
REFRESH AUTO
ON SCHEDULE EVERY 1 minute
DISTRIBUTED BY HASH(remote_addr) BUCKETS 10
PROPERTIES ("replication_num" = "1")
AS
SELECT
get_json_string(message, '$.remote_addr') AS remote_addr,
get_json_string(message, '$.remote_user') AS remote_user,
get_json_string(message, '$.body_bytes_sent') AS body_bytes_sent,
get_json_string(message, '$.request_time') AS request_time,
get_json_string(message, '$.status') AS status,
get_json_string(message, '$.host') AS host,
get_json_string(message, '$.uri') AS uri,
get_json_string(message, '$.server') AS server,
get_json_string(message, '$.port') AS port,
get_json_string(message, '$.protocol') AS protocol,
get_json_string(message, '$.request_uri') AS request_uri,
get_json_string(message, '$.request_body') AS request_body,
get_json_string(message, '$.request_method') AS request_method,
get_json_string(message, '$.http_x_forwarded_for') AS http_x_forwarded_for,
get_json_string(message, '$.http_user_agent') AS http_user_agent,
get_json_string(message, '$.time_iso8601') AS time_iso8601,
get_json_string(message, '$.time_local') AS time_local,
get_json_string(message, '$.request') AS request,
get_json_string(message, '$.http_referer') AS http_referer,
get_json_string(message, '$.upstream_response_time') AS upstream_response_time
FROM testdb.nginx_access_log
WHERE message IS NOT NULL ;
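Whether the view actually refreshes on schedule can be checked through Doris's async-MV introspection functions (present in recent releases; names may vary by version):
SELECT * FROM mv_infos("database"="testdb")\G
SELECT * FROM tasks("type"="mv")\G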
6. Statistics
select status, count(*) as count, remote_addr
from testdb.nginx_access_log_view
where status > 400
group by status, remote_addr
order by count desc;
select
    remote_addr,
    sum(case when status between 400 and 499 then 1 else 0 end) as status_4xx,
    sum(case when status between 500 and 599 then 1 else 0 end) as status_5xx,
    count(*) as total
from testdb.nginx_access_log_view
where status between 400 and 599
group by remote_addr
order by total desc;
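The view supports other rollups as well; for example, a top-10 busiest URIs query:
select uri, count(*) as hits
from testdb.nginx_access_log_view
group by uri
order by hits desc
limit 10;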
Note: a materialized view created in Doris does not automatically keep growing with its base table. Use an asynchronous materialized view and set (as in the DDL above):
BUILD IMMEDIATE
REFRESH AUTO
ON SCHEDULE EVERY 1 minute
7. Troubleshooting notes
ReasonOfStateChanged: ErrorReason{code=errCode = 4, msg='errCode = 2, detailMessage = Failed to get all partitions of kafka topic: nginx_access_log error: errCode = 2, detailMessage = Failed to get info may be Kafka properties set in job is error or no partition in this topic that should check Kafka'}
Check whether the topic name exists: the error above references nginx_access_log (underscores) while the topic was created as nginx-access-log (hyphens).
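Listing the topics from the Kafka side makes such a mismatch obvious:
./bin/kafka-topics.sh --list --bootstrap-server 192.168.76.153:9092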
ReasonOfStateChanged: ErrorReason{code=errCode = 102, msg='current error rows is more than max_error_number or the max_filter_ratio is more than the value set'}
This usually means rows failed to parse; the ErrorLogUrls column in the SHOW ROUTINE LOAD output links to samples of the rejected rows.
OtherMsg: errCode = 2, detailMessage = Failed to get all partitions of kafka topic: nginx-access-log error: errCode = 2, detailMessage = Failed to get info. No alive backends may be Kafka properties set in job is error or no partition in this topic that should check Kafka
"No alive backends" points at the BE nodes rather than Kafka: check them with SHOW BACKENDS, and make sure every BE can reach the brokers.
Question: does the Filebeat Kafka output write the whole JSON event to Kafka, or only the message field?
Answer: by default, Filebeat writes the entire log event to Kafka as JSON; it is not just the message field but a full structure containing many fields.
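An abbreviated, illustrative event (values are hypothetical; the exact field set depends on the processors enabled, and fields_under_root: true lifts the custom fields to the top level):
{
  "@timestamp": "2025-08-20T06:15:04.123456Z",
  "@metadata": { "_id": "..." },
  "message": "{\"remote_addr\":\"10.0.0.1\",\"status\":\"200\",...}",
  "instance_id": "i-10a64379",
  "region": "us-east-1",
  "log_topic": "nginx-access-log",
  "tags": ["nginx-access"],
  "agent": { "type": "filebeat", "version": "9.1.2" }
}
This is also why the ROUTINE LOAD jsonpaths above select $.@metadata._id, $.message and $.@timestamp.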