omhiredis: Redis Output Module
Module Name: | omhiredis |
Author: | Brian Knox <bknox@digitalocean.com> |
Contributors: | Theo Bertin <theo.bertin@advens.fr> |
Purpose
This module provides native support for writing to Redis, using the hiredis client library.
Configuration Parameters
Note
Parameter names are case-insensitive.
Action Parameters
Server
type | default | mandatory |
|
---|---|---|---|
word | none | no | none |
Name or address of the Redis server
ServerPort
type | default | mandatory |
|
---|---|---|---|
integer | 6379 | no | none |
Port of the Redis server if the server is not listening on the default port.
ServerPassword
type | default | mandatory |
|
---|---|---|---|
word | none | no | none |
Password to support authenticated redis database server to push messages across networks and datacenters. Parameter is optional if not provided AUTH command wont be sent to the server.
Mode
type | default | mandatory |
|
---|---|---|---|
word | template | no | none |
Mode to run the output action in: “queue”, “publish”, “set” or “stream” If not supplied, the original “template” mode is used.
Note
Due to a config parsing bug in 8.13, explicitly setting this to “template” mode will result in a config parsing error.
If mode is “set”, omhiredis will send SET commands. If “expiration” parameter is provided (see parameter below), omhiredis will send SETEX commands.
If mode is “stream”, logs will be sent using XADD. In that case, the template-formatted message will be inserted in the msg field of the stream ID (this behaviour can be controlled through the stream.outField parameter)
Template
type | default | mandatory |
|
---|---|---|---|
word | RSYSLOG_ForwardFormat | no | none |
Warning
Template is required if using “template” mode.
Key
type | default | mandatory |
|
---|---|---|---|
word | none | no | none |
Key is required if using “publish”, “queue” or “set” modes.
Dynakey
type | default | mandatory |
|
---|---|---|---|
binary | off | no | none |
If set to “on”, the key value will be considered a template by Rsyslog. Useful when dynamic key generation is desired.
Userpush
type | default | mandatory |
|
---|---|---|---|
binary | off | no | none |
If set to on, use RPUSH instead of LPUSH, if not set or off, use LPUSH.
Expiration
type | default | mandatory |
|
---|---|---|---|
number | 0 | no | none |
Only applicable with mode “set”. Specifies an expiration for the keys set by omhiredis. If this parameter is not specified, the value will be 0 so keys will last forever, otherwise they will last for X seconds.
stream.outField
type | default | mandatory |
|
---|---|---|---|
word | msg | no | none |
Note
Currently, the module cannot use the full message object, so it can only insert templated messages to a single stream entry’s specific field
stream.capacityLimit
type | default | mandatory |
|
---|---|---|---|
positive integer | 0 | no | none |
Warning
This parameter has no way to check if the deleted entries have been ACK’ed once or even used, this should be set if you’re sure the insertion rate in lower that the dequeuing to avoid losing entries!
stream.ack
type | default | mandatory |
|
---|---|---|---|
boolean | off | no | none |
stream.del
type | default | mandatory |
|
---|---|---|---|
boolean | off | no | none |
stream.keyAck
type | default | mandatory |
|
---|---|---|---|
word | no | none |
stream.dynaKeyAck
type | default | mandatory |
|
---|---|---|---|
boolean | off | no | none |
stream.groupAck
type | default | mandatory |
|
---|---|---|---|
word | no | none |
stream.dynaGroupAck
type | default | mandatory |
|
---|---|---|---|
boolean | off | no | none |
stream.indexAck
type | default | mandatory |
|
---|---|---|---|
word | no | none |
stream.dynaIndexAck
type | default | mandatory |
|
---|---|---|---|
boolean | off | no | none |
Examples
Example 1: Template mode
In “template” mode, the string constructed by the template is sent to Redis as a command.
Note
This mode has problems with strings with spaces in them - full message won’t work correctly. In this mode, the template argument is required, and the key argument is meaningless.
module(load="omhiredis")
template(
name="program_count_tmpl"
type="string"
string="HINCRBY progcount %programname% 1")
action(
name="count_programs"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="template"
template="program_count_tmpl")
Results
Here’s an example redis-cli session where we HGETALL the counts:
> redis-cli
127.0.0.1:6379> HGETALL progcount
1) "rsyslogd"
2) "35"
3) "rsyslogd-pstats"
4) "4302"
Example 2: Queue mode
In “queue” mode, the syslog message is pushed into a Redis list at “key”, using the LPUSH command. If a template is not supplied, the plugin will default to the RSYSLOG_ForwardFormat template.
module(load="omhiredis")
action(
name="push_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="queue"
key="my_queue")
Results
Here’s an example redis-cli session where we RPOP from the queue:
> redis-cli
127.0.0.1:6379> RPOP my_queue
"<46>2015-09-17T10:54:50.080252-04:00 myhost rsyslogd: [origin software=\"rsyslogd\" swVersion=\"8.13.0.master\" x-pid=\"6452\" x-info=\"http://www.rsyslog.com\"] start"
127.0.0.1:6379>
Example 3: Publish mode
In “publish” mode, the syslog message is published to a Redis topic set by “key”. If a template is not supplied, the plugin will default to the RSYSLOG_ForwardFormat template.
module(load="omhiredis")
action(
name="publish_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="publish"
key="my_channel")
Results
Here’s an example redis-cli session where we SUBSCRIBE to the topic:
> redis-cli
127.0.0.1:6379> subscribe my_channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "my_channel"
3) (integer) 1
1) "message"
2) "my_channel"
3) "<46>2015-09-17T10:55:44.486416-04:00 myhost rsyslogd-pstats: {\"name\":\"imuxsock\",\"origin\":\"imuxsock\",\"submitted\":0,\"ratelimit.discarded\":0,\"ratelimit.numratelimiters\":0}"
Example 4: Set mode
In “set” mode, the syslog message is set as a Redis key at “key”. If a template is not supplied, the plugin will default to the RSYSLOG_ForwardFormat template.
module(load="omhiredis")
action(
name="set_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="set"
key="my_key")
Results
Here’s an example redis-cli session where we get the key:
> redis-cli
127.0.0.1:6379> get my_key
"<46>2019-12-17T20:16:54.781239+00:00 localhost rsyslogd-pstats: { \"name\": \"main Q\", \"origin\": \"core.queue\",
\"size\": 3, \"enqueued\": 7, \"full\": 0, \"discarded.full\": 0, \"discarded.nf\": 0, \"maxqsize\": 3 }"
127.0.0.1:6379> ttl my_key
(integer) -1
Example 5: Set mode with expiration
In “set” mode when “expiration” is set to a positive integer, the syslog message is set as a Redis key at “key”, with expiration “expiration”. If a template is not supplied, the plugin will default to the RSYSLOG_ForwardFormat template.
module(load="omhiredis")
action(
name="set_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="set"
key="my_key"
expiration="10")
Results
Here’s an example redis-cli session where we get the key and test the expiration:
> redis-cli
127.0.0.1:6379> get my_key
"<46>2019-12-17T20:16:54.781239+00:00 localhost rsyslogd-pstats: { \"name\": \"main Q\", \"origin\": \"core.queue\",
\"size\": 3, \"enqueued\": 7, \"full\": 0, \"discarded.full\": 0, \"discarded.nf\": 0, \"maxqsize\": 3 }"
127.0.0.1:6379> ttl my_key
(integer) 10
127.0.0.1:6379> ttl my_key
(integer) 3
127.0.0.1:6379> ttl my_key
(integer) -2
127.0.0.1:6379> get my_key
(nil)
Example 6: Set mode with dynamic key
In any mode with “key” defined and “dynakey” as “on”, the key used during operation will be dynamically generated by Rsyslog using templating.
module(load="omhiredis")
template(name="example-template" type="string" string="%hostname%")
action(
name="set_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="set"
key="example-template"
dynakey="on")
Results
Here’s an example redis-cli session where we get the dynamic key:
> redis-cli
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> keys *
1) "localhost"
Example 7: “Simple” stream mode
module(load="omhiredis")
template(name="example-template" type="string" string="%hostname%")
action(
type="omhiredis"
server="my-redis-server.example.com"
serverport="6379"
template="example-template"
mode="stream"
key="stream_output"
stream.outField="data")
Results
Here’s an example redis-cli session where we get the newly inserted stream index:
> redis-cli
127.0.0.1:6379> XLEN stream_output
1
127.0.0.1:6379> xread STREAMS stream_output 0
1) 1) "stream_output"
2) 1) 1) "1684507855284-0"
2) 1) "data"
2) "localhost"
Example 8: Get from a stream with imhiredis, then insert in another one with omhiredis
module(load="imhiredis")
module(load="omhiredis")
template(name="redisJsonMessage" type="list") {
property(name="$!output")
}
template(name="indexTemplate" type="list") {
property(name="$.redis!index")
}
# Not used in this example, but can be used to replace the static declarations in omhiredis' configuration below
template(name="groupTemplate" type="list") {
property(name="$.redis!group")
}
template(name="keyTemplate" type="list") {
property(name="$.redis!stream")
}
input(type="imhiredis"
server="127.0.0.1"
port="6379"
mode="stream"
key="stream_input"
stream.consumerGroup="group1"
stream.consumerName="consumer1"
stream.consumerACK="off"
ruleset="receive_redis")
ruleset(name="receive_redis") {
action(type="omhiredis"
server="127.0.0.1"
serverport="6379"
mode="stream"
key="stream_output"
stream.ack="on"
# The key and group values are set statically, but the index value is taken from imhiredis metadata
stream.dynaKeyAck="off"
stream.keyAck="stream_input"
stream.dynaGroupAck="off"
stream.groupAck="group1"
stream.indexAck="indexTemplate"
stream.dynaIndexAck="on"
template="redisJsonMessage"
)
}
Results
Here’s an example redis-cli session where we get the pending entries at the end of the log re-insertion:
> redis-cli
127.0.0.1:6379> XINFO GROUPS stream_input
1) 1) "name"
1) "group1"
2) "consumers"
3) (integer) 1
4) "pending"
5) (integer) 0
6) "last-delivered-id"
7) "1684509391900-0"
8) "entries-read"
9) (integer) 1
10) "lag"
11) (integer) 0
Example 9: Ensuring streams don’t grow indefinitely
stream.del to delete processed entries (can also be used as a complement of ACK’ing)
stream.capacityLimit that allows to ensure a hard-limit of logs can be inserted before dropping entries
module(load="imhiredis")
module(load="omhiredis")
template(name="redisJsonMessage" type="list") {
property(name="$!output")
}
template(name="indexTemplate" type="list") {
property(name="$.redis!index")
}
template(name="keyTemplate" type="list") {
property(name="$.redis!stream")
}
input(type="imhiredis"
server="127.0.0.1"
port="6379"
mode="stream"
key="stream_input"
ruleset="receive_redis")
ruleset(name="receive_redis") {
action(type="omhiredis"
server="127.0.0.1"
serverport="6379"
mode="stream"
key="stream_output"
stream.capacityLimit="1000000"
stream.del="on"
stream.dynaKeyAck="on"
stream.keyAck="keyTemplate"
stream.dynaIndexAck="on"
stream.indexAck="indexTemplate"
template="redisJsonMessage"
)
}
Results
Here, the result of this configuration is:
entries are deleted from the source stream stream_input after being inserted by omhiredis to stream_output
stream_output won’t hold more than (approximately) a million entries at a time
Warning
The stream.capacityLimit is an approximate maximum! see redis documentation on MAXLEN and the ‘~’ option to understand how it works!
See also
Help with configuring/using Rsyslog
:
Mailing list - best route for general questions
GitHub: rsyslog source project - detailed questions, reporting issues that are believed to be bugs with
Rsyslog
See also
Contributing to Rsyslog
:
Source project: rsyslog project README.
Documentation: rsyslog-doc project README
Copyright 2008-2023 Rainer Gerhards (Großrinderfeld), and Others.