Aggregate UDFs in infinidb

3 posts / 0 new
Last post
psillymathhead
psillymathhead's picture
Offline
Last seen: 3 months 4 weeks ago
Joined: Jan 31 2013
Junior Boarder

Posts: 6

Garrett Wright
Aggregate UDFs in infinidb

Hello all,

I'm trying to improve an application I am trialing with InfiniDB. I am excited about the ability to use UDFs in InfiniDB and have been looking at the UDF SDK. The example I see in git, although very helpful, appears to be a non-aggregate function. My confusion is how to implement the xxx_add, xxx_reset, etc. functions typical of a regular MySQL aggregate UDF so that they execute in a distributed manner in InfiniDB via GROUP BY. I was hoping someone in the community might have an InfiniDB aggregate UDF example I could use as a template, or could help me with the example below.

I would like to compute something akin to a shifted median. I understand median is available as a window function in InfiniDB, but it would be most helpful for me to start with median as an aggregate UDF and adapt it to my specific computation. Attached is an example MySQL udf_median() I grabbed from the web. If someone could point me towards adjusting it to work in InfiniDB, or provide another exemplary aggregate UDF (even just stubs of the necessary functions like xxx_add, xxx_reset, ...), I would be glad to try to take it further and post any successes I have.

Thanks!
 

<!--break-->

///////////////////////////////////////
 

/*
  returns the median of the values in a distribution 
 
  input parameters:
  data (real)
  number of decimals in result (int, optional)
 
  output:
  median value of the distribution (real)
 
  registering the function:
  CREATE AGGREGATE FUNCTION median RETURNS REAL SONAME 'udf_median.so';
 
  getting rid of the function:
  DROP FUNCTION median;
 
*/
 
 
#ifdef STANDARD
#include <stdio.h>
#include <string.h>
#ifdef __WIN__
typedef unsigned __int64 ulonglong;
typedef __int64 longlong;
#else
typedef unsigned long long ulonglong;
typedef long long longlong;
#endif /*__WIN__*/
#else
#include <my_global.h>
#include <my_sys.h>
#endif
#include <mysql.h>
#include <m_ctype.h>
#include <m_string.h>
 
#ifdef HAVE_DLOPEN
 
 
#define BUFFERSIZE 1024
 
 
 
/*
  UDF entry points. They are declared with C linkage so the server can
  resolve them from the shared library by their unmangled names.
*/
extern "C"
{
  my_bool median_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
  void    median_deinit(UDF_INIT *initid);
  void    median_reset(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *is_error);
  void    median_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *is_error);
  double  median(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *is_error);
}
 
 
/*
  Per-query aggregation state. median_init() allocates it and stores it in
  initid->ptr; median_deinit() releases it.
*/
typedef struct median_data
{
  unsigned long count;     /* values stored in the current BUFFERSIZE-element page */
  unsigned long abscount;  /* total number of values stored in `values` */
  unsigned long pages;     /* number of BUFFERSIZE-element pages allocated */
  double *values;          /* collected values (malloc/realloc buffer) or NULL */
} median_data;
 
 
/*
  Initialization step, called once per query before any rows are processed.

  Validates the argument list: one REAL value, plus an optional INT giving
  the number of decimals. The requested decimals (default 2, accepted range
  0..16) are stored in initid->decimals. The function then allocates the
  per-query median_data state, which the other entry points read back from
  initid->ptr.

  Returns 0 on success. On failure it copies an error text into `message`
  and returns 1.
*/
my_bool median_init( UDF_INIT* initid, UDF_ARGS* args, char* message )
{
  if (args->arg_count < 1 || args->arg_count > 2)
  {
    strcpy(message,"wrong number of arguments: median() requires one or two arguments");
    return 1;
  }

  if (args->arg_type[0] != REAL_RESULT)
  {
    strcpy(message,"median() requires a real as parameter 1");
    return 1;
  }

  if (args->arg_count > 1 && args->arg_type[1] != INT_RESULT)
  {
    strcpy(message,"median() requires an int as parameter 2");
    return 1;
  }

  initid->decimals = 2;
  /*
    In the init function args->args[1] is only set when the argument is a
    constant; otherwise it is NULL and must not be dereferenced. INT_RESULT
    arguments are passed as long long, so read the value as longlong rather
    than ulong. Negative or out-of-range values keep the default.
  */
  if (args->arg_count == 2 && args->args[1] != NULL)
  {
    longlong requested = *((longlong*)args->args[1]);
    if (requested >= 0 && requested <= 16)
    {
      initid->decimals = (unsigned int)requested;
    }
  }

  median_data *buffer = new median_data;
  buffer->count = 0;
  buffer->abscount = 0;
  buffer->pages = 1;
  buffer->values = NULL;   /* allocated lazily by median_reset()/median_add() */

  initid->maybe_null = 1;  /* an empty group yields SQL NULL */
  initid->max_length = 32;
  initid->ptr = (char*)buffer;

  return 0;
}
 
 
/*
  De-initialization step, called once per query after the last group.
  Releases the value buffer (malloc/realloc-allocated) and the median_data
  state created with `new` in median_init().
*/
void median_deinit( UDF_INIT* initid )
{
  median_data *buffer = (median_data*)initid->ptr;
  if (buffer == NULL)
  {
    return;
  }

  free(buffer->values);   /* free(NULL) is a no-op */
  buffer->values = NULL;

  /*
    Delete through median_data*, the type the object was allocated as.
    Deleting it through the char* stored in initid->ptr is undefined behavior.
  */
  delete buffer;
  initid->ptr = NULL;
}
 
 
 
