Метаданные тарфика Mikrotik в БД (логи)

Здесь выкладываем скрипты
Правила форума
Уважаемые Пользователи форума, обратите внимание!
Ни при каких обстоятельствах, Администрация форума, не несёт ответственности за какой-либо, прямой или косвенный, ущерб причиненный в результате использования материалов, взятых на этом Сайте или на любом другом сайте, на который имеется гиперссылка с данного Сайта. Возникновение неисправностей, потерю программ или данных в Ваших устройствах, даже если Администрация будет явно поставлена в известность о возможности такого ущерба.
Просим Вас быть предельно осторожными и внимательными, в использовании материалов раздела. Учитывать не только Ваши пожелания, но и границы возможностей вашего оборудования.
Ответить
sergio_sd
Сообщения: 3
Зарегистрирован: 09 окт 2018, 13:01

Добрый день.
Недавно закончил развертывание сервера сбора логов в БД. Готов поделится успешным опытом.
Основное назначение этого решения, для меня, это понимание откуда, куда, по каким протоколам и портам движется трафик.
Вся соль такого подхода заключается в разборе строк на поля и раскладывании их в оптимальный тип данных, а так же отброса лишнего. В результате получаем хранилище для последующего изучения со всеми прелестями СУБД.

Использую Debian9.1, rsyslogd 8.24 , MariaDB-10.1.2 .
Логи принимаю в udp пакетах, далее средствами rsyslog происходит выборка необходимых полей и отправка в СУБД. СУБД в свою очередь перед записью допиливает напильником то, что не удалось сделать в rsyslog.

Настраиваем RSYSLOG.
Добавляем в конфиг /etc/rsyslog.conf

Код: Выделить всё

module(load="imudp")
input(type="imudp" port="514")

module(load="ommysql")
Это откроет порт 514 UDP и загрузит модуль для работы с mysql.

Содаем конфиг /etc/rsyslog.d/40-remote.conf

Код: Выделить всё

$template tpl_traflog,"insert into traflog.traffic (datetime, inif, outif, src, dst, smac, proto, chain, logpref, len) values ('%timereported:::date-mysql%', '%msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end%', '%msg:R,ERE,0,DFLT:\b[A-X]{3,4}\b--end%', '%msg:R,ERE,0,DFLT:[a-x]+--end%', '%msg:F,32:2%', '%msg:R,ERE,0,DFLT:[0-9]+$--end%' )",SQL
if ($fromhost-ip == '192.168.0.230') then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="..." template="tpl_traflog") stop}
Здесь всего две строчки ).
Первая, и самая основная, описывает шаблон - строку SQL кода, для передачи её в СУБД. В качестве значений(values) подставляются данные из получаемой от Микротика строки, в терминологии rsyslog - %msg%. Строка %msg% разбирается с помощью регулярных выражений в такой последовательности: datetime, inif, outif, src, dst, smac, proto, chain, logpref, len. Последовательность полей соответствует последовательности регулярок, поэтому можете легко отследить что здесь к чему относится. Единственное что мне не удалось отделить это src:sport и dst:dport и названия интерфейсов, эти данные прям так и идут в СУБД, но об этом позднее.
Вторая строка это условие когда будет происходить действие, то есть запись в СУБД. Это означает что если источник лога = 192.168.0.230, то используя модуль ommysql с параметрами подключения вызываем шаблон tpl_traflog, а после этого прекращаем дальнейшую обработку строки - stop.
Однако на начальном этапе рекомендую вторую строку закомментировать и добавить новый шаблон + два условия:

Код: Выделить всё

$template tpl_traflog_test,"'%timereported:::date-mysql%', '%msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end%', '%msg:R,ERE,0,DFLT:\b[A-X]{3,4}\b--end%', '%msg:R,ERE,0,DFLT:[a-x]+--end%', '%msg:F,32:2%', '%msg:R,ERE,0,DFLT:[0-9]+$--end%' "
if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" flushOnTXEnd="off" asyncWriting="on" flushInterval="3" ioBufferSize="64k")}
if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" template="tpl_traflog_test" flushOnTXEnd="off" asyncWriting="on" flushInterval="3" ioBufferSize="64k") stop}
Это нужно для отладки регулярных выражений и что бы понимать как оно будет выглядеть в БД.
Первое условие пишет не обработанную строку сообщения.
Второе условие пишет обработанную строку.
Шаблон tpl_traflog_test делает то же самое что и tpl_traflog только без SQL кода, чистый текст.

Готовим БД
Добавляем базу данных, создаем таблицу, добавляем пользователя:

Код: Выделить всё

--добавляем базу данных
create database traflog character set utf8 collate utf8_bin;
use traflog;

--добавляем таблицу
create table traffic (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
datetime DATETIME,
inif VARCHAR(20),
outif VARCHAR(20),
src VARCHAR(21),
sport INT(5),
dst VARCHAR(21),
dport INT(5),
smac VARCHAR(17),
proto VARCHAR(4),
chain VARCHAR(8),
logpref VARCHAR(24),
len INT(5)) ENGINE=MYISAM;

