Недавно закончил развертывание сервера сбора логов в БД. Готов поделится успешным опытом.
Основное назначение этого решения, для меня, это понимание откуда, куда, по каким протоколам и портам движется трафик.
Вся соль такого подхода заключается в разборе строк на поля и раскладывании их в оптимальный тип данных, а так же отброса лишнего. В результате получаем хранилище для последующего изучения со всеми прелестями СУБД.
Использую Debian9.1, rsyslogd 8.24 , MariaDB-10.1.2 .
Логи принимаю в udp пакетах, далее средствами rsyslog происходит выборка необходимых полей и отправка в СУБД. СУБД в свою очередь перед записью допиливает напильником то, что не удалось сделать в rsyslog.
Настраиваем RSYSLOG.
Добавляем в конфиг /etc/rsyslog.conf
Код: Выделить всё
module(load="imudp")
input(type="imudp" port="514")
module(load="ommysql")
Содаем конфиг /etc/rsyslog.d/40-remote.conf
Код: Выделить всё
$template tpl_traflog,"insert into traflog.traffic (datetime, inif, outif, src, dst, smac, proto, chain, logpref, len) values ('%timereported:::date-mysql%', '%msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end%', '%msg:R,ERE,0,DFLT:\b[A-X]{3,4}\b--end%', '%msg:R,ERE,0,DFLT:[a-x]+--end%', '%msg:F,32:2%', '%msg:R,ERE,0,DFLT:[0-9]+$--end%' )",SQL
if ($fromhost-ip == '192.168.0.230') then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="..." template="tpl_traflog") stop}
Первая, и самая основная, описывает шаблон - строку SQL кода, для передачи её в СУБД. В качестве значений(values) подставляются данные из получаемой от Микротика строки, в терминологии rsyslog - %msg%. Строка %msg% разбирается с помощью регулярных выражений в такой последовательности: datetime, inif, outif, src, dst, smac, proto, chain, logpref, len. Последовательность полей соответствует последовательности регулярок, поэтому можете легко отследить что здесь к чему относится. Единственное что мне не удалось отделить это src:sport и dst:dport и названия интерфейсов, эти данные прям так и идут в СУБД, но об этом позднее.
Вторая строка это условие когда будет происходить действие, то есть запись в СУБД. Это означает что если источник лога = 192.168.0.230, то используя модуль ommysql с параметрами подключения вызываем шаблон tpl_traflog, а после этого прекращаем дальнейшую обработку строки - stop.
Однако на начальном этапе рекомендую вторую строку закомментировать и добавить новый шаблон + два условия:
Код: Выделить всё
$template tpl_traflog_test,"'%timereported:::date-mysql%', '%msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end%', '%msg:R,ERE,0,DFLT:\b[A-X]{3,4}\b--end%', '%msg:R,ERE,0,DFLT:[a-x]+--end%', '%msg:F,32:2%', '%msg:R,ERE,0,DFLT:[0-9]+$--end%' "
if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" flushOnTXEnd="off" asyncWriting="on" flushInterval="3" ioBufferSize="64k")}
if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" template="tpl_traflog_test" flushOnTXEnd="off" asyncWriting="on" flushInterval="3" ioBufferSize="64k") stop}
Первое условие пишет не обработанную строку сообщения.
Второе условие пишет обработанную строку.
Шаблон tpl_traflog_test делает то же самое что и tpl_traflog только без SQL кода, чистый текст.
Готовим БД
Добавляем базу данных, создаем таблицу, добавляем пользователя:
Код: Выделить всё
--добавляем базу данных
create database traflog character set utf8 collate utf8_bin;
use traflog;
--добавляем таблицу
create table traffic (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
datetime DATETIME,
inif VARCHAR(20),
outif VARCHAR(20),
src VARCHAR(21),
sport INT(5),
dst VARCHAR(21),
dport INT(5),
smac VARCHAR(17),
proto VARCHAR(4),
chain VARCHAR(8),
logpref VARCHAR(24),
len INT(5)) ENGINE=MYISAM;
--добавляем пользователя
create user rsyslogger@localhost identified by '...';
grant all privileges on traflog.* to rsyslogger@localhost;
Код: Выделить всё
--добавляем триггер
DELIMITER //
create TRIGGER delim_ip_port BEFORE insert ON traffic
FOR EACH ROW
begin
set NEW.inif = REGEXP_REPLACE ((NEW.inif), 'in:', '' );
set NEW.outif = REGEXP_REPLACE ((NEW.outif), 'out:', '' );
set NEW.sport = REGEXP_REPLACE ((NEW.src), '([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+', '' );
set NEW.src = REGEXP_REPLACE ((NEW.src), ':[0-9]+', '' );
set NEW.dport = REGEXP_REPLACE ((NEW.dst), '([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+', '' );
set NEW.dst = REGEXP_REPLACE ((NEW.dst), ':[0-9]+', '' );
end //
delimiter ;
Код: Выделить всё
--вставка тестовой строки
insert into traffic (datetime, inif, outif, src, dst, smac, proto, chain, logpref)
values (20180730075437, 'in:ether6', 'out:VLAN55', '192.168.0.234:4997', '192.168.6.18:65535', '00:15:17:31:b8:d7', 'TCP', 'forward', 'BLOCKSMKNETS');
Код: Выделить всё
select * from traffic;
Добавим хотя бы один индекс, может быть не самый правильный, у меня их несколько и я до сих пор не вкурю как готовить правильный индекс, сделайте на свое усмотрение столько сколько нужно, а для начала хватит и этого:
Код: Выделить всё
--добалвяем индекс
create index traffic_index on traffic (src, dst, dport, datetime);
Код: Выделить всё
/system logging action
add name=remote remote=192.168.0.19 syslog-facility=local6 target=remote
/system logging
add action=remote topics=firewall
Если все в порядке, выборка идет как надо, раскомментируем условие для шаблона tpl_traflog, и закомментируем шаблон tpl_traflog_test и два новых условия, хотя можно и без этого ведь мы будем делать stop нашей строки выше, так для порядку...перезапустим логгер.
Да, еще чуть не забыл, надо дефолтный конфиг в /etc/rsyslog.d/ спустить ниже, я переименовал его в 50-default.conf , дабы remote логи не сыпались в системный журнал /var/log/message
Подождем немного, пока наша БД наполнится. Далее можем начинать выборку.
Несколько моих запросов для примера:
Что бы понимать каков размер нашей БД:
Код: Выделить всё
MariaDB [traflog]> select table_schema as "database", round(sum(data_length + index_length)/1024/1024,2) as "size Mb" from information_schema.tables group by table_schema;
+--------------------+---------+
| database | size Mb |
+--------------------+---------+
| information_schema | 0.17 |
| traflog | 2223.94 |
+--------------------+---------+
2 rows in set (0.24 sec)
Количество строк в БД:
Код: Выделить всё
MariaDB [traflog]> select count(id) from traffic;
+-----------+
| count(id) |
+-----------+
| 13086545 |
+-----------+
1 row in set (0.00 sec)
Код: Выделить всё
MariaDB [traflog]> select src,count(dport) from traffic where logpref='SMTP_DNAT' and datetime > '2018100900000000' group by src order by count(dport) desc limit 10;
+-----------------+--------------+
| src | count(dport) |
+-----------------+--------------+
| 191.96.249.92 | 13811 |
| 191.96.249.61 | 5112 |
| 191.96.249.24 | 5096 |
| 191.96.249.26 | 5009 |
| 185.222.209.54 | 2236 |
| 178.57.79.250 | 363 |
| 185.234.219.32 | 243 |
| 37.49.224.97 | 197 |
| 185.234.219.254 | 93 |
| 89.248.162.145 | 43 |
+-----------------+--------------+
10 rows in set, 1 warning (1.24 sec)
Код: Выделить всё
MariaDB [traflog]> select src,chain,count(dport) from traffic where logpref='DROP_SMTP_BRUTE' and datetime > '2018100912000000' group by src order by count(dport) desc limit 10;
+-----------------+---------+--------------+
| src | chain | count(dport) |
+-----------------+---------+--------------+
| 191.96.249.92 | forward | 3155 |
| 191.96.249.24 | forward | 1198 |
| 191.96.249.26 | forward | 1187 |
| 191.96.249.61 | forward | 1010 |
| 188.165.124.31 | forward | 49 |
| 103.207.38.157 | forward | 32 |
| 144.217.126.189 | forward | 17 |
| 104.131.79.40 | forward | 16 |
+-----------------+---------+--------------+
8 rows in set, 1 warning (0.20 sec)
Надо сказать что это не первое мое решение в данной области. Ранее я делал аналогичную штуку для маршуртизаторов zywall, но логи я регекспал текстовые, с помощью perl скрипта и клал в БД, решение было рабочее, и к томуж имело ту самую веб морду где я видел весь трафик проходящий через маршуртизатор, мог отследить каждое соединение, откуда, куда, по каким портам, объем, время начала и конца сессии tcp. Но то особенности логгирования zywall, он выдает лог только по закрытию соединения tcp, а не по каждому прошедшему пакету. Но то другая история, если кому надо могу поискать что сохранилось.
ps Надо было делать на postgresql ) там больше выбор типов данных и многое другое доброе.