Custom Tag History Aggregates
Python Aggregation Functions
The Tag History system has many built-in aggregate function, such as Average, Sum, and Count. However a custom aggregate may be defined via Python scripting. These functions are used for calculations across timeframes, and process multiple values in a “window” into a single result value.
For example, if a query defines a single row result, but covers an hour of time (either by requesting a single row, or using the Tag Calculations feature), the system must decide how to combine the values. There are many built in functions, such as Average, Sum, Count, etc. Using a custom python aggregate, however, allows you to extend these functions and perform any type of calculation.
When calling a custom tag history aggregate, the returnSize argument must be set to natural ( returnSize = 0). If the returnSize is set to -1, or left with it's default value, the the custom aggregate will be ignored.
Description
As values come in, they will be delivered to this function. The interpolator will create and deliver values.
For each window (or data block), the function will get a fresh copy of blockContext. The block context is a dictionary that can be used to as a memory space. The function should not use global variables. If values must be persisted across blocks, they can be stored in the queryContext, which is also a dictionary.
The function can choose what data to include, such as allowing interpolation or not, and allowing bad quality or not.
The window will receive the following values, many of which are generally interpolated (unless a raw value happens to fall exactly at the time):
- The start of the window
- 1 ms before each raw value (due to the difference between discrete and analog interpolation. A value equal to the previous raw value indicates discrete interpolation)
- The raw value
- The end of the window.
At the end of the window, the function will be called with “finished=true”. The function should return the calculated value(s). The resulting value will have a timestamp that corresponds to the beginning of the block timeframe.
Parameters
- qval - The incoming QualifiedValue. This has:
- value : Object
- quality : Quality (which has ‘name’, ‘isGood())
- timestamp : Date
- interpolated - Boolean indicating if the value is interpolated (true) or raw (false)
- finished - Boolean indicating that the window is finished. If true, the return of this particular call is what will be used for the results. If false, the return will be ignored.
- blockContext - A dictionary created fresh for this particular window. The function may use this as temporary storage for calculations. This object also has:
- blockId - Integer roughly indicating the row id (doesn’t take into account aggregates that return multiple rows)
- blockStart - Long UTC time of the start of the window
- blockEnd - Long UTC time of the end of the window
- previousRawValue - QualifiedValue, the previous non-interpolated value received before this window
- previousBlockResults - QualifiedValue[], the results of the previous window.
- insideBlock(long) - Returns boolean indicating if the time is covered by this window.
- get(key, default) - A helper function that conforms to python’s dictionary “get with default return”.
- queryContext - A dictionary that is shared by all windows in a query. It also has:
- queryId - String, an id that can be used to identify this query in logging
- blockSize - Long, time in ms covered by each window
- queryStart - Long, the start time of the query
- queryEnd - Long, the end time of the query
- logTrace(), logDebug(), logInfo() - all take (formatString, Object... args)
Return Value
- Object - Turned into Good Quality qualified value
- List - Used to return up to 2 values per window
- Tuple - (value, quality_int)
- List of quality tuples
Usage
Custom python aggregates can be used in two ways:
- Defined as a shared script, where the full path to the function is passed to the query.
- Defined as a string, prefaced with “python:”, and passed to the query.
Currently both options are only available through the system.tag.queryTagHistory/queryTagCalculations functions.
Both of these options are used with the “aggregationMode” and “aggregationModes” parameters to system.tag.queryTagHistory, and the “calculations” parameter of system.tag.queryTagCalculations. If the value is not an Enum value from the defined AggregationModes, it will be assumed to be a custom aggregate. The system will first see if it’s the path to a shared script, and if not, will then try to compile it as a full function.
For performance reasons, it is generally recommended to use the shared script whenever possible.
Examples
Using a Shared Script
This example assumes a Shared Scripts named "aggregates" contained the function listed below.
# this is a simple count function, called for each value in a time window
def myCount(qval, interpolated, finished, blockContext, queryContext):
cnt = blockContext.getOrDefault('cnt',0)
if qval.quality.isGood():
blockContext['cnt']=int(cnt)+1
if finished:
return blockContext.getOrDefault('cnt', 0)
The custom function could be used like in the example below:
#Return tag history using a custom aggregate function you wrote.
system.tag.queryTagHistory(paths=[‘MyTag’], rangeHours=1, aggregationModes=['shared.aggregates.myCount'], returnSize = 0)
Creating an Aggregate Function on the Fly
#Create a function on the fly to pass in as a custom aggregate.
wrapper = """\
python:def wrapper(qval, interpolated, finished, blockContext, queryContext):
return shared.aggregates.customFunction(qval, interpolated, finished, blockContext, queryContext)
"""
system.tag.queryTagHistory(paths=[‘MyTag’], rangeHours=1, aggregationModes=[wrapper], returnSize = 0)