2008年07月30日

社内コードコンペ - お題:最速なCIDRブロックマッチ判定 〜 安井の場合: バイナリサーチのあれとこれ〜

はてなブックマークに登録

おさらい

前回に引き続きコードコンペのお話で、今回は安井の出番です。 前回は導入だったせいもあり、あまり速いコードはできてきませんでしたが、さてさて今回はどうでしょうか。


このコードのウリ

「IPアドレスのマッチング」とは、言い替えれば「32ビット値の検索」です。そこで私の中で真っ先に頭をよぎったのは、バイナリサーチ(二分探索)という手法でした。今回のネタはデータ数が300個弱ということなので、 IP::Country のようにビット単位で二分木検索するよりも、ソート済みのリストからバイナリサーチしたほうが計算量は少なくて済むだろうと考えました。

コード解説

泥臭い処理(ファイルからCIDRブロックを読み込んでソート済みの配列を生成する部分)は末尾に掲載します。 ここでは、ソート済みのCIDRブロックリストから、IPアドレスを検索する関数を紹介します。(とはいってもある種お決まりのコードですけどね(^^;

cide_lookup()

char *cidr_lookup(void *ipaddr, int len)
{
  int half;
  int cmin = 0;
  int cmax = listcount;
  uint32_t addr;

  if(len != 4)
    return(NULL);
  addr = ntohl(*(uint32_t *)ipaddr);

  /* バイナリサーチで範囲を絞りこむ */
  while(cmax - cmin > 7){
    half = (cmin + cmax)>>1;
    if(addr < cidrlist[half]){
      cmax = half;
    }else{
      cmin = half;
    }
  }

  /* 絞りこんだ結果((最大7個)の中からマッチするものを探す */
  while(cmin<(cmax--)){
    if((addr & masklist[cmax]) == cidrlist[cmax]){
      return(typelist[cmax]);
    }
  }
  return(NULL);
}

通常のバイナリサーチの実装では、検索終了条件を以下のようにすると思います。

  • 目的の値が見付かったとき
  • (cmin == cmax)になっても目的の値が見付からなかったとき
cidr_lookup()では、検索要素数が7個未満になった時点でバイナリサーチを中断し、残りの要素をリニアサーチして結果を返すようにしています。それはなぜかというと、

  • 配列に格納されているのはCIDRブロックのネットワークアドレスなので
    • 検索対象のIPアドレスとの同値データは存在しない
    • ネットマスクとのANDをとらなければマッチしているか判定できない
  • ネットワークアドレスが同じでネットマスク長が異なるケースが考えられる
    1. 192.168.0.0/16
    2. 192.168.0.0/17
    3. 192.168.0.0/18
  • このような場合、192.168.0.1は(3)にマッチしなければいけない
という理由があるためです。そこで、リストの絞りこみにバイナリサーチを利用し、絞りこんだ結果からネットワークアドレスが一致する要素を検索するようにしました。今回の例ではCIDRブロックの数が512個未満なので、バイナリサーチのループ回数は以下の通り7回となります。
検索回数
検索要素数
1
511
2
255
3
127
4
63
5
31
6
15
7
7
そして、残った7個のCIDRブロックに対してネットワークアドレスのマッチングをすればよいので、最大でも14回の数値比較で結果を得ることができます。これならば、32ビットの二分木検索(IP::Country)よりも計算量は少なくて済むはずです。

アセンブラでも書いてみた

もう、だいぶ昔の話になりますが、アセンブラ(6502,Z80,68000)で遊んでいた時期がありました。ちょうどそのころ、バイナリサーチ、バブルソート、クイックソートなどの「アルゴリズム」と呼ばれるものにはじめて遭遇し、「これはすごい!」と純粋に感動していたことを覚えています。

その記憶が甦ったのか、なにを血迷ったのかわかりませんが、なぜかふと、「cidr_lookupをアセンブラで書き直せばもっと速くなるんでね?」と思い、インラインアセンブラで書き直してみたのがこちらのコードです。処理の内容は上記のものとまったく同じです。

そして、それぞれでベンチマークをとってみたところ、このような結果になりました。
※gccの最適化オプションは-O3を指定しました。

結果をみてびっくりしました。Cで書いた方が圧倒的に速かったのです。 アセンブラで書いたコードは、そのままでは64bit環境で動かないですし、メンテナンス性もわるいですし、コーディングにも時間がかかります。それでも高速に動作すれば使いどころはあるかなあと思っていましたが、今回は少し残念な結果に終わってしまいました。コンパイラの最適化ってすごいです!さすがです。

おわりに

今回の件で、単純にアセンブラで書き直しただけでは速くならないことを体感しました。しかし、このアセンブラのコードも、まだまだ高速化できる余地がいっぱい残っています。(というか、ツッコミどころ満載かもしれません(^^;;

どれだけコンパイラに近付くことができるかわかりませんが、もっと速くなるようにもう少しいじってみたいと思います。「この辺をこうしたら速くなるぞ」みたいなアドバイスをいただけると、大変ありがたいです。


ベンチマーク

5つのとあるIPアドレスのそれぞれについてどのグループに属するか判定する、という処理を5,000,000回実行したときの総所要時間(elapsed)と1回の判定に要した時間の平均(average)です。

ベンチマークは、同じハードウエアのx86とx86_64の環境の2つでとりました。

x86

name        elapsed[sec]  average[usec]
========================================
apr        59.379924      2.375197
ip-country  3.739187      0.149567
yasui-a     2.727045      0.109082   # インラインアセンブラ
yasui-c     0.975544      0.039022   # C言語

x86_64

name        elapsed[sec]  average[usec]
========================================
apr        52.340651      2.093626
ip-country  0.664034      0.026561
yasui-c     0.706095      0.028244

コード

cide_lookup()のインラインアセンブラバージョン

uint32_t half=0;
uint32_t csub=0;
uint32_t cmin=0;
uint32_t cmax=0;
uint32_t addr=0;
char *cidr_lookup(void *ipaddr, int len)
{
  if(len != 4)
    return(0);
  addr = ntohl(*(uint32_t *)ipaddr);
  asm volatile(
    "lookup_start:"
      "push %eax;"
      "push %ebx;"
      "push %ecx;"
      "push %edx;"
      "xor  %eax, %eax;"
      "movl %eax, cmin;"
      "movl listcount, %eax;"
      "movl %eax, cmax;"
      "movl %eax, csub;"
    "lookup_loop:"
      "shr  %eax;"
      "add  cmin, %eax;"
      "movl %eax, half;"
      "movl csub, %eax;"
      "cmp  $7, %eax;"
      "jb lookup_search;"
      "movl half, %eax;"
      "shl  $2, %eax;"
      "movl cidrlist, %edx;"
      "add  %eax, %edx;"
      "movl addr, %eax;"
      "cmp (%edx), %eax;"
      "jc lookup_low;"
    "lookup_high:"         /* addr > cidr */
      "movl half, %eax;"
      "movl %eax, cmin;"
      "jmp lookup_endif;"
    "lookup_low:"          /* addr < cidr */
      "movl half, %eax;"
      "movl %eax, cmax;"
    "lookup_endif:"
      "movl cmax, %eax;"
      "sub  cmin, %eax;"
      "movl %eax, csub;"
      "jmp lookup_loop;"
    "lookup_search:"
      "movl cmax, %ecx;"
      "sub  cmin, %ecx;"
      "movl cmax, %eax;"
      "dec  %eax;"
      "movl %eax, half;"
      "shl  $2, %eax;"
      "movl masklist, %ebx;"
      "movl cidrlist, %edx;"
      "add  %eax, %ebx;"
      "add  %eax, %edx;"
    "lookup_search_loop:"
      "movl (%ebx), %eax;"
      "and  addr, %eax;"
      "cmp  (%edx), %eax;"
      "je lookup_end;"     /* if(addr == cidr) */
      "sub  $4, %ebx;"
      "sub  $4, %edx;"
      "movl half, %eax;"
      "dec %eax;"
      "mov %eax, half;"
      "loop lookup_search_loop;"
      "movl $0xffffffff, %eax;"
      "movl %eax, half;"
    "lookup_end:"
      "pop %edx;"
      "pop %ecx;"
      "pop %ebx;"
      "pop %eax;"
  );
  if(half == -1)
    return(NULL);
  return(typelist[half]);
}

cidr_initialize()

#include <stdio.h>

#include <stdlib.h>
#include <unistd.h>
#include <dirent.h>
#include <string.h>
#include <fcntl.h>
#include <libgen.h>

#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define DBDIR "../data"
uint32_t listsize  = 0;
uint32_t listcount = 0;
uint32_t *cidrlist = NULL;
uint32_t *masklist = NULL;
char    **typelist = NULL;

int getnetmask(int n, uint32_t *netmask)
{
  uint32_t h = 1;
  uint32_t m = 0xffffffff;
  if(n>32)
    return(0);
  n = 32 - n;
  while(n--){
    m -= h;
    h *= 2;
  }
  *netmask = m;
  return(1);
}

int initsort()
{
  int i;
  int j;
  uint32_t t;
  char   *tt;
  for(i=0;i cidrlist[j]){
        t = cidrlist[i];
        cidrlist[i] = cidrlist[j];
        cidrlist[j] = t;
        t = masklist[i];
        masklist[i] = masklist[j];
        masklist[j] = t;
        tt = typelist[i];
        typelist[i] = typelist[j];
        typelist[j] = tt;
      }
    }
  }
  for(i=1;i< masklist[j]){
        t = cidrlist[i];
        cidrlist[i] = cidrlist[j];
        cidrlist[j] = t;
        t = masklist[i];
        masklist[i] = masklist[j];
        masklist[j] = t;
        tt = typelist[i];
        typelist[i] = typelist[j];
        typelist[j] = tt;
      }
    }
  }
  return(0);
}

