Protobuf aggregator¶
Metadata¶
- name:
saturnin.proto.aggregator
- description:
Protobuf data aggregator microservice
- classification:
proto/aggregator
- OID:
1.3.6.1.4.1.53446.1.1.0.3.3.3
- OID name:
iso.org.dod.internet.private.enterprise.firebird.butler.platform.saturnin.micro.proto.aggregator
- UUID:
a676dc59-7eba-5f37-82a2-73115e933a42
- facilities:
None
- API:
None
Usage¶
This microservice is a DATA FILTER that converts protobuf messages to text:
INPUT: protobuf messages
PROCESSING: uses expressions / functions evaluating data from protobuf message to aggregate data for output
OUTPUT: protobuf messages
Supported aggregation functions: count
, min
, max
, sum
, avg
The aggregation is cotroled by two configuration parameters: group_by
and aggreagte
.
Both parameters are of type list of strings. First, the group_by
parameter: each string
consists of the output item name followed by a colon and the specification of the input
protobuf item to be used, in the format “data.<item name>”. Similarly, for the aggregate
parameter, the string consists of the name of the aggregation function followed by a colon,
and then the specification of the input data item to be passed to the aggregation function,
in the same format as for the group_by
parameter.
The output of the filter are messages of type GenericDataRecord.
Important
Data is sent only after the input is closed!
Configuration¶
- agent:
UUID
: Agent identification (service UUID)- logging_id:
str
: Logging ID for this component instance, see Context-based logging for details.- propagate_input_error:
bool
: When input pipe is closed with error, close output with error as well. DEFAULTTrue
.- input_pipe:
str
: Input Data Pipe Identification. REQUIRED option.- input_pipe_address:
ZMQAddress
: Input Data Pipe endpoint address. REQUIRED option.- input_pipe_mode:
SocketMode
: Input Data Pipe Mode (bind/connect). REQUIRED option.- input_pipe_format:
MIME
: Input Pipe data format specification. REQUIRED for CONNECT pipe mode.- input_batch_size:
int
: Input Pipe Data batch size. DEFAULT 50.- input_ready_schedule_interval:
int
: Input Pipe READY message schedule interval in milliseconds. See FBDP documentation for details. DEFAULT 1000.- output_pipe:
str
: Output Data Pipe Identification. REQUIRED option.- output_pipe_address:
ZMQAddress
: Output Data Pipe endpoint address. REQUIRED option.- output_pipe_mode:
SocketMode
: Output Data Pipe Mode (bind/connect). REQUIRED option.- output_pipe_format:
MIME
: Output Pipe data format specification. DEFAULTtext/plain;charset=utf-8
- output_batch_size:
int
: Output Pipe Data batch size. DEFAULT 50.- output_ready_schedule_interval:
int
: Output Pipe READY message schedule interval in milliseconds. See FBDP documentation for details. DEFAULT 1000.- group_by:
List[str]
: Specification of fields that are ‘group by’ key. REQUIRED option.- aggregate:
List[str]
: Specification for aggregates. REQUIRED option.- func:
PyCallable
: Function that returns text representation of data. Python function with signature:def f(data: Any, utils: TransformationUtilities) -> str
Important
The MIME type for ‘input_format’ must be ‘application/x.fb.proto’
The MIME type for ‘output_format’ must be ‘application/x.fb.proto;type=saturnin.core.protobuf.common.GenDataRecord’
The ‘aggregate’ values must have format ‘<aggregate_func>:<field_spec>’, and <aggregate_func> must be from supported functions
Example:
[log-filter]
agent = a676dc59-7eba-5f37-82a2-73115e933a42
input_pipe = pipe-1
input_pipe_address = inproc://${input_pipe}
input_pipe_mode = connect
input_pipe_format = application/x.fb.proto;type=saturnin.core.protobuf.fblog.LogEntry
output_pipe = pipe-2
output_pipe_address = inproc://${output_pipe}
output_pipe_mode = bind
output_pipe_format = application/x.fb.proto;type=saturnin.core.protobuf.GenericDataRecord
group_by =
code:data.code
message:data.message
; The data spec. for "count" function is optional
aggregate =
count:
avg:len(data.params)