void median_reset( UDF_INIT* initid, UDF_ARGS* args, char* is_null, char* is_error )
{
  median_data *buffer = (median_data*)initid->ptr;
  buffer->count = 0;
  buffer->abscount=0;
  buffer->pages = 1;
  *is_null = 0;
  *is_error = 0;
 
  if (buffer->values != NULL)
  {
    free(buffer->values);
    buffer->values=NULL;
  }
 
  buffer->values=(double *) malloc(BUFFERSIZE*sizeof(double));
 
  median_add( initid, args, is_null, is_error );
}
 
 
/*
  Accumulation step, called for each row of the current group. It appends
  the row's REAL value to buffer->values. SQL NULL values (args->args[0] ==
  NULL) are skipped.

  The buffer grows one BUFFERSIZE-element page at a time. If no buffer exists
  yet (values == NULL, e.g. the server never called median_reset() for this
  group), the first page is allocated here. On allocation failure *is_error
  is set, the value is dropped, and any existing buffer is kept intact.
*/
void median_add( UDF_INIT* initid, UDF_ARGS* args, char* is_null, char* is_error )
{
  if (args->args[0] == NULL)
  {
    return;
  }

  median_data *buffer = (median_data*)initid->ptr;

  if (buffer->values == NULL)
  {
    buffer->values = (double *) malloc(BUFFERSIZE*sizeof(double));
    if (buffer->values == NULL)
    {
      *is_error = 1;
      return;
    }
    buffer->pages = 1;
    buffer->count = 0;
    buffer->abscount = 0;
  }
  else if (buffer->count >= BUFFERSIZE)
  {
    /* Keep the old pointer until realloc succeeds so it is not leaked. */
    double *grown = (double *) realloc(buffer->values,
                                       BUFFERSIZE*(buffer->pages+1)*sizeof(double));
    if (grown == NULL)
    {
      *is_error = 1;
      return;
    }
    buffer->values = grown;
    buffer->pages++;
    buffer->count = 0;
  }

  buffer->values[buffer->abscount++] = *((double*)args->args[0]);
  buffer->count++;
}
 
 
 
/*
  qsort() comparator for doubles. It returns -1 when *a < *b, 1 when *a > *b,
  and 0 otherwise, including when either operand is NaN.
*/
int
compare_doubles (const void *a, const void *b)
{
  const double lhs = *(const double *) a;
  const double rhs = *(const double *) b;

  if (lhs < rhs)
    return -1;
  if (lhs > rhs)
    return 1;
  return 0;
}
 
 
/*
  Final step, called once per group. It returns the median of the values
  collected by median_add() for the current group.

  The result is SQL NULL (*is_null = 1) when no non-NULL value was collected
  or when an earlier step set *is_error. Otherwise the buffer is sorted in
  place. For an odd count the middle value is returned; for an even count the
  two middle values are averaged.
*/
double median( UDF_INIT* initid, UDF_ARGS* args, char* is_null, char* is_error )
{
  median_data* buffer = (median_data*)initid->ptr;

  if (buffer->abscount == 0 || *is_error != 0)
  {
    *is_null = 1;
    return 0.0;
  }

  *is_null = 0;
  if (buffer->abscount == 1)
  {
    return buffer->values[0];
  }

  qsort(buffer->values, buffer->abscount, sizeof(double), compare_doubles);

  const unsigned long mid = buffer->abscount / 2;
  /* Parenthesized: `abscount & 1 == 1` would parse as `abscount & (1 == 1)`. */
  if ((buffer->abscount & 1) == 1)
  {
    return buffer->values[mid];   /* odd count: index (abscount-1)/2 */
  }
  return (buffer->values[mid - 1] + buffer->values[mid]) / 2.0;
}
 
#endif
 

 

joegh
joegh's picture
Offline
Last seen: 1 month 1 week ago
Joined: Jan 17 2014
Junior Boarder

Posts: 10

Joe Wu
Aggregate UDFs in infinidb

Hello,

 

I have the same question as psillymathhead.

Does InfiniDB support aggregate UDFs? If so, could anybody share some samples or a function template? Thanks.

djoshi
djoshi's picture
Offline
Last seen: 1 month 4 days ago
Joined: Jun 6 2012
Administrator

Posts: 28

Dipti Joshi
Aggregate UDFs in infinidb

While the current 4.5 release does not support aggregate UDFs, they are on our roadmap. If you need this immediately, you can use our parallel data access through the Local PM Query feature to perform the distributed aggregate computation externally.

 

Thanks,

Dipti Joshi