--добавляем пользователя
create user rsyslogger@localhost identified by '...';
grant all privileges on traflog.* to rsyslogger@localhost;
Для отделения src:sport dst:dport и названий интерфейсов предлагаю использовать триггер который, с помощью регулярки, сделает это перед вставкой строки в таблицу:

Код: Выделить всё

--добавляем триггер
DELIMITER //
create TRIGGER delim_ip_port BEFORE insert ON traffic
FOR EACH ROW
begin
set NEW.inif = REGEXP_REPLACE ((NEW.inif), 'in:', '' );
set NEW.outif = REGEXP_REPLACE ((NEW.outif), 'out:', '' );
set NEW.sport = REGEXP_REPLACE ((NEW.src), '([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+', '' );
set NEW.src = REGEXP_REPLACE ((NEW.src), ':[0-9]+', '' );
set NEW.dport = REGEXP_REPLACE ((NEW.dst), '([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+', '' );
set NEW.dst = REGEXP_REPLACE ((NEW.dst), ':[0-9]+', '' );
end //
delimiter ;
Проверяем корректно ли работает триггер:

Код: Выделить всё

--вставка тестовой строки
insert into traffic (datetime, inif, outif, src, dst, smac, proto, chain, logpref)
values (20180730075437, 'in:ether6', 'out:VLAN55', '192.168.0.234:4997', '192.168.6.18:65535', '00:15:17:31:b8:d7', 'TCP', 'forward', 'BLOCKSMKNETS');
Далее посмотрим что получилось:

Код: Выделить всё

select * from traffic;
Все верно? Тогда идем дальше.
Добавим хотя бы один индекс, может быть не самый правильный, у меня их несколько и я до сих пор не вкурю как готовить правильный индекс, сделайте на свое усмотрение столько сколько нужно, а для начала хватит и этого:

Код: Выделить всё

--добалвяем индекс
create index traffic_index on traffic (src, dst, dport, datetime);
Все, система готова. Можно запускать сбор, добавьте настройку логгирования на удаленный сервер и опцию логгирования в одно из правил фаервола, добавьте префикс не более 24 символов

Код: Выделить всё

/system logging action
add name=remote remote=192.168.0.19 syslog-facility=local6 target=remote
/system logging
add action=remote topics=firewall
Далее смотрите что повалится в файл /var/log/remote/192.168.0.230.log

Если все в порядке, выборка идет как надо, раскомментируем условие для шаблона tpl_traflog, и закомментируем шаблон tpl_traflog_test и два новых условия, хотя можно и без этого ведь мы будем делать stop нашей строки выше, так для порядку...перезапустим логгер.
Да, еще чуть не забыл, надо дефолтный конфиг в /etc/rsyslog.d/ спустить ниже, я переименовал его в 50-default.conf , дабы remote логи не сыпались в системный журнал /var/log/message

Подождем немного, пока наша БД наполнится. Далее можем начинать выборку.
Несколько моих запросов для примера:
Что бы понимать каков размер нашей БД:

Код: Выделить всё

MariaDB [traflog]> select table_schema as "database", round(sum(data_length + index_length)/1024/1024,2) as "size Mb" from information_schema.tables group by table_schema;
+--------------------+---------+
| database           | size Mb |
+--------------------+---------+
| information_schema |    0.17 |
| traflog            | 2223.94 |
+--------------------+---------+
2 rows in set (0.24 sec)
Да, база данных быстро растет, и с этим надо что-то делать. Либо очищать, либо уменьшать количество логгируемых строк.

Количество строк в БД:

Код: Выделить всё

MariaDB [traflog]> select count(id) from traffic;
+-----------+
| count(id) |
+-----------+
|  13086545 |
+-----------+
1 row in set (0.00 sec)
Посмотрим какие гады сегодня пытались подобраться к моему smtp серверу:

Код: Выделить всё

MariaDB [traflog]> select src,count(dport) from traffic where logpref='SMTP_DNAT' and datetime > '2018100900000000' group by src order by count(dport) desc limit 10;
+-----------------+--------------+
| src             | count(dport) |
+-----------------+--------------+
| 191.96.249.92   |        13811 |
| 191.96.249.61   |         5112 |
| 191.96.249.24   |         5096 |
| 191.96.249.26   |         5009 |
| 185.222.209.54  |         2236 |
| 178.57.79.250   |          363 |
| 185.234.219.32  |          243 |
| 37.49.224.97    |          197 |
| 185.234.219.254 |           93 |
| 89.248.162.145  |           43 |
+-----------------+--------------+
10 rows in set, 1 warning (1.24 sec)
Их время вышло, я сделал фильтр для brutforc-еров после полудня ) :

Код: Выделить всё

