One of the major new APIs in TIBCO Spotfire 3.1 is data functions. The data functions API is used to connect to TIBCO Spotfire Statistics Services, but it can be used for much more than that. Operations that were previously performed using the data source, transformation and calculation APIs can now also be performed with the new API. An advantage of the data functions API is that it automatically executes operations on background threads, which makes it easy to take advantage of multi-core CPUs.
Conceptually the data functions API is split into three different parts:
- The DataFunctionDefinition which describes the metadata about what a data function expects for input and the output provided by the function. These definitions can be stored and retrieved from the library and are not connected directly to any particular document.
- The DataFunction which includes a function definition and also specifies how inputs and outputs should be retrieved from and applied to the document.
- The DataFunctionExecutor which receives input from the document combined with the function definition and is responsible for calculating the output.
Have a look at the DataFunctionsExample project in the SDK for code examples on how to use the API.
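The three-part split can be illustrated without any Spotfire dependency. The following is only a minimal sketch: SketchDefinition, SketchFunction, ISketchExecutor and DoublingExecutor are hypothetical stand-ins invented for this illustration, and the "document" is modeled as a plain dictionary from column name to values. It is not how the real classes are implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins that only mirror the three roles; none of these are Spotfire types.

// Role 1: the definition. Pure metadata, not tied to any document.
public sealed class SketchDefinition
{
    public readonly string Name;
    public readonly IList<string> InputNames;
    public readonly IList<string> OutputNames;

    public SketchDefinition(string name, string[] inputNames, string[] outputNames)
    {
        Name = name;
        // Copy the arrays so later changes by the caller cannot leak in.
        InputNames = Array.AsReadOnly((string[])inputNames.Clone());
        OutputNames = Array.AsReadOnly((string[])outputNames.Clone());
    }
}

// Role 3: the executor. Sees only named inputs and produces named outputs.
public interface ISketchExecutor
{
    Dictionary<string, double[]> Execute(SketchDefinition definition, Dictionary<string, double[]> inputs);
}

// Role 2: the function. A definition plus bindings into a "document",
// modeled here as a dictionary from column name to values.
public sealed class SketchFunction
{
    public readonly SketchDefinition Definition;
    public readonly Dictionary<string, string> InputBindings = new Dictionary<string, string>();
    public readonly Dictionary<string, string> OutputBindings = new Dictionary<string, string>();

    public SketchFunction(SketchDefinition definition)
    {
        Definition = definition;
    }

    public void Execute(Dictionary<string, double[]> document, ISketchExecutor executor)
    {
        // Resolve every input parameter against the document.
        Dictionary<string, double[]> inputs = new Dictionary<string, double[]>();
        foreach (string parameter in Definition.InputNames)
        {
            inputs[parameter] = document[InputBindings[parameter]];
        }

        Dictionary<string, double[]> outputs = executor.Execute(Definition, inputs);

        // Write every output parameter back to the document.
        foreach (string parameter in Definition.OutputNames)
        {
            document[OutputBindings[parameter]] = outputs[parameter];
        }
    }
}

// A trivial executor that doubles the "values" input.
public sealed class DoublingExecutor : ISketchExecutor
{
    public Dictionary<string, double[]> Execute(SketchDefinition definition, Dictionary<string, double[]> inputs)
    {
        double[] values = inputs["values"];
        double[] doubled = new double[values.Length];
        for (int i = 0; i < values.Length; ++i)
        {
            doubled[i] = 2 * values[i];
        }

        Dictionary<string, double[]> outputs = new Dictionary<string, double[]>();
        outputs["doubled"] = doubled;
        return outputs;
    }
}
```

Note that the same definition could be bound to different columns in different documents, which is the reason the definition carries no document references of its own.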
Compared to the threading framework
This component uses the threading framework but does not replace it. However, for many operations that you want to execute on a background thread, such as calculations and data modifications, data functions are the preferred approach. Calculations written using data functions are automatically executed on a background thread, while you still only need to write simple sequential code.
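The idea of "sequential code, background execution" can be sketched in plain .NET. This is not the Spotfire threading framework and not how data functions are scheduled internally; BackgroundSketch and its members are hypothetical names, and Task.Run is used only as a generic stand-in for "some framework runs this for you on another thread".

```csharp
using System;
using System.Threading.Tasks;

public static class BackgroundSketch
{
    // The calculation itself is plain sequential code: no locks, no thread handling.
    public static double Mean(double[] values)
    {
        if (values.Length == 0)
        {
            throw new ArgumentException("At least one value is required.");
        }

        double sum = 0;
        foreach (double value in values)
        {
            sum += value;
        }

        return sum / values.Length;
    }

    // The "framework" part: the caller gets a handle to the pending result
    // while the calculation runs on a thread pool thread.
    public static Task<double> RunInBackground(double[] values)
    {
        return Task.Run(() => Mean(values));
    }
}
```

The author of Mean never deals with threads; only the scheduling wrapper does, which is the division of labor the data functions API provides.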
Compared to calculations
In many cases it is preferable to use data functions rather than the calculations framework, since you get automatic background execution and the data functions API should be easier to use. However, the calculations framework is more general and is still needed for some scenarios.
Compared to transformations
Data functions are quite similar to transformations since both have an API that is based on DataRowReader enumerators. However, data functions are more general since they can take multiple row readers as input and return multiple row readers as output. Simple data functions can automatically be used as transformations by using the DataFunctionTransformation. It should also be easy to expose an existing transformation by wrapping it in a data function executor.
Compared to data sources
Since data sources return data row readers, and an ImportContext is available to the executor in the DataFunctionInvocation object, it is possible to wrap a data source inside a data function executor.
DataFunctionDefinition
The data function definition object and its child objects are immutable, and a builder class, DataFunctionDefinitionBuilder, is used to construct instances. To support the use case where you want to edit a definition, it is possible to create a builder from an existing definition, in which case the builder is populated with the contents of that definition.
Each function definition is connected to a TypeIdentifier which specifies how the function should be executed. The predefined type identifiers can be found in the DataFunctionExecutorTypeIdentifiers static class.
The design of the function definition is based on the assumption that these definitions should be applicable to a large range of possible implementations, such as web services, internal calculation functions or an external executable. In order to support this, a dictionary from string to string is included to allow custom information to be stored inside the definition. For S+ and R scripts the script is added to the settings with the key “script”.
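The immutable-object-plus-builder arrangement, including the "edit by creating a builder from an existing definition" case, can be sketched as follows. ImmutableDefinitionSketch and DefinitionBuilderSketch are hypothetical types made up for this illustration; they only show the general pattern and say nothing about the internals of DataFunctionDefinitionBuilder.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;

// Hypothetical immutable definition: no setters, read-only collections.
public sealed class ImmutableDefinitionSketch
{
    private readonly string name;
    private readonly ReadOnlyCollection<string> inputs;

    internal ImmutableDefinitionSketch(string name, IList<string> inputs)
    {
        this.name = name;
        // Copy the list so the builder can be modified afterwards without affecting this instance.
        this.inputs = new List<string>(inputs).AsReadOnly();
    }

    public string Name { get { return this.name; } }

    public ReadOnlyCollection<string> Inputs { get { return this.inputs; } }
}

// Hypothetical mutable builder that produces immutable definitions.
public sealed class DefinitionBuilderSketch
{
    public string Name;
    public readonly List<string> Inputs = new List<string>();

    // Start from scratch.
    public DefinitionBuilderSketch(string name)
    {
        this.Name = name;
    }

    // Start from an existing definition: the builder is populated with its contents.
    public DefinitionBuilderSketch(ImmutableDefinitionSketch definition)
    {
        this.Name = definition.Name;
        this.Inputs.AddRange(definition.Inputs);
    }

    public ImmutableDefinitionSketch Build()
    {
        return new ImmutableDefinitionSketch(this.Name, this.Inputs);
    }
}
```

Editing therefore never changes an existing definition: it produces a second instance, and the first one stays exactly as it was built.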
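One way to picture how a type identifier and a string-to-string settings dictionary fit together is a lookup table from identifier to executor, where each executor reads only the settings keys it understands. The sketch below is an assumption-laden illustration: ExecutorRegistrySketch, both identifier strings and the "url" key are hypothetical, and only the "script" key comes from the text above.

```csharp
using System;
using System.Collections.Generic;

public static class ExecutorRegistrySketch
{
    // Hypothetical identifiers; the real ones are in DataFunctionExecutorTypeIdentifiers.
    public const string ScriptExecutor = "Sketch.ScriptExecutor";
    public const string WebServiceExecutor = "Sketch.WebServiceExecutor";

    // Each executor is reduced to a delegate that turns settings into a description
    // of what it would do. The "url" key is invented for this sketch.
    private static readonly Dictionary<string, Func<IDictionary<string, string>, string>> Executors =
        new Dictionary<string, Func<IDictionary<string, string>, string>>
        {
            { ScriptExecutor, settings => "run script: " + settings["script"] },
            { WebServiceExecutor, settings => "call service: " + settings["url"] }
        };

    // Pick the executor from the type identifier and hand it the custom settings.
    public static string Execute(string typeIdentifier, IDictionary<string, string> settings)
    {
        Func<IDictionary<string, string>, string> executor;
        if (!Executors.TryGetValue(typeIdentifier, out executor))
        {
            throw new InvalidOperationException("No executor registered for " + typeIdentifier);
        }

        return executor(settings);
    }
}
```

Because the settings are untyped strings, the definition format stays the same no matter what kind of implementation sits behind a given identifier.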
Here is an example that shows how to create a function definition for a simple script that performs loess smoothing:
// StringBuilder requires the System.Text namespace.
// The definition is executed as an S+ script.
DataFunctionDefinitionBuilder functionDefinitionBuilder =
    new DataFunctionDefinitionBuilder(
        "Loess",
        DataFunctionExecutorTypeIdentifiers.SPlusScriptExecutor);
// Input "y": a column restricted to integer and real values.
InputParameterBuilder yBuilder =
    new InputParameterBuilder("y", ParameterType.Column);
yBuilder.AddAllowedDataType(DataType.Integer);
yBuilder.AddAllowedDataType(DataType.Real);
InputParameter y = yBuilder.Build();
functionDefinitionBuilder.InputParameters.Add(y);
// Input "x": a column without data type restrictions.
InputParameterBuilder xBuilder =
    new InputParameterBuilder("x", ParameterType.Column);
InputParameter x = xBuilder.Build();
functionDefinitionBuilder.InputParameters.Add(x);
// Output "smooth": a table produced by the script.
OutputParameterBuilder outputBuilder = new OutputParameterBuilder("smooth", ParameterType.Table);
OutputParameter output = outputBuilder.Build();
functionDefinitionBuilder.OutputParameters.Add(output);
// The table behind the plot; used when the function is connected to the document.
DataTable inputTable = context.Data.DataTableReference;
// The script is stored in the settings under the key "script".
StringBuilder scriptBuilder = new StringBuilder();
scriptBuilder.AppendLine("predicted <- predict(loess(y ~ x, data.frame(x=x, y=y)), data.frame(x=x))");
scriptBuilder.AppendLine("smooth <- data.frame(loess.x = x, loess.y = predicted)");
functionDefinitionBuilder.Settings.Add("script", scriptBuilder.ToString());
DataFunctionDefinition functionDefinition = functionDefinitionBuilder.Build();
DataFunction
The purpose of the data function is to allow functions to get input from the document, do some processing on a background thread and then apply the results to the document.
The inputs to a data function are expressions in the expression language. If multiple columns need to be sent, separate the expressions with commas as you would in the custom expressions dialog. It is possible to specify which subset the expressions should be calculated on using any combination of markings and filterings. The Active Filtering property also allows you to make the data function use the filtering on the current page. If you specify multiple markings and filterings, the intersection is used.
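The intersection rule for multiple markings and filterings can be illustrated with plain sets of row indexes. This is a minimal sketch, not the Spotfire selection model: SelectionSketch is a hypothetical helper, each selection is reduced to the row indexes it contains, and the behavior for "no selections at all" (all rows are used) is an assumption made for the sketch.

```csharp
using System;
using System.Collections.Generic;

public static class SelectionSketch
{
    // Returns the rows that are included in every selection.
    // Assumption of this sketch: with no selections, all rows are used.
    public static SortedSet<int> Combine(int rowCount, params IEnumerable<int>[] selections)
    {
        SortedSet<int> rows = new SortedSet<int>();
        for (int i = 0; i < rowCount; ++i)
        {
            rows.Add(i);
        }

        // Each additional marking or filtering can only narrow the subset.
        foreach (IEnumerable<int> selection in selections)
        {
            rows.IntersectWith(selection);
        }

        return rows;
    }
}
```

For example, with ten rows, a marking containing rows 1, 2, 3 and 7 and a filtering containing rows 2, 3, 4, 7 and 9 leave rows 2, 3 and 7 as the input subset.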
For the output the following operations are allowed:
- Adding a new table.
- Adding columns to a table, both as a join and directly added to the table.
- Adding rows to a table.
- Replacing all the data in a table.
- Setting a document property.
- Setting a column property.
- Setting a table property.
The following example shows how to create a data function from the expressions in a scatter plot and also update the scatter plot when the calculation has finished:
Document document = context.Context.GetService<Document>();
// Use all filterings of the plot as data selections for the inputs.
DataSelection[] selections = new DataSelection[context.Data.Filterings.Count];
for (int i = 0; i < context.Data.Filterings.Count; ++i)
{
    selections[i] = context.Data.Filterings[i];
}
string xExpression = null;
string yExpression = null;
try
{
    ColumnExpression xColumnExpression = ColumnExpression.Create(context.XAxis.Expression);
    ColumnExpression yColumnExpression = ColumnExpression.Create(context.YAxis.Expression);
    // Both expressions are needed, so give up if either of them is invalid.
    if (!xColumnExpression.IsValid || !yColumnExpression.IsValid)
    {
        return;
    }
    // Qualify the column names with the table name.
    xColumnExpression.QualifyNames(inputTable.Name);
    yColumnExpression.QualifyNames(inputTable.Name);
    xExpression = xColumnExpression.Expression;
    yExpression = yColumnExpression.Expression;
}
catch (ExpressionException)
{
    return;
}
DataFunction function = document.Data.DataFunctions.AddNew(context.Title, functionDefinition);
function.Inputs.SetInput(x, xExpression, true, selections);
function.Inputs.SetInput(y, yExpression, true, selections);
// Add columns to the input table.
ColumnsOutputBuilder addColumnsBuilder =
    new ColumnsOutputBuilder(inputTable);
addColumnsBuilder.InputToAdaptSelectionsFrom = x;
function.Outputs.SetColumnsOutput(output, addColumnsBuilder);
ApplicationThread applicationThread = context.Context.GetService<ApplicationThread>();
// The callback passed to Execute is invoked when the calculation has finished.
function.Execute(
    delegate
    {
        // Switch to the application thread before modifying the document.
        applicationThread.InvokeAsynchronously(
            delegate
            {
                context.Transactions.ExecuteTransaction(
                    delegate
                    {
                        // Show the new columns as a line in the scatter plot.
                        DataColumn xColumn = inputTable.Columns["loess.x"];
                        DataColumn yColumn = inputTable.Columns["loess.y"];
                        ColumnValuesLine fittingModel =
                            context.FittingModels.AddColumnValuesLine(
                                inputTable,
                                xColumn,
                                yColumn);
                        fittingModel.SortColumnReference = xColumn;
                        fittingModel.Enabled = true;
                    });
                // Start updating automatically
                function.UpdateBehavior = DataFunctionUpdateBehavior.Automatic;
            });
    });
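The nesting at the end of the example follows a common pattern: a completion callback runs on a background thread and hands the document update over to the thread that owns the document. The sketch below reproduces only that hand-over in plain .NET. AppThreadSketch and CallbackChainSketch are hypothetical classes, the queue is a stand-in for the real ApplicationThread service, and transactions are left out entirely.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an application thread with a work queue.
public sealed class AppThreadSketch
{
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();

    // Only touched by actions that run on the owning thread.
    public readonly List<string> Log = new List<string>();

    // May be called from any thread: schedules work for the owning thread.
    public void InvokeAsynchronously(Action action)
    {
        this.queue.Add(action);
    }

    // Called on the owning thread: waits for one queued action and runs it.
    public void ProcessOne()
    {
        Action action = this.queue.Take();
        action();
    }
}

public static class CallbackChainSketch
{
    public static List<string> Run()
    {
        AppThreadSketch app = new AppThreadSketch();
        int ownerThreadId = Thread.CurrentThread.ManagedThreadId;

        // Background part: calculate, then post the update instead of applying it directly.
        Task.Run(() =>
        {
            int result = 21 * 2;
            app.InvokeAsynchronously(() =>
            {
                bool onOwner = Thread.CurrentThread.ManagedThreadId == ownerThreadId;
                app.Log.Add("applied " + result + " on owner thread: " + onOwner);
            });
        });

        // Owner part: blocks until the background work has posted its update, then applies it.
        app.ProcessOne();
        return app.Log;
    }
}
```

The calculation and the update are thus kept on different threads, with the queue as the only shared structure between them.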
DataFunctionExecutor
The data function executor is the only extension point in the data functions API. To implement one, inherit from the CustomDataFunctionExecutor class. This class has a very simple API with only one method. The method receives a DataFunctionInvocation object which contains the named input parameters, and the executor is expected to set the output parameters with the calculated result.
The following example shows part of a simple function executor that performs a basic K-means clustering algorithm.
public sealed class KMeansDataFunctionExecutor : CustomDataFunctionExecutor
{
    /// <summary>
    /// Execute a function invocation. The executor is expected to add the output results to the invocation
    /// object.
    /// </summary>
    /// <param name="invocation">The function invocation.</param>
    /// <returns>
    /// The prompt models to return, the implementor is expected to use the yield return pattern.
    /// </returns>
    protected override IEnumerable<object> ExecuteFunctionCore(DataFunctionInvocation invocation)
    {
        // This function expects function definitions with the following inputs:
        // Values -> Table of double values.
        // NumberOfClusters -> Integer value.
        // ResultName -> String value, the name of the result column.
        DataRowReader valuesReader;
        if (!invocation.TryGetInput("Values", out valuesReader))
        {
            throw new InvalidOperationException("Could not find values in input.");
        }
        int numberOfClusters = GetNumberOfClusters(invocation);
        if (numberOfClusters <= 0)
        {
            throw new InvalidOperationException("The number of clusters must be greater than zero.");
        }
        string resultName = GetResultName(invocation);
        // Get cursors for the input as double values.
        List<DataValueCursor<double>> numericCursors = new List<DataValueCursor<double>>();
        foreach (DataRowReaderColumn column in valuesReader.Columns)
        {
            DataValueCursor<double> numericCursor = column.Cursor as DataValueCursor<double>;
            if (numericCursor == null)
            {
                throw new InvalidOperationException("Non double cursor sent as input.");
            }
            numericCursors.Add(numericCursor);
        }
        // An array that contains the values for a row.
        double[] row = new double[numericCursors.Count];
        // We keep it simple and select the first numberOfClusters rows to be the initial
        // center for the clusters.
        Cluster[] clusters = new Cluster[numberOfClusters];
        valuesReader.Reset();
        for (int i = 0; i < numberOfClusters; ++i)
        {
            if (!valuesReader.MoveNext())
            {
                throw new InvalidOperationException("Number of clusters is larger than the available rows.");
            }
            FillRow(numericCursors, row);
            // Initialize the cluster with the current row as its center.
            clusters[i].Initialize(row);
        }
        PageableListSettings settings = new PageableListSettings();
        settings.CanReplaceValue = true;
        PageableList<int> rowClusterIndex = new PageableList<int>(settings);
        // Calculate the initial cluster for each point.
        valuesReader.Reset();
        while (valuesReader.MoveNext())
        {
            FillRow(numericCursors, row);
            // Calculate the distance from all clusters and find
            // the nearest cluster.
            int nearestCluster = FindNearestCluster(row, clusters);
            // Store the resulting index and add the row as a member of the nearest cluster.
            rowClusterIndex.Add(nearestCluster);
            clusters[nearestCluster].AddMember(row);
        }
        // Iterate until the maximum number of iterations is reached or until
        // no point has changed cluster.
        int maxIterations = 100;
        int iterationCount = 0;
        do
        {
            // Calculate the centroids for all clusters.
            for (int i = 0; i < numberOfClusters; ++i)
            {
                clusters[i].CalculateCentroid();
            }
            // Calculate the new nearest cluster for each point.
            valuesReader.Reset();
            bool hasChanged = false;
            int rowIndex = 0;
            while (valuesReader.MoveNext())
            {
                FillRow(numericCursors, row);
                // Calculate the distance from all clusters and find
                // the nearest cluster.
                int nearestCluster = FindNearestCluster(row, clusters);
                int oldClusterIndex = rowClusterIndex[rowIndex];
                // This point has changed cluster.
                if (nearestCluster != oldClusterIndex)
                {
                    hasChanged = true;
                    rowClusterIndex[rowIndex] = nearestCluster;
                    clusters[oldClusterIndex].RemoveMember(row);
                    clusters[nearestCluster].AddMember(row);
                }
                rowIndex++;
            }
            if (!hasChanged)
            {
                break;
            }
            iterationCount++;
        }
        while (iterationCount < maxIterations);
        // Assign the output result.
        invocation.SetOutput("K-Means", new PageableListDataRowReader(rowClusterIndex, resultName));
        // Done
        yield break;
    }
    /// <summary>
    /// Find the cluster that is nearest the row.
    /// </summary>
    /// <param name="row">The row.</param>
    /// <param name="clusters">The clusters.</param>
    /// <returns>The index of the nearest cluster.</returns>
    private static int FindNearestCluster(double[] row, Cluster[] clusters)
    {
        double minDistance = double.MaxValue;
        int nearestCluster = -1;
        for (int i = 0; i < clusters.Length; ++i)
        {
            double distance = clusters[i].Distance(row);
            if (distance < minDistance)
            {
                minDistance = distance;
                nearestCluster = i;
            }
        }
        return nearestCluster;
    }
    /// <summary>
    /// Fill the row array with values for the current row.
    /// </summary>
    /// <param name="numericCursors">The numeric cursors.</param>
    /// <param name="row">The row array to fill.</param>
    private static void FillRow(List<DataValueCursor<double>> numericCursors, double[] row)
    {
        for (int j = 0; j < numericCursors.Count; ++j)
        {
            if (!numericCursors[j].IsCurrentValueValid)
            {
                throw new InvalidOperationException("Invalid values are not allowed.");
            }
            row[j] = numericCursors[j].CurrentValue;
        }
    }
    /// <summary>
    /// Get result name.
    /// </summary>
    /// <param name="invocation">The invocation.</param>
    /// <returns>The result name.</returns>
    private static string GetResultName(DataFunctionInvocation invocation)
    {
        DataRowReader resultNameReader;
        if (!invocation.TryGetInput("ResultName", out resultNameReader))
        {
            throw new InvalidOperationException("Could not find the result name.");
        }
        if (resultNameReader.Columns.Count != 1)
        {
            throw new InvalidOperationException("The result name reader does not contain exactly one column.");
        }
        DataRowReaderColumn resultNameColumn = resultNameReader.Columns[0];
        DataValueCursor<string> resultNameCursor = resultNameColumn.Cursor as DataValueCursor<string>;
        // Check the result of the cast, not the column, to detect a non-string input.
        if (resultNameCursor == null)
        {
            throw new InvalidOperationException("The result name input was not a string.");
        }
        if (!resultNameReader.MoveNext())
        {
            throw new InvalidOperationException("No rows in result name input.");
        }
        if (!resultNameCursor.IsCurrentValueValid)
        {
            throw new InvalidOperationException("The result name input was not valid.");
        }
        return resultNameCursor.CurrentValue;
    }
    /// <summary>
    /// Get the number of clusters sent in the invocation.
    /// </summary>
    /// <param name="invocation">The invocation.</param>
    /// <returns>The number of clusters.</returns>
    private static int GetNumberOfClusters(DataFunctionInvocation invocation)
    {
        DataRowReader numberOfClustersReader;
        if (!invocation.TryGetInput("NumberOfClusters", out numberOfClustersReader))
        {
            throw new InvalidOperationException("Could not find number of clusters in input.");
        }
        if (numberOfClustersReader.Columns.Count != 1)
        {
            throw new InvalidOperationException("The number of clusters reader does not contain exactly one column.");
        }
        DataRowReaderColumn numClustersColumn = numberOfClustersReader.Columns[0];
        DataValueCursor<int> numClustersCursor = numClustersColumn.Cursor as DataValueCursor<int>;
        // Check the result of the cast, not the column, to detect a non-integer input.
        if (numClustersCursor == null)
        {
            throw new InvalidOperationException("The number of clusters input was not an integer.");
        }
        if (!numberOfClustersReader.MoveNext())
        {
            throw new InvalidOperationException("No rows in number of clusters input.");
        }
        if (!numClustersCursor.IsCurrentValueValid)
        {
            throw new InvalidOperationException("The number of clusters input was not valid.");
        }
        int numberOfClusters = numClustersCursor.CurrentValue;
        return numberOfClusters;
    }
    /// <summary>
    /// A cluster.
    /// </summary>
    private struct Cluster
    {
        /// <summary>
        /// The location of the cluster.
        /// </summary>
        private double[] location;
        /// <summary>
        /// The sum of the parts of the points in the cluster.
        /// </summary>
        private double[] sumOfParts;
        /// <summary>
        /// The number of observations.
        /// </summary>
        private int observations;
        /// <summary>
        /// Initialize the cluster with the specified center.
        /// </summary>
        /// <param name="center">The row with the initial center.</param>
        public void Initialize(double[] center)
        {
            this.location = new double[center.Length];
            this.sumOfParts = new double[center.Length];
            this.observations = 0;
            for (int i = 0; i < center.Length; ++i)
            {
                this.location[i] = center[i];
            }
        }
        /// <summary>
        /// Add a member to the cluster.
        /// </summary>
        /// <param name="member">The member to add.</param>
        public void AddMember(double[] member)
        {
            for (int i = 0; i < member.Length; ++i)
            {
                this.sumOfParts[i] += member[i];
            }
            this.observations++;
        }
        /// <summary>
        /// Remove a member from the cluster.
        /// </summary>
        /// <param name="member">The member to remove.</param>
        public void RemoveMember(double[] member)
        {
            for (int i = 0; i < member.Length; ++i)
            {
                this.sumOfParts[i] -= member[i];
            }
            this.observations--;
        }
        /// <summary>
        /// Calculate the new centroid.
        /// </summary>
        public void CalculateCentroid()
        {
            // Keep the previous location for an empty cluster to avoid dividing by zero.
            if (this.observations == 0)
            {
                return;
            }
            for (int i = 0; i < this.sumOfParts.Length; ++i)
            {
                this.location[i] = this.sumOfParts[i] / this.observations;
            }
        }
        /// <summary>
        /// Calculates the Euclidean distance.
        /// </summary>
        /// <param name="point">The point to calculate the distance to this cluster.</param>
        /// <returns>The distance from the cluster.</returns>
        public double Distance(double[] point)
        {
            double sum = 0;
            for (int i = 0; i < point.Length; ++i)
            {
                double difference = this.location[i] - point[i];
                sum += difference * difference;
            }
            return Math.Sqrt(sum);
        }
    }
    /// <summary>
    /// A data row reader which returns a pageable list.
    /// </summary>
    private sealed class PageableListDataRowReader : CustomDataRowReader
    {
        /// <summary>
        /// The data to return.
        /// </summary>
        private readonly PageableList<int> data;
        /// <summary>
        /// The result cursor.
        /// </summary>
        private readonly MutableValueCursor<int> resultCursor;
        /// <summary>
        /// The result name.
        /// </summary>
        private readonly string resultName;
        /// <summary>
        /// The current row index.
        /// </summary>
        private int index = -1;
        /// <summary>
        /// Initializes a new instance of the PageableListDataRowReader class.
        /// </summary>
        /// <param name="data">The data to return.</param>
        /// <param name="resultName">The result name.</param>
        public PageableListDataRowReader(PageableList<int> data, string resultName)
        {
            this.data = data;
            this.resultName = resultName;
            this.resultCursor = (MutableValueCursor<int>)DataValueCursor.CreateMutableCursor(DataType.Integer);
        }
        /// <summary>
        /// The implementor should provide a list of <see cref="T:Spotfire.Dxp.Data.DataRowReaderColumn"/>s that it
        /// returns.
        /// </summary>
        /// <returns>The single result column.</returns>
        /// <remarks>
        /// This method is only called once.
        /// </remarks>
        protected override IEnumerable<DataRowReaderColumn> GetColumnsCore()
        {
            List<DataRowReaderColumn> columns = new List<DataRowReaderColumn>();
            DataRowReaderColumn column =
                new DataRowReaderColumn(this.resultName, DataType.Integer, this.resultCursor);
            columns.Add(column);
            return columns;
        }
        /// <summary>
        /// The implementor should provide the result properties.
        /// </summary>
        /// <returns>An empty set of result properties.</returns>
        /// <remarks>
        /// This method is only called once.
        /// </remarks>
        protected override ResultProperties GetResultPropertiesCore()
        {
            return new ResultProperties();
        }
        /// <summary>
        /// Advance to the next row.
        /// The implementor should update all <see cref="T:Spotfire.Dxp.Data.DataValueCursor"/>s in the <see cref="T:Spotfire.Dxp.Data.DataRowReaderColumn"/>s
        /// with values for the next row.
        /// </summary>
        /// <returns>
        /// <c>true</c> if there are more rows; otherwise <c>false</c>.
        /// </returns>
        protected override bool MoveNextCore()
        {
            this.index++;
            if (this.index >= this.data.Count)
            {
                return false;
            }
            // Add one so that the reported cluster index is one-based.
            this.resultCursor.MutableDataValue.ValidValue = this.data[this.index] + 1;
            return true;
        }
        /// <summary>
        /// The implementor should implement this method to reset the
        /// enumerator. If this method is called then the <see cref="M:Spotfire.Dxp.Data.DataRowReader.MoveNextCore"/>
        /// method should return the first row again when called the next time.
        /// </summary>
        protected override void ResetCore()
        {
            this.index = -1;
        }
    }
}
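To experiment with the clustering logic outside Spotfire, the same steps can be written against plain arrays. The following is a minimal sketch under stated assumptions, not the SDK example itself: KMeansSketch is a hypothetical class, the rows are held in memory as a double[][] instead of being read through cursors, the returned cluster indexes are zero-based, and squared distances are compared (this selects the same nearest cluster as the Euclidean distance). As in the executor above, the first k rows are used as initial centers and running sums make the centroid update cheap.

```csharp
using System;

public static class KMeansSketch
{
    // Returns the zero-based cluster index for every row.
    public static int[] Cluster(double[][] rows, int k, int maxIterations)
    {
        if (k <= 0 || k > rows.Length)
        {
            throw new ArgumentException("k must be between 1 and the number of rows.");
        }

        int dimensions = rows[0].Length;
        double[][] centers = new double[k][];
        double[][] sums = new double[k][];
        int[] counts = new int[k];
        for (int c = 0; c < k; ++c)
        {
            // The first k rows are the initial centers.
            centers[c] = (double[])rows[c].Clone();
            sums[c] = new double[dimensions];
        }

        // Initial assignment of every row to its nearest center.
        int[] assignment = new int[rows.Length];
        for (int r = 0; r < rows.Length; ++r)
        {
            assignment[r] = Nearest(rows[r], centers);
            Move(rows[r], sums, counts, -1, assignment[r]);
        }

        for (int iteration = 0; iteration < maxIterations; ++iteration)
        {
            // Recalculate centroids from the running sums; empty clusters keep their center.
            for (int c = 0; c < k; ++c)
            {
                if (counts[c] == 0)
                {
                    continue;
                }

                for (int d = 0; d < dimensions; ++d)
                {
                    centers[c][d] = sums[c][d] / counts[c];
                }
            }

            // Reassign rows and stop as soon as nothing changes.
            bool hasChanged = false;
            for (int r = 0; r < rows.Length; ++r)
            {
                int nearest = Nearest(rows[r], centers);
                if (nearest != assignment[r])
                {
                    Move(rows[r], sums, counts, assignment[r], nearest);
                    assignment[r] = nearest;
                    hasChanged = true;
                }
            }

            if (!hasChanged)
            {
                break;
            }
        }

        return assignment;
    }

    // Index of the center with the smallest squared distance to the row.
    private static int Nearest(double[] row, double[][] centers)
    {
        int nearest = 0;
        double best = double.MaxValue;
        for (int c = 0; c < centers.Length; ++c)
        {
            double distance = 0;
            for (int d = 0; d < row.Length; ++d)
            {
                double difference = centers[c][d] - row[d];
                distance += difference * difference;
            }

            if (distance < best)
            {
                best = distance;
                nearest = c;
            }
        }

        return nearest;
    }

    // Update the running sums when a row moves between clusters (from < 0 means "no previous cluster").
    private static void Move(double[] row, double[][] sums, int[] counts, int from, int to)
    {
        for (int d = 0; d < row.Length; ++d)
        {
            if (from >= 0)
            {
                sums[from][d] -= row[d];
            }

            sums[to][d] += row[d];
        }

        if (from >= 0)
        {
            counts[from]--;
        }

        counts[to]++;
    }
}
```

Calling KMeansSketch.Cluster with the one-dimensional rows 1, 10, 2 and 11, k = 2 and a limit of 100 iterations assigns the first and third rows to one cluster and the second and fourth rows to the other.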