int initread(int fd, char *type)
{
  int  r;
  int  l;
  char buff[256];
  char *p = buff;
  uint32_t ad=0;
  uint32_t nm=0;
  char ip[64];

  while((r=read(fd,p,1)) != 0){
    if(r == -1)
      return(-1);
    if(*p == '#')
      *p = 0;
    if(*p == '/')
      *p = ' ';
    if(*p == '\r')
      *p = 0;
    if(*p == '\n'){
      *p = 0;
      l = 32;
      if(sscanf(buff,"%s%d", ip, &l) < 1){
        /***** not ip addr ******/
        if(buff[0]){
          fprintf(stderr,"read error: %s(%s/%d)\n",buff,ip,l);
        }
      }else{
        /*----- alloc -----*/
        if(listcount == listsize){
          listsize += 512;
          typelist = (char    **)realloc(typelist, listsize * sizeof(char *));
          cidrlist = (uint32_t *)realloc(cidrlist, listsize * sizeof(uint32_t));
          masklist = (uint32_t *)realloc(masklist, listsize * sizeof(uint32_t));
        }
        if(!inet_aton(ip, (struct in_addr *)&ad)){
          fprintf(stderr,"ip addr error: %s\n", buff);
        }
        if(!getnetmask(l,&nm)){
          fprintf(stderr,"netmask error: %s\n", buff);
        }
        ad  = ntohl(ad);
        ad &= nm;
        typelist[listcount] = type;
        cidrlist[listcount] = ad;
        masklist[listcount] = nm;
        listcount++;
      }
      p = buff;
      continue;
    }
    p++;
  }
  return(0);
}

