mtrie.cpp 12.7 KB
Newer Older
1
/*
2
    Copyright (c) 2011 250bpm s.r.o.
3
    Copyright (c) 2011-2012 Spotify AB
4
    Copyright (c) 2011 Other contributors as noted in the AUTHORS file
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include <stdlib.h>

#include <new>
#include <algorithm>

#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif

#include "err.hpp"
#include "pipe.hpp"
#include "mtrie.hpp"

zmq::mtrie_t::mtrie_t () :
37
    pipes (0),
38
    min (0),
39 40
    count (0),
    live_nodes (0)
41 42 43 44 45
{
}

zmq::mtrie_t::~mtrie_t ()
{
46 47 48 49 50
    if (pipes) {
        delete pipes;
        pipes = 0;
    }

51 52
    if (count == 1) {
        zmq_assert (next.node);
53
        delete next.node;
54 55
        next.node = 0;
    }
56 57
    else 
    if (count > 1) {
58 59 60 61 62 63 64 65
        for (unsigned short i = 0; i != count; ++i)
            if (next.table [i])
                delete next.table [i];
        free (next.table);
    }
}

bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
66 67 68 69 70 71
{
    return add_helper (prefix_, size_, pipe_);
}

bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
    pipe_t *pipe_)
72 73 74
{
    //  We are at the node corresponding to the prefix. We are done.
    if (!size_) {
75
        bool result = !pipes;
76 77 78 79
        if (!pipes) {
            pipes = new (std::nothrow) pipes_t;
            alloc_assert (pipes);
        }
80
        pipes->insert (pipe_);
81 82 83 84 85 86 87 88 89 90 91 92 93
        return result;
    }

    unsigned char c = *prefix_;
    if (c < min || c >= min + count) {

        //  The character is out of range of currently handled
        //  charcters. We have to extend the table.
        if (!count) {
            min = c;
            count = 1;
            next.node = NULL;
        }
94 95
        else 
        if (count == 1) {
96 97 98 99 100
            unsigned char oldc = min;
            mtrie_t *oldp = next.node;
            count = (min < c ? c - min : min - c) + 1;
            next.table = (mtrie_t**)
                malloc (sizeof (mtrie_t*) * count);
101
            alloc_assert (next.table);
102 103 104 105 106
            for (unsigned short i = 0; i != count; ++i)
                next.table [i] = 0;
            min = std::min (min, c);
            next.table [oldc - min] = oldp;
        }
107 108
        else 
        if (min < c) {
109 110 111
            //  The new character is above the current character range.
            unsigned short old_count = count;
            count = c - min + 1;
112
            next.table = (mtrie_t**) realloc (next.table,
113
                sizeof (mtrie_t*) * count);
114
            alloc_assert (next.table);
115 116 117 118 119 120 121
            for (unsigned short i = old_count; i != count; i++)
                next.table [i] = NULL;
        }
        else {
            //  The new character is below the current character range.
            unsigned short old_count = count;
            count = (min + old_count) - c;
122
            next.table = (mtrie_t**) realloc (next.table,
123
                sizeof (mtrie_t*) * count);
124
            alloc_assert (next.table);
125 126 127 128 129 130 131 132 133 134 135 136
            memmove (next.table + min - c, next.table,
                old_count * sizeof (mtrie_t*));
            for (unsigned short i = 0; i != min - c; i++)
                next.table [i] = NULL;
            min = c;
        }
    }

    //  If next node does not exist, create one.
    if (count == 1) {
        if (!next.node) {
            next.node = new (std::nothrow) mtrie_t;
137
            alloc_assert (next.node);
138
            ++live_nodes;
139
        }
140
        return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_);
141 142 143 144
    }
    else {
        if (!next.table [c - min]) {
            next.table [c - min] = new (std::nothrow) mtrie_t;
145
            alloc_assert (next.table [c - min]);
146
            ++live_nodes;
147
        }
148
        return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_);
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
    }
}


void zmq::mtrie_t::rm (pipe_t *pipe_,
    void (*func_) (unsigned char *data_, size_t size_, void *arg_),
    void *arg_)
{
    unsigned char *buff = NULL;
    rm_helper (pipe_, &buff, 0, 0, func_, arg_);
    free (buff);
}

void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
    size_t buffsize_, size_t maxbuffsize_,
    void (*func_) (unsigned char *data_, size_t size_, void *arg_),
    void *arg_)
{
    //  Remove the subscription from this node.
168
    if (pipes && pipes->erase (pipe_) && pipes->empty ()) {
169
        func_ (*buff_, buffsize_, arg_);
170 171 172
        delete pipes;
        pipes = 0;
    }
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190

    //  Adjust the buffer.
    if (buffsize_ >= maxbuffsize_) {
        maxbuffsize_ = buffsize_ + 256;
        *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
        alloc_assert (*buff_);
    }

    //  If there are no subnodes in the trie, return.
    if (count == 0)
        return;

    //  If there's one subnode (optimisation).
    if (count == 1) {
        (*buff_) [buffsize_] = min;
        buffsize_++;
        next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
            func_, arg_);
191 192

        //  Prune the node if it was made redundant by the removal
193 194 195
        if (next.node->is_redundant ()) {
            delete next.node;
            next.node = 0;
196
            count = 0;
197
            --live_nodes;
198
            zmq_assert (live_nodes == 0);
199
        }
200 201 202 203
        return;
    }

    //  If there are multiple subnodes.
204 205
    //
    //  New min non-null character in the node table after the removal
206
    unsigned char new_min = min + count - 1;
207
    //  New max non-null character in the node table after the removal
208
    unsigned char new_max = min;
209
    for (unsigned short c = 0; c != count; c++) {
210
        (*buff_) [buffsize_] = min + c;
211
        if (next.table [c]) {
212 213
            next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
                maxbuffsize_, func_, arg_);
214

215
            //  Prune redundant nodes from the mtrie
216 217 218
            if (next.table [c]->is_redundant ()) {
                delete next.table [c];
                next.table [c] = 0;
219 220

                zmq_assert (live_nodes > 0);
221 222
                --live_nodes;
            }
223
            else {
224 225 226 227 228 229 230 231
                //  The node is not redundant, so it's a candidate for being
                //  the new min/max node.
                //
                //  We loop through the node array from left to right, so the
                //  first non-null, non-redundant node encountered is the new
                //  minimum index. Conversely, the last non-redundant, non-null
                //  node encountered is the new maximum index.
                if (c + min < new_min)
232
                    new_min = c + min;
233
                if (c + min > new_max)
234 235
                    new_max = c + min;
            }
236 237
        }
    }
238 239 240

    zmq_assert (count > 1);

241 242 243 244 245 246
    //  Free the node table if it's no longer used.
    if (live_nodes == 0) {
        free (next.table);
        next.table = NULL;
        count = 0;
    }
247
    //  Compact the node table if possible
248 249
    else 
    if (live_nodes == 1) {
250 251 252 253 254 255 256 257 258 259 260 261
        //  If there's only one live node in the table we can
        //  switch to using the more compact single-node
        //  representation
        zmq_assert (new_min == new_max);
        zmq_assert (new_min >= min && new_min < min + count);
        mtrie_t *node = next.table [new_min - min];
        zmq_assert (node);
        free (next.table);
        next.node = node;
        count = 1;
        min = new_min;
    }
262 263
    else
    if (new_min > min || new_max < min + count - 1) {
264 265 266 267 268 269 270 271 272 273
        zmq_assert (new_max - new_min + 1 > 1);

        mtrie_t **old_table = next.table;
        zmq_assert (new_min > min || new_max < min + count - 1);
        zmq_assert (new_min >= min);
        zmq_assert (new_max <= min + count - 1);
        zmq_assert (new_max - new_min + 1 < count);

        count = new_max - new_min + 1;
        next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
274
        alloc_assert (next.table);
275 276 277 278 279 280 281

        memmove (next.table, old_table + (new_min - min),
                 sizeof (mtrie_t*) * count);
        free (old_table);

        min = new_min;
    }
282 283 284 285
}

bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
{
286 287
    return rm_helper (prefix_, size_, pipe_);
}
288

289 290 291 292
bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
    pipe_t *pipe_)
{
    if (!size_) {
293 294 295 296 297 298 299 300 301
        if (pipes) {
            pipes_t::size_type erased = pipes->erase (pipe_);
            zmq_assert (erased == 1);
            if (pipes->empty ()) {
                delete pipes;
                pipes = 0;
            }
        }
        return !pipes;
302 303 304 305 306
    }

    unsigned char c = *prefix_;
    if (!count || c < min || c >= min + count)
        return false;
307

308 309
    mtrie_t *next_node =
        count == 1 ? next.node : next.table [c - min];
310

311 312
    if (!next_node)
        return false;
313

314 315 316 317
    bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);

    if (next_node->is_redundant ()) {
        delete next_node;
318 319
        zmq_assert (count > 0);

320
        if (count == 1) {
321
            next.node = 0;
322
            count = 0;
323 324
            --live_nodes;
            zmq_assert (live_nodes == 0);
325
        }
326
        else {
327
            next.table [c - min] = 0;
328 329 330 331 332 333 334 335
            zmq_assert (live_nodes > 1);
            --live_nodes;

            //  Compact the table if possible
            if (live_nodes == 1) {
                //  If there's only one live node in the table we can
                //  switch to using the more compact single-node
                //  representation
336 337 338
                unsigned short i;
                for (i = 0; i < count; ++i)
                    if (next.table [i])
339 340
                        break;

341 342
                zmq_assert (i < count);
                min += i;
343
                count = 1;
344 345 346
                mtrie_t *oldp = next.table [i];
                free (next.table);
                next.node = oldp;
347
            }
348 349
            else
            if (c == min) {
350
                //  We can compact the table "from the left"
351 352 353
                unsigned short i;
                for (i = 1; i < count; ++i)
                    if (next.table [i])
354 355
                        break;

356 357 358
                zmq_assert (i < count);
                min += i;
                count -= i;
359 360
                mtrie_t **old_table = next.table;
                next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
361
                alloc_assert (next.table);
362
                memmove (next.table, old_table + i, sizeof (mtrie_t*) * count);
363 364
                free (old_table);
            }
365 366
            else
            if (c == min + count - 1) {
367
                //  We can compact the table "from the right"
368 369 370
                unsigned short i;
                for (i = 1; i < count; ++i)
                    if (next.table [count - 1 - i])
371 372
                        break;

373 374
                zmq_assert (i < count);
                count -= i;
375 376
                mtrie_t **old_table = next.table;
                next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
377
                alloc_assert (next.table);
378 379 380 381
                memmove (next.table, old_table, sizeof (mtrie_t*) * count);
                free (old_table);
            }
        }
382 383 384
    }

    return ret;
385 386
}

387 388
void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
    void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
389
{
390
    mtrie_t *current = this;
391
    while (true) {
392 393

        //  Signal the pipes attached to this node.
394 395 396 397 398
        if (current->pipes) {
            for (pipes_t::iterator it = current->pipes->begin ();
                  it != current->pipes->end (); ++it)
                func_ (*it, arg_);
        }
399

400 401 402 403
        //  If we are at the end of the message, there's nothing more to match.
        if (!size_)
            break;

404 405 406 407 408 409 410 411 412 413 414 415 416 417 418
        //  If there are no subnodes in the trie, return.
        if (current->count == 0)
            break;

        //  If there's one subnode (optimisation).
		if (current->count == 1) {
            if (data_ [0] != current->min)
                break;
            current = current->next.node;
            data_++;
            size_--;
		    continue;
		}

		//  If there are multiple subnodes.
419 420
        if (data_ [0] < current->min || data_ [0] >=
              current->min + current->count)
421
            break;
422
        if (!current->next.table [data_ [0] - current->min])
423
            break;
424
        current = current->next.table [data_ [0] - current->min];
425 426
        data_++;
        size_--;
427 428 429
    }
}

430
bool zmq::mtrie_t::is_redundant () const
431
{
432
    return !pipes && live_nodes == 0;
433
}