/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32 33 34 35 36 37
#include <stdlib.h>

#include <new>
#include <algorithm>

#include "err.hpp"
#include "pipe.hpp"
38
#include "macros.hpp"
39 40 41
#include "mtrie.hpp"

//  Builds an empty trie node: no pipe set attached, an empty character
//  range (min == 0, count == 0) and no live child nodes. The 'next'
//  member is not initialised here; it is only given a value once
//  'count' becomes non-zero.
zmq::mtrie_t::mtrie_t () :
    pipes (0), min (0), count (0), live_nodes (0)
{
}

//  Tears the node down: drops the pipe set, then destroys every child
//  subtree reachable through 'next' and releases the child table.
zmq::mtrie_t::~mtrie_t ()
{
    LIBZMQ_DELETE(pipes);

    //  Leaf node, nothing else is owned.
    if (count == 0)
        return;

    //  Single-child representation: 'next.node' is the only subtree.
    if (count == 1) {
        zmq_assert (next.node);
        LIBZMQ_DELETE(next.node);
        return;
    }

    //  Table representation: destroy each slot (empty slots are null),
    //  then give back the malloc'ed table itself.
    unsigned short slot = 0;
    while (slot != count) {
        LIBZMQ_DELETE(next.table[slot]);
        ++slot;
    }
    free (next.table);
}

bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
66 67 68 69 70 71
{
    return add_helper (prefix_, size_, pipe_);
}

bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
    pipe_t *pipe_)
72 73 74
{
    //  We are at the node corresponding to the prefix. We are done.
    if (!size_) {
75
        bool result = !pipes;
76 77 78 79
        if (!pipes) {
            pipes = new (std::nothrow) pipes_t;
            alloc_assert (pipes);
        }
80
        pipes->insert (pipe_);
81 82 83 84 85 86 87
        return result;
    }

    unsigned char c = *prefix_;
    if (c < min || c >= min + count) {

        //  The character is out of range of currently handled
88
        //  characters. We have to extend the table.
89 90 91 92 93
        if (!count) {
            min = c;
            count = 1;
            next.node = NULL;
        }
94
        else
95
        if (count == 1) {
96 97 98 99 100
            unsigned char oldc = min;
            mtrie_t *oldp = next.node;
            count = (min < c ? c - min : min - c) + 1;
            next.table = (mtrie_t**)
                malloc (sizeof (mtrie_t*) * count);
101
            alloc_assert (next.table);
102 103 104 105 106
            for (unsigned short i = 0; i != count; ++i)
                next.table [i] = 0;
            min = std::min (min, c);
            next.table [oldc - min] = oldp;
        }
107
        else
108
        if (min < c) {
109 110 111
            //  The new character is above the current character range.
            unsigned short old_count = count;
            count = c - min + 1;
112
            next.table = (mtrie_t**) realloc (next.table,
113
                sizeof (mtrie_t*) * count);
114
            alloc_assert (next.table);
115 116 117 118 119 120 121
            for (unsigned short i = old_count; i != count; i++)
                next.table [i] = NULL;
        }
        else {
            //  The new character is below the current character range.
            unsigned short old_count = count;
            count = (min + old_count) - c;
122
            next.table = (mtrie_t**) realloc (next.table,
123
                sizeof (mtrie_t*) * count);
124
            alloc_assert (next.table);
125 126 127 128 129 130 131 132 133 134 135 136
            memmove (next.table + min - c, next.table,
                old_count * sizeof (mtrie_t*));
            for (unsigned short i = 0; i != min - c; i++)
                next.table [i] = NULL;
            min = c;
        }
    }

    //  If next node does not exist, create one.
    if (count == 1) {
        if (!next.node) {
            next.node = new (std::nothrow) mtrie_t;
137
            alloc_assert (next.node);
138
            ++live_nodes;
139
        }
140
        return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_);
141 142 143 144
    }
    else {
        if (!next.table [c - min]) {
            next.table [c - min] = new (std::nothrow) mtrie_t;
145
            alloc_assert (next.table [c - min]);
146
            ++live_nodes;
147
        }
148
        return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_);
149 150 151 152 153 154
    }
}