char *alloctype(char *name)
{
  char *type;
  char *base;
  char path[PATH_MAX];
  strcpy(path, name);
  base=basename(path);
  type=malloc(strlen(base)+1);
  strcpy(type,base);
  return(type);
}

void cidr_initialize()
{
  int  f;
  DIR *d;
  char path[PATH_MAX];
  struct dirent *e;
  struct stat   st;
  if((d=opendir(DBDIR)) != 0){
    while((e=readdir(d)) != 0){
      sprintf(path,"%s/%s", DBDIR, e->d_name);
      if(stat(path, &st) != 0)
        continue;
      if(!S_ISREG(st.st_mode))
        continue;
      f = open(path, O_RDONLY);
      if(f == -1){
        fprintf(stderr,"file open error %s\n", path);
      }else{
        initread(f,alloctype(path));
        close(f);
      }
    }
  }
  initsort();
}
klab_gijutsu2 at 08:00│Comments(0)TrackBack(1)開発 | codecompe

トラックバックURL

この記事へのトラックバック

1.  今日だけ x86 とアセンブラな記録  [ 当面C#と.NETな記録 ]   2008年08月19日 23:40
こんばんは、siokoshou です。忘れられない CPU 命令は eieio です。 最近 x86 アセンブラがマイブームです。 x86 に詳しくなかったんですがインテルのマニュアルと古い本を何冊か読んでちょっとわかってきました。で、ネットをふらふらしてたらおもしろそうなネタが転がっ

この記事にコメントする

名前:
URL:
  情報を記憶: 評価: 顔   
 
 
 
Blog内検索
Archives
このブログについて
DSASとは、KLab が構築し運用しているコンテンツサービス用のLinuxベースのインフラです。現在5ヶ所のデータセンタにて構築し、運用していますが、我々はDSASをより使いやすく、より安全に、そしてより省力で運用できることを目指して、日々改良に勤しんでいます。
このブログでは、そんな DSAS で使っている技術の紹介や、実験してみた結果の報告、トラブルに巻き込まれた時の経験談など、広く深く、色々な話題を織りまぜて紹介していきたいと思います。
最新コメント