MariaDB [traflog]> select src,chain,count(dport) from traffic where logpref='DROP_SMTP_BRUTE' and datetime > '2018100912000000' group by src order by count(dport) desc limit 10;
+-----------------+---------+--------------+
| src             | chain   | count(dport) |
+-----------------+---------+--------------+
| 191.96.249.92   | forward |         3155 |
| 191.96.249.24   | forward |         1198 |
| 191.96.249.26   | forward |         1187 |
| 191.96.249.61   | forward |         1010 |
| 188.165.124.31  | forward |           49 |
| 103.207.38.157  | forward |           32 |
| 144.217.126.189 | forward |           17 |
| 104.131.79.40   | forward |           16 |
+-----------------+---------+--------------+
8 rows in set, 1 warning (0.20 sec)
На этом все, очень надеюсь что решение получит дальнейшую жизнь и развитие не только у меня. В планах хотелось бы иметь доступ к таблице через веб, конструктор запросов, с иерархическим проваливанием в кликнутный src, dst, port, logpref и т.д.
Надо сказать что это не первое мое решение в данной области. Ранее я делал аналогичную штуку для маршуртизаторов zywall, но логи я регекспал текстовые, с помощью perl скрипта и клал в БД, решение было рабочее, и к томуж имело ту самую веб морду где я видел весь трафик проходящий через маршуртизатор, мог отследить каждое соединение, откуда, куда, по каким портам, объем, время начала и конца сессии tcp. Но то особенности логгирования zywall, он выдает лог только по закрытию соединения tcp, а не по каждому прошедшему пакету. Но то другая история, если кому надо могу поискать что сохранилось.

ps Надо было делать на postgresql ) там больше выбор типов данных и многое другое доброе.


sergio_sd
Сообщения: 3
Зарегистрирован: 09 окт 2018, 13:01

Извините, наверно не в тот раздел написал. Конечно к скриптам Mikrotik это мало имеет отношения, увидел соседнюю тему тоже про логи и подумал что по адресу.


Аватара пользователя
podarok66
Модератор
Сообщения: 4355
Зарегистрирован: 11 фев 2012, 18:49
Откуда: МО

sergio_sd писал(а): 09 окт 2018, 16:22 Извините, наверно не в тот раздел написал. Конечно к скриптам Mikrotik это мало имеет отношения, увидел соседнюю тему тоже про логи и подумал что по адресу.
Не, всё нормально. Надо будет попробовать у себя. Благо сервак сейчас опустел совсем, почти никаких задач не осталось.
Спасибо, что поделились.
Теперь вопросы:
1). ($fromhost-ip == '192.168.0.230') - это адрес маршрутизатора, с которого снимаем логи? Или IP сервера?
2). serverport="3306" - это что за порт? Вроде как он всегда к MySQL относился... Тогда в первом пункте точно адрес сервера)))


Мануалы изучил и нигде не ошибся? Фаервол отключил? Очереди погасил? Витая пара проверена? ... Тогда Netinstal'ом железку прошей и настрой ее заново. Что, все равно не фурычит? Тогда к нам. Если не подскажем, хоть посочувствуем...
sergio_sd
Сообщения: 3
Зарегистрирован: 09 окт 2018, 13:01

podarok66
1. Да это адрес маршрутизатора. Условие по ip адресу источника, как один из вариантов. Можно выбирать по другим параметрам, например по содержимому строки в нашем случае можно по LOGPREF с направлением в отдельную таблицу.
2 Верно. Это порт сервера mysql, он передается в качестве параметра в action. Поскольку СУБД имеется на той же машине то server="localhost"

Код: Выделить всё

action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="..." ...)
Вообще, надо сказать что эти никсовые логгеры (syslog-ng, rsyslog, etc) могут быть невероятно полезны , такие элементарные вещи как оповещение на email и мессенджеры в купе с широкими возможностями выборки и фильтрации а также централизованного сбора, единожды настроены, будут всегда на страже безопасности и отказоустойчивости любой сети. Хоть малой хоть большой.
На первый взгляд мутная вещь, но разобраться стоит.


Аватара пользователя
podarok66
Модератор
Сообщения: 4355
Зарегистрирован: 11 фев 2012, 18:49
Откуда: МО

Так как IT-сфера никогда не являлось для меня основной (как это ни странно), мне никогда не хватало времени досконально разобраться с базами данных. Хотя желание периодически возникало и возникает. Да что там досконально, всё на уровне копипаста и медленного, построчного ползанья по конфигу в попытках решить очередную проблему. Таблицы вообще у меня в голове никак не уложились и кажутся сумбуром, несмотря на то, что душой я чувствую их несомненную логическую упорядоченность и математическую красоту. :smu:sche_nie:
Ну бестолочь, что с меня взять...
Я попробую разобраться, но обещать не стану. Просто, чтобы не наобещать лишнего.
И ещё раз спасибо.


Мануалы изучил и нигде не ошибся? Фаервол отключил? Очереди погасил? Витая пара проверена? ... Тогда Netinstal'ом железку прошей и настрой ее заново. Что, все равно не фурычит? Тогда к нам. Если не подскажем, хоть посочувствуем...
Ответить