//  Removes 'pipe_' from every node of the trie. 'func_' is invoked
//  (with 'arg_' passed through) for prefixes the pipe was removed from;
//  when 'call_on_uniq_' is set it is invoked only where the pipe was the
//  last one registered at that node.
void zmq::mtrie_t::rm (pipe_t *pipe_,
    void (*func_) (unsigned char *data_, size_t size_, void *arg_),
    void *arg_, bool call_on_uniq_)
{
    //  Scratch buffer in which rm_helper spells out the prefix of the
    //  node being visited. It starts out unallocated with zero capacity
    //  and is grown by rm_helper on demand.
    unsigned char *prefix_buffer = NULL;
    rm_helper (pipe_, &prefix_buffer, 0, 0, func_, arg_, call_on_uniq_);
    free (prefix_buffer);
}

//  Recursive worker for rm (pipe_, func_, arg_, call_on_uniq_).
//
//  Removes 'pipe_' from this node and from the whole subtree below it,
//  pruning child nodes that become redundant and shrinking the child
//  table where possible.
//
//  '*buff_' holds the prefix leading to this node; 'buffsize_' is the
//  number of valid bytes in it and 'maxbuffsize_' the capacity this call
//  knows about. 'maxbuffsize_' is passed by value, so growth performed
//  in a deeper call is not reported back to the caller.
//
//  'func_' is called with the current prefix when the pipe was erased
//  from this node and either 'call_on_uniq_' is false or the pipe set
//  became empty.
void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
    size_t buffsize_, size_t maxbuffsize_,
    void (*func_) (unsigned char *data_, size_t size_, void *arg_),
    void *arg_, bool call_on_uniq_)
{
    //  Detach the pipe from this node, if it is registered here.
    if (pipes && pipes->erase (pipe_)) {
        const bool notify = !call_on_uniq_ || pipes->empty ();
        if (notify)
            func_ (*buff_, buffsize_, arg_);

        //  An empty pipe set is never kept around.
        if (pipes->empty ()) {
            LIBZMQ_DELETE(pipes);
        }
    }

    //  Make sure there is room for one more prefix character.
    if (buffsize_ >= maxbuffsize_) {
        maxbuffsize_ = buffsize_ + 256;
        *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
        alloc_assert (*buff_);
    }

    //  Leaf node: nothing below to visit.
    if (count == 0)
        return;

    //  Single-child representation.
    if (count == 1) {
        (*buff_) [buffsize_] = min;
        next.node->rm_helper (pipe_, buff_, buffsize_ + 1, maxbuffsize_,
            func_, arg_, call_on_uniq_);

        //  Drop the child if the removal left it without pipes and
        //  without live children of its own.
        if (next.node->is_redundant ()) {
            LIBZMQ_DELETE(next.node);
            count = 0;
            --live_nodes;
            zmq_assert (live_nodes == 0);
        }
        return;
    }

    //  Table representation. While walking the slots, track the lowest
    //  and highest characters that still have a surviving child. The
    //  initial values are deliberately "inverted" (lowest at the top of
    //  the range, highest at the bottom) so any survivor tightens them.
    unsigned char lowest = min + count - 1;
    unsigned char highest = min;
    for (unsigned short idx = 0; idx != count; ++idx) {
        const int ch = min + idx;
        (*buff_) [buffsize_] = ch;

        mtrie_t *child = next.table [idx];
        if (!child)
            continue;

        child->rm_helper (pipe_, buff_, buffsize_ + 1,
            maxbuffsize_, func_, arg_, call_on_uniq_);

        //  Prune the child if it became redundant.
        if (child->is_redundant ()) {
            LIBZMQ_DELETE(next.table[idx]);

            zmq_assert (live_nodes > 0);
            --live_nodes;
            continue;
        }

        //  The child survives, so its character bounds the new range.
        if (ch < lowest)
            lowest = ch;
        if (ch > highest)
            highest = ch;
    }

    zmq_assert (count > 1);

    //  Every child is gone: release the table altogether.
    if (live_nodes == 0) {
        free (next.table);
        next.table = NULL;
        count = 0;
        return;
    }

    //  Exactly one child left: fall back to the compact single-child
    //  representation.
    if (live_nodes == 1) {
        zmq_assert (lowest == highest);
        zmq_assert (lowest >= min && lowest < min + count);
        mtrie_t *survivor = next.table [lowest - min];
        zmq_assert (survivor);
        free (next.table);
        next.node = survivor;
        count = 1;
        min = lowest;
        return;
    }

    //  Several children left. If the occupied range did not shrink at
    //  either end there is nothing to compact.
    const bool shrinkable = lowest > min || highest < min + count - 1;
    if (!shrinkable)
        return;

    zmq_assert (highest - lowest + 1 > 1);
    zmq_assert (lowest >= min);
    zmq_assert (highest <= min + count - 1);
    zmq_assert (highest - lowest + 1 < count);

    //  Copy the still-occupied window into a tighter table.
    mtrie_t **old_table = next.table;
    count = highest - lowest + 1;
    next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
    alloc_assert (next.table);

    memmove (next.table, old_table + (lowest - min),
             sizeof (mtrie_t*) * count);
    free (old_table);

    min = lowest;
}

bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
{
288 289
    return rm_helper (prefix_, size_, pipe_);
}
290

291 292 293 294
bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
    pipe_t *pipe_)
{
    if (!size_) {
295 296 297 298
        if (pipes) {
            pipes_t::size_type erased = pipes->erase (pipe_);
            zmq_assert (erased == 1);
            if (pipes->empty ()) {
299
                LIBZMQ_DELETE(pipes);
300 301 302
            }
        }
        return !pipes;
303 304 305 306 307
    }

    unsigned char c = *prefix_;
    if (!count || c < min || c >= min + count)
        return false;
308

309 310
    mtrie_t *next_node =
        count == 1 ? next.node : next.table [c - min];
311

312 313
    if (!next_node)
        return false;
314

315 316 317
    bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);

    if (next_node->is_redundant ()) {
318
        LIBZMQ_DELETE(next_node);
319 320
        zmq_assert (count > 0);

321
        if (count == 1) {
322
            next.node = 0;
323
            count = 0;
324 325
            --live_nodes;
            zmq_assert (live_nodes == 0);
326
        }
327
        else {
328
            next.table [c - min] = 0;
329 330 331 332 333 334 335 336
            zmq_assert (live_nodes > 1);
            --live_nodes;

            //  Compact the table if possible
            if (live_nodes == 1) {
                //  If there's only one live node in the table we can
                //  switch to using the more compact single-node
                //  representation
337 338 339
                unsigned short i;
                for (i = 0; i < count; ++i)
                    if (next.table [i])
340 341
                        break;

342 343
                zmq_assert (i < count);
                min += i;
344
                count = 1;
345 346 347
                mtrie_t *oldp = next.table [i];
                free (next.table);
                next.node = oldp;
348
            }
349 350
            else
            if (c == min) {
351
                //  We can compact the table "from the left"
352 353 354
                unsigned short i;
                for (i = 1; i < count; ++i)
                    if (next.table [i])
355 356
                        break;

357 358 359
                zmq_assert (i < count);
                min += i;
                count -= i;
360 361
                mtrie_t **old_table = next.table;
                next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
362
                alloc_assert (next.table);
363
                memmove (next.table, old_table + i, sizeof (mtrie_t*) * count);
364 365
                free (old_table);
            }
366 367
            else
            if (c == min + count - 1) {
368
                //  We can compact the table "from the right"
369 370 371
                unsigned short i;
                for (i = 1; i < count; ++i)
                    if (next.table [count - 1 - i])
372 373
                        break;

374 375
                zmq_assert (i < count);
                count -= i;
376 377
                mtrie_t **old_table = next.table;
                next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
378
                alloc_assert (next.table);
379 380 381 382
                memmove (next.table, old_table, sizeof (mtrie_t*) * count);
                free (old_table);
            }
        }
383 384 385
    }

    return ret;
386 387
}

388 389
void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
    void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
390
{
391
    mtrie_t *current = this;
392
    while (true) {
393 394

        //  Signal the pipes attached to this node.
395 396 397 398 399
        if (current->pipes) {
            for (pipes_t::iterator it = current->pipes->begin ();
                  it != current->pipes->end (); ++it)
                func_ (*it, arg_);
        }
400

401 402 403 404
        //  If we are at the end of the message, there's nothing more to match.
        if (!size_)
            break;

405 406 407 408 409
        //  If there are no subnodes in the trie, return.
        if (current->count == 0)
            break;

        //  If there's one subnode (optimisation).
410
        if (current->count == 1) {
411 412 413 414 415
            if (data_ [0] != current->min)
                break;
            current = current->next.node;
            data_++;
            size_--;
416 417
            continue;
        }
418

419
        //  If there are multiple subnodes.
420 421
        if (data_ [0] < current->min || data_ [0] >=
              current->min + current->count)
422
            break;
423
        if (!current->next.table [data_ [0] - current->min])
424
            break;
425
        current = current->next.table [data_ [0] - current->min];
426 427
        data_++;
        size_--;
428 429 430
    }
}

431
bool zmq::mtrie_t::is_redundant () const
432
{
433
    return !pipes && live_nodes == 0;
434
}