Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
fca2e8e8
Commit
fca2e8e8
authored
Jun 21, 2010
by
Martin Hurton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add SWAP support
parent
10c28c1f
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
545 additions
and
39 deletions
+545
-39
Makefile.am
src/Makefile.am
+2
-0
msg_store.cpp
src/msg_store.cpp
+313
-0
msg_store.hpp
src/msg_store.hpp
+114
-0
pipe.cpp
src/pipe.cpp
+90
-24
pipe.hpp
src/pipe.hpp
+20
-9
session.cpp
src/session.cpp
+2
-2
socket_base.cpp
src/socket_base.cpp
+4
-4
No files found.
src/Makefile.am
View file @
fca2e8e8
...
@@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \
...
@@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \
lb.hpp
\
lb.hpp
\
likely.hpp
\
likely.hpp
\
msg_content.hpp
\
msg_content.hpp
\
msg_store.hpp
\
mutex.hpp
\
mutex.hpp
\
object.hpp
\
object.hpp
\
options.hpp
\
options.hpp
\
...
@@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \
...
@@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ip.cpp
\
ip.cpp
\
kqueue.cpp
\
kqueue.cpp
\
lb.cpp
\
lb.cpp
\
msg_store.cpp
\
object.cpp
\
object.cpp
\
options.cpp
\
options.cpp
\
owned.cpp
\
owned.cpp
\
...
...
src/msg_store.cpp
0 → 100644
View file @
fca2e8e8
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <sstream>
#include <algorithm>
#ifdef ZMQ_HAVE_WINDOWS
#include <io.h>
#else
#include <unistd.h>
#endif
#include "atomic_counter.hpp"
#include "msg_store.hpp"
#include "err.hpp"
zmq
::
msg_store_t
::
msg_store_t
(
int64_t
filesize_
,
size_t
block_size_
)
:
fd
(
-
1
),
filesize
(
filesize_
),
file_pos
(
0
),
write_pos
(
0
),
read_pos
(
0
),
block_size
(
block_size_
),
write_buf_start_addr
(
0
)
{
zmq_assert
(
filesize
>
0
);
zmq_assert
(
block_size
>
0
);
buf1
=
new
(
std
::
nothrow
)
char
[
block_size
];
zmq_assert
(
buf1
);
buf2
=
new
(
std
::
nothrow
)
char
[
block_size
];
zmq_assert
(
buf2
);
read_buf
=
write_buf
=
buf1
;
}
zmq
::
msg_store_t
::~
msg_store_t
()
{
delete
[]
buf1
;
delete
[]
buf2
;
if
(
fd
==
-
1
)
return
;
#ifdef ZMQ_HAVE_WINDOWS
int
rc
=
_close
(
fd
);
#else
int
rc
=
close
(
fd
);
#endif
errno_assert
(
rc
==
0
);
#ifdef ZMQ_HAVE_WINDOWS
rc
=
_unlink
(
filename
.
c_str
());
#else
rc
=
unlink
(
filename
.
c_str
());
#endif
errno_assert
(
rc
==
0
);
}
int
zmq
::
msg_store_t
::
init
()
{
static
zmq
::
atomic_counter_t
seqnum
(
0
);
// Get process ID.
#ifdef ZMQ_HAVE_WINDOWS
int
pid
=
GetCurrentThreadId
();
#else
pid_t
pid
=
getpid
();
#endif
std
::
ostringstream
outs
;
outs
<<
"zmq_"
<<
pid
<<
'_'
<<
seqnum
.
get
()
<<
".swap"
;
filename
=
outs
.
str
();
seqnum
.
add
(
1
);
// Open the backing file.
#ifdef ZMQ_HAVE_WINDOWS
fd
=
_open
(
filename
.
c_str
(),
_O_RDWR
|
_O_CREAT
,
0600
);
#else
fd
=
open
(
filename
.
c_str
(),
O_RDWR
|
O_CREAT
,
0600
);
#endif
if
(
fd
==
-
1
)
return
-
1
;
#ifdef ZMQ_HAVE_LINUX
// Enable more aggresive read-ahead optimization.
posix_fadvise
(
fd
,
0
,
filesize
,
POSIX_FADV_SEQUENTIAL
);
#endif
return
0
;
}
bool
zmq
::
msg_store_t
::
store
(
zmq_msg_t
*
msg_
)
{
size_t
msg_size
=
zmq_msg_size
(
msg_
);
// Check buffer space availability.
// NOTE: We always keep one byte open.
if
(
buffer_space
()
<=
(
int64_t
)
(
sizeof
msg_size
+
1
+
msg_size
))
return
false
;
// Don't store the ZMQ_MSG_SHARED flag.
uint8_t
msg_flags
=
msg_
->
flags
&
~
ZMQ_MSG_SHARED
;
// Write message length, flags, and message body.
copy_to_file
(
&
msg_size
,
sizeof
msg_size
);
copy_to_file
(
&
msg_flags
,
sizeof
msg_flags
);
copy_to_file
(
zmq_msg_data
(
msg_
),
msg_size
);
zmq_msg_close
(
msg_
);
return
true
;
}
void
zmq
::
msg_store_t
::
fetch
(
zmq_msg_t
*
msg_
)
{
// There must be at least one message available.
zmq_assert
(
read_pos
!=
write_pos
);
// Retrieve the message size.
size_t
msg_size
;
copy_from_file
(
&
msg_size
,
sizeof
msg_size
);
// Initialize the message.
zmq_msg_init_size
(
msg_
,
msg_size
);
// Retrieve the message flags.
copy_from_file
(
&
msg_
->
flags
,
sizeof
msg_
->
flags
);
// Retrieve the message payload.
copy_from_file
(
zmq_msg_data
(
msg_
),
msg_size
);
}
void
zmq
::
msg_store_t
::
commit
()
{
commit_pos
=
write_pos
;
}
void
zmq
::
msg_store_t
::
rollback
()
{
if
(
commit_pos
==
write_pos
||
read_pos
==
write_pos
)
return
;
if
(
write_pos
>
read_pos
)
zmq_assert
(
read_pos
<=
commit_pos
&&
commit_pos
<=
write_pos
);
else
zmq_assert
(
read_pos
<=
commit_pos
||
commit_pos
<=
write_pos
);
if
(
commit_pos
/
block_size
==
read_pos
/
block_size
)
{
write_buf_start_addr
=
commit_pos
%
block_size
;
write_buf
=
read_buf
;
}
else
if
(
commit_pos
/
block_size
!=
write_pos
/
block_size
)
{
write_buf_start_addr
=
commit_pos
%
block_size
;
fill_buf
(
write_buf
,
write_buf_start_addr
);
}
write_pos
=
commit_pos
;
}
bool
zmq
::
msg_store_t
::
empty
()
{
return
read_pos
==
write_pos
;
}
bool
zmq
::
msg_store_t
::
full
()
{
return
buffer_space
()
==
1
;
}
void
zmq
::
msg_store_t
::
copy_from_file
(
void
*
buffer_
,
size_t
count_
)
{
char
*
ptr
=
(
char
*
)
buffer_
;
size_t
n
,
n_left
=
count_
;
while
(
n_left
>
0
)
{
n
=
std
::
min
(
n_left
,
std
::
min
((
size_t
)
(
filesize
-
read_pos
),
(
size_t
)
(
block_size
-
read_pos
%
block_size
)));
memcpy
(
ptr
,
&
read_buf
[
read_pos
%
block_size
],
n
);
ptr
+=
n
;
read_pos
=
(
read_pos
+
n
)
%
filesize
;
if
(
read_pos
%
block_size
==
0
)
{
if
(
read_pos
/
block_size
==
write_pos
/
block_size
)
read_buf
=
write_buf
;
else
fill_buf
(
read_buf
,
read_pos
);
}
n_left
-=
n
;
}
}
void
zmq
::
msg_store_t
::
copy_to_file
(
const
void
*
buffer_
,
size_t
count_
)
{
char
*
ptr
=
(
char
*
)
buffer_
;
size_t
n
,
n_left
=
count_
;
while
(
n_left
>
0
)
{
n
=
std
::
min
(
n_left
,
std
::
min
((
size_t
)
(
filesize
-
write_pos
),
(
size_t
)
(
block_size
-
write_pos
%
block_size
)));
memcpy
(
&
write_buf
[
write_pos
%
block_size
],
ptr
,
n
);
ptr
+=
n
;
write_pos
=
(
write_pos
+
n
)
%
filesize
;
if
(
write_pos
%
block_size
==
0
)
{
save_write_buf
();
write_buf_start_addr
=
write_pos
;
if
(
write_buf
==
read_buf
)
{
if
(
read_buf
==
buf2
)
write_buf
=
buf1
;
else
write_buf
=
buf2
;
}
}
n_left
-=
n
;
}
}
void
zmq
::
msg_store_t
::
fill_buf
(
char
*
buf
,
int64_t
pos
)
{
if
(
file_pos
!=
pos
)
{
#ifdef ZMQ_HAVE_WINDOWS
__int64
offset
=
_lseeki64
(
fd
,
pos
,
SEEK_SET
);
#else
off_t
offset
=
lseek
(
fd
,
(
off_t
)
pos
,
SEEK_SET
);
#endif
errno_assert
(
offset
==
pos
);
file_pos
=
pos
;
}
size_t
i
=
0
;
size_t
n
=
std
::
min
(
block_size
,
(
size_t
)
(
filesize
-
file_pos
));
while
(
i
<
n
)
{
#ifdef ZMQ_HAVE_WINDOWS
int
rc
=
_read
(
fd
,
&
buf
[
i
],
n
-
i
);
#else
ssize_t
rc
=
read
(
fd
,
&
buf
[
i
],
n
-
i
);
#endif
errno_assert
(
rc
>
0
);
i
+=
rc
;
}
file_pos
+=
n
;
}
void
zmq
::
msg_store_t
::
save_write_buf
()
{
if
(
file_pos
!=
write_buf_start_addr
)
{
#ifdef ZMQ_HAVE_WINDOWS
__int64
offset
=
_lseeki64
(
fd
,
write_buf_start_addr
,
SEEK_SET
);
#else
off_t
offset
=
lseek
(
fd
,
(
off_t
)
write_buf_start_addr
,
SEEK_SET
);
#endif
errno_assert
(
offset
==
write_buf_start_addr
);
file_pos
=
write_buf_start_addr
;
}
size_t
i
=
0
;
size_t
n
=
std
::
min
(
block_size
,
(
size_t
)
(
filesize
-
file_pos
));
while
(
i
<
n
)
{
#ifdef ZMQ_HAVE_WINDOWS
int
rc
=
_write
(
fd
,
&
write_buf
[
i
],
n
-
i
);
#else
ssize_t
rc
=
write
(
fd
,
&
write_buf
[
i
],
n
-
i
);
#endif
errno_assert
(
rc
>
0
);
i
+=
rc
;
}
file_pos
+=
n
;
}
int64_t
zmq
::
msg_store_t
::
buffer_space
()
{
if
(
write_pos
<
read_pos
)
return
read_pos
-
write_pos
;
return
filesize
-
(
write_pos
-
read_pos
);
}
src/msg_store.hpp
0 → 100644
View file @
fca2e8e8
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_MSG_STORE_HPP_INCLUDED__
#define __ZMQ_MSG_STORE_HPP_INCLUDED__
#include "../include/zmq.h"
#include <string>
#include "stdint.hpp"
namespace
zmq
{
// This class implements a message store. Messages are retrieved from
// the store in the same order as they entered it.
class
msg_store_t
{
public
:
enum
{
default_block_size
=
8192
};
// Creates message store.
msg_store_t
(
int64_t
filesize_
,
size_t
block_size_
=
default_block_size
);
~
msg_store_t
();
int
init
();
// Stores the message into the message store. The function
// returns false if the message store is full; true otherwise.
bool
store
(
zmq_msg_t
*
msg_
);
// Fetches the oldest message from the message store. It is an error
// to call this function when the message store is empty.
void
fetch
(
zmq_msg_t
*
msg_
);
void
commit
();
void
rollback
();
// Returns true if the message store is empty; false otherwise.
bool
empty
();
// Returns true if and only if the store is full.
bool
full
();
private
:
// Copies data from a memory buffer to the backing file.
// Wraps around when reaching maximum file size.
void
copy_from_file
(
void
*
buffer_
,
size_t
count_
);
// Copies data from the backing file to the memory buffer.
// Wraps around when reaching end-of-file.
void
copy_to_file
(
const
void
*
buffer_
,
size_t
count_
);
// Returns the buffer space available.
int64_t
buffer_space
();
void
fill_buf
(
char
*
buf
,
int64_t
pos
);
void
save_write_buf
();
// File descriptor to the backing file.
int
fd
;
// Name of the backing file.
std
::
string
filename
;
// Maximum size of the backing file.
int64_t
filesize
;
// File offset associated with the fd file descriptor.
int64_t
file_pos
;
// File offset the next message will be stored at.
int64_t
write_pos
;
// File offset the next message will be read from.
int64_t
read_pos
;
int64_t
commit_pos
;
size_t
block_size
;
char
*
buf1
;
char
*
buf2
;
char
*
read_buf
;
char
*
write_buf
;
int64_t
write_buf_start_addr
;
};
}
#endif
src/pipe.cpp
View file @
fca2e8e8
...
@@ -21,20 +21,14 @@
...
@@ -21,20 +21,14 @@
#include "pipe.hpp"
#include "pipe.hpp"
zmq
::
reader_t
::
reader_t
(
object_t
*
parent_
,
zmq
::
reader_t
::
reader_t
(
object_t
*
parent_
,
uint64_t
lwm_
)
:
uint64_t
hwm_
,
uint64_t
lwm_
)
:
object_t
(
parent_
),
object_t
(
parent_
),
pipe
(
NULL
),
pipe
(
NULL
),
peer
(
NULL
),
peer
(
NULL
),
hwm
(
hwm_
),
lwm
(
lwm_
),
lwm
(
lwm_
),
msgs_read
(
0
),
msgs_read
(
0
),
endpoint
(
NULL
)
endpoint
(
NULL
)
{
{}
// Adjust lwm and hwm.
if
(
lwm
==
0
||
lwm
>
hwm
)
lwm
=
hwm
;
}
zmq
::
reader_t
::~
reader_t
()
zmq
::
reader_t
::~
reader_t
()
{
{
...
@@ -113,20 +107,28 @@ void zmq::reader_t::process_pipe_term_ack ()
...
@@ -113,20 +107,28 @@ void zmq::reader_t::process_pipe_term_ack ()
}
}
zmq
::
writer_t
::
writer_t
(
object_t
*
parent_
,
zmq
::
writer_t
::
writer_t
(
object_t
*
parent_
,
uint64_t
hwm_
,
uint64_t
lwm
_
)
:
uint64_t
hwm_
,
int64_t
swap_size
_
)
:
object_t
(
parent_
),
object_t
(
parent_
),
pipe
(
NULL
),
pipe
(
NULL
),
peer
(
NULL
),
peer
(
NULL
),
hwm
(
hwm_
),
hwm
(
hwm_
),
lwm
(
lwm_
),
msgs_read
(
0
),
msgs_read
(
0
),
msgs_written
(
0
),
msgs_written
(
0
),
msg_store
(
NULL
),
extra_msg_flag
(
false
),
stalled
(
false
),
stalled
(
false
),
pending_close
(
false
),
endpoint
(
NULL
)
endpoint
(
NULL
)
{
{
// Adjust lwm and hwm.
if
(
swap_size_
>
0
)
{
if
(
lwm
==
0
||
lwm
>
hwm
)
msg_store
=
new
(
std
::
nothrow
)
msg_store_t
(
swap_size_
);
lwm
=
hwm
;
if
(
msg_store
!=
NULL
)
{
if
(
msg_store
->
init
()
<
0
)
{
delete
msg_store
;
msg_store
=
NULL
;
}
}
}
}
}
void
zmq
::
writer_t
::
set_endpoint
(
i_endpoint
*
endpoint_
)
void
zmq
::
writer_t
::
set_endpoint
(
i_endpoint
*
endpoint_
)
...
@@ -136,6 +138,10 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
...
@@ -136,6 +138,10 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
zmq
::
writer_t
::~
writer_t
()
zmq
::
writer_t
::~
writer_t
()
{
{
if
(
extra_msg_flag
)
zmq_msg_close
(
&
extra_msg
);
delete
msg_store
;
}
}
void
zmq
::
writer_t
::
set_pipe
(
pipe_t
*
pipe_
)
void
zmq
::
writer_t
::
set_pipe
(
pipe_t
*
pipe_
)
...
@@ -147,7 +153,7 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
...
@@ -147,7 +153,7 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
bool
zmq
::
writer_t
::
check_write
()
bool
zmq
::
writer_t
::
check_write
()
{
{
if
(
pipe_full
())
{
if
(
pipe_full
()
&&
(
msg_store
==
NULL
||
msg_store
->
full
()
||
extra_msg_flag
)
)
{
stalled
=
true
;
stalled
=
true
;
return
false
;
return
false
;
}
}
...
@@ -157,28 +163,45 @@ bool zmq::writer_t::check_write ()
...
@@ -157,28 +163,45 @@ bool zmq::writer_t::check_write ()
bool
zmq
::
writer_t
::
write
(
zmq_msg_t
*
msg_
)
bool
zmq
::
writer_t
::
write
(
zmq_msg_t
*
msg_
)
{
{
if
(
pipe_full
())
{
if
(
!
check_write
())
stalled
=
true
;
return
false
;
return
false
;
if
(
pipe_full
())
{
if
(
msg_store
->
store
(
msg_
))
{
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_MORE
))
msg_store
->
commit
();
}
else
{
extra_msg
=
*
msg_
;
extra_msg_flag
=
true
;
}
}
else
{
pipe
->
write
(
*
msg_
,
msg_
->
flags
&
ZMQ_MSG_MORE
);
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_MORE
))
msgs_written
++
;
}
}
pipe
->
write
(
*
msg_
,
msg_
->
flags
&
ZMQ_MSG_MORE
);
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_MORE
))
msgs_written
++
;
return
true
;
return
true
;
}
}
void
zmq
::
writer_t
::
rollback
()
void
zmq
::
writer_t
::
rollback
()
{
{
zmq_msg_t
msg
;
if
(
extra_msg_flag
&&
extra_msg
.
flags
&
ZMQ_MSG_MORE
)
{
zmq_msg_close
(
&
extra_msg
);
extra_msg_flag
=
false
;
}
if
(
msg_store
!=
NULL
)
msg_store
->
rollback
();
zmq_msg_t
msg
;
// Remove all incomplete messages from the pipe.
// Remove all incomplete messages from the pipe.
while
(
pipe
->
unwrite
(
&
msg
))
{
while
(
pipe
->
unwrite
(
&
msg
))
{
zmq_assert
(
msg
.
flags
&
ZMQ_MSG_MORE
);
zmq_assert
(
msg
.
flags
&
ZMQ_MSG_MORE
);
zmq_msg_close
(
&
msg
);
zmq_msg_close
(
&
msg
);
}
}
if
(
stalled
&&
endpoint
!=
NULL
&&
!
pipe_full
())
{
if
(
stalled
&&
endpoint
!=
NULL
&&
check_write
())
{
stalled
=
false
;
stalled
=
false
;
endpoint
->
revive
(
this
);
endpoint
->
revive
(
this
);
}
}
...
@@ -197,6 +220,14 @@ void zmq::writer_t::term ()
...
@@ -197,6 +220,14 @@ void zmq::writer_t::term ()
// Rollback any unfinished messages.
// Rollback any unfinished messages.
rollback
();
rollback
();
if
(
msg_store
==
NULL
||
(
msg_store
->
empty
()
&&
!
extra_msg_flag
))
write_delimiter
();
else
pending_close
=
true
;
}
void
zmq
::
writer_t
::
write_delimiter
()
{
// Push delimiter into the pipe.
// Push delimiter into the pipe.
// Trick the compiler to belive that the tag is a valid pointer.
// Trick the compiler to belive that the tag is a valid pointer.
zmq_msg_t
msg
;
zmq_msg_t
msg
;
...
@@ -209,7 +240,42 @@ void zmq::writer_t::term ()
...
@@ -209,7 +240,42 @@ void zmq::writer_t::term ()
void
zmq
::
writer_t
::
process_reader_info
(
uint64_t
msgs_read_
)
void
zmq
::
writer_t
::
process_reader_info
(
uint64_t
msgs_read_
)
{
{
zmq_msg_t
msg
;
msgs_read
=
msgs_read_
;
msgs_read
=
msgs_read_
;
if
(
msg_store
)
{
// Move messages from backing store into pipe.
while
(
!
pipe_full
()
&&
!
msg_store
->
empty
())
{
msg_store
->
fetch
(
&
msg
);
// Write message into the pipe.
pipe
->
write
(
msg
,
msg
.
flags
&
ZMQ_MSG_MORE
);
if
(
!
(
msg
.
flags
&
ZMQ_MSG_MORE
))
msgs_written
++
;
}
if
(
extra_msg_flag
)
{
if
(
!
pipe_full
())
{
pipe
->
write
(
extra_msg
,
extra_msg
.
flags
&
ZMQ_MSG_MORE
);
if
(
!
(
extra_msg
.
flags
&
ZMQ_MSG_MORE
))
msgs_written
++
;
extra_msg_flag
=
false
;
}
else
if
(
msg_store
->
store
(
&
extra_msg
))
{
if
(
!
(
extra_msg
.
flags
&
ZMQ_MSG_MORE
))
msg_store
->
commit
();
extra_msg_flag
=
false
;
}
}
if
(
pending_close
&&
msg_store
->
empty
()
&&
!
extra_msg_flag
)
{
write_delimiter
();
pending_close
=
false
;
}
flush
();
}
if
(
stalled
&&
endpoint
!=
NULL
)
{
if
(
stalled
&&
endpoint
!=
NULL
)
{
stalled
=
false
;
stalled
=
false
;
endpoint
->
revive
(
this
);
endpoint
->
revive
(
this
);
...
@@ -232,9 +298,9 @@ bool zmq::writer_t::pipe_full ()
...
@@ -232,9 +298,9 @@ bool zmq::writer_t::pipe_full ()
}
}
zmq
::
pipe_t
::
pipe_t
(
object_t
*
reader_parent_
,
object_t
*
writer_parent_
,
zmq
::
pipe_t
::
pipe_t
(
object_t
*
reader_parent_
,
object_t
*
writer_parent_
,
uint64_t
hwm_
)
:
uint64_t
hwm_
,
int64_t
swap_size_
)
:
reader
(
reader_parent_
,
hwm_
,
compute_lwm
(
hwm_
)),
reader
(
reader_parent_
,
compute_lwm
(
hwm_
)),
writer
(
writer_parent_
,
hwm_
,
compute_lwm
(
hwm_
)
)
writer
(
writer_parent_
,
hwm_
,
swap_size_
)
{
{
reader
.
set_pipe
(
this
);
reader
.
set_pipe
(
this
);
writer
.
set_pipe
(
this
);
writer
.
set_pipe
(
this
);
...
...
src/pipe.hpp
View file @
fca2e8e8
...
@@ -26,6 +26,7 @@
...
@@ -26,6 +26,7 @@
#include "i_endpoint.hpp"
#include "i_endpoint.hpp"
#include "yarray_item.hpp"
#include "yarray_item.hpp"
#include "ypipe.hpp"
#include "ypipe.hpp"
#include "msg_store.hpp"
#include "config.hpp"
#include "config.hpp"
#include "object.hpp"
#include "object.hpp"
...
@@ -36,8 +37,7 @@ namespace zmq
...
@@ -36,8 +37,7 @@ namespace zmq
{
{
public
:
public
:
reader_t
(
class
object_t
*
parent_
,
reader_t
(
class
object_t
*
parent_
,
uint64_t
lwm_
);
uint64_t
hwm_
,
uint64_t
lwm_
);
~
reader_t
();
~
reader_t
();
void
set_pipe
(
class
pipe_t
*
pipe_
);
void
set_pipe
(
class
pipe_t
*
pipe_
);
...
@@ -64,8 +64,7 @@ namespace zmq
...
@@ -64,8 +64,7 @@ namespace zmq
// Pipe writer associated with the other side of the pipe.
// Pipe writer associated with the other side of the pipe.
class
writer_t
*
peer
;
class
writer_t
*
peer
;
// High and low watermarks for in-memory storage (in bytes).
// Low watermark for in-memory storage (in bytes).
uint64_t
hwm
;
uint64_t
lwm
;
uint64_t
lwm
;
// Number of messages read so far.
// Number of messages read so far.
...
@@ -82,8 +81,7 @@ namespace zmq
...
@@ -82,8 +81,7 @@ namespace zmq
{
{
public
:
public
:
writer_t
(
class
object_t
*
parent_
,
writer_t
(
class
object_t
*
parent_
,
uint64_t
hwm_
,
int64_t
swap_size_
);
uint64_t
hwm_
,
uint64_t
lwm_
);
~
writer_t
();
~
writer_t
();
void
set_pipe
(
class
pipe_t
*
pipe_
);
void
set_pipe
(
class
pipe_t
*
pipe_
);
...
@@ -117,15 +115,18 @@ namespace zmq
...
@@ -117,15 +115,18 @@ namespace zmq
// Tests whether the pipe is already full.
// Tests whether the pipe is already full.
bool
pipe_full
();
bool
pipe_full
();
// Write special message to the pipe so that the reader
// can find out we are finished.
void
write_delimiter
();
// The underlying pipe.
// The underlying pipe.
class
pipe_t
*
pipe
;
class
pipe_t
*
pipe
;
// Pipe reader associated with the other side of the pipe.
// Pipe reader associated with the other side of the pipe.
class
reader_t
*
peer
;
class
reader_t
*
peer
;
// High
and low watermarks
for in-memory storage (in bytes).
// High
watermark
for in-memory storage (in bytes).
uint64_t
hwm
;
uint64_t
hwm
;
uint64_t
lwm
;
// Last confirmed number of messages read from the pipe.
// Last confirmed number of messages read from the pipe.
// The actual number can be higher.
// The actual number can be higher.
...
@@ -134,9 +135,19 @@ namespace zmq
...
@@ -134,9 +135,19 @@ namespace zmq
// Number of messages we have written so far.
// Number of messages we have written so far.
uint64_t
msgs_written
;
uint64_t
msgs_written
;
// Pointer to backing store. If NULL, messages are always
// kept in main memory.
msg_store_t
*
msg_store
;
bool
extra_msg_flag
;
zmq_msg_t
extra_msg
;
// True iff the last attempt to write a message has failed.
// True iff the last attempt to write a message has failed.
bool
stalled
;
bool
stalled
;
bool
pending_close
;
// Endpoint (either session or socket) the pipe is attached to.
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint
*
endpoint
;
i_endpoint
*
endpoint
;
...
@@ -150,7 +161,7 @@ namespace zmq
...
@@ -150,7 +161,7 @@ namespace zmq
public
:
public
:
pipe_t
(
object_t
*
reader_parent_
,
object_t
*
writer_parent_
,
pipe_t
(
object_t
*
reader_parent_
,
object_t
*
writer_parent_
,
uint64_t
hwm_
);
uint64_t
hwm_
,
int64_t
swap_size_
);
~
pipe_t
();
~
pipe_t
();
reader_t
reader
;
reader_t
reader
;
...
...
src/session.cpp
View file @
fca2e8e8
...
@@ -265,7 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
...
@@ -265,7 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
writer_t
*
socket_writer
=
NULL
;
writer_t
*
socket_writer
=
NULL
;
if
(
options
.
requires_in
&&
!
out_pipe
)
{
if
(
options
.
requires_in
&&
!
out_pipe
)
{
pipe_t
*
pipe
=
new
(
std
::
nothrow
)
pipe_t
(
owner
,
this
,
options
.
hwm
);
pipe_t
*
pipe
=
new
(
std
::
nothrow
)
pipe_t
(
owner
,
this
,
options
.
hwm
,
options
.
swap
);
zmq_assert
(
pipe
);
zmq_assert
(
pipe
);
out_pipe
=
&
pipe
->
writer
;
out_pipe
=
&
pipe
->
writer
;
out_pipe
->
set_endpoint
(
this
);
out_pipe
->
set_endpoint
(
this
);
...
@@ -273,7 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
...
@@ -273,7 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
}
if
(
options
.
requires_out
&&
!
in_pipe
)
{
if
(
options
.
requires_out
&&
!
in_pipe
)
{
pipe_t
*
pipe
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
owner
,
options
.
hwm
);
pipe_t
*
pipe
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
owner
,
options
.
hwm
,
options
.
swap
);
zmq_assert
(
pipe
);
zmq_assert
(
pipe
);
in_pipe
=
&
pipe
->
reader
;
in_pipe
=
&
pipe
->
reader
;
in_pipe
->
set_endpoint
(
this
);
in_pipe
->
set_endpoint
(
this
);
...
...
src/socket_base.cpp
View file @
fca2e8e8
...
@@ -195,13 +195,13 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -195,13 +195,13 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
// Create inbound pipe, if required.
if
(
options
.
requires_in
)
{
if
(
options
.
requires_in
)
{
in_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
peer
,
options
.
hwm
);
in_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
peer
,
options
.
hwm
,
options
.
swap
);
zmq_assert
(
in_pipe
);
zmq_assert
(
in_pipe
);
}
}
// Create outbound pipe, if required.
// Create outbound pipe, if required.
if
(
options
.
requires_out
)
{
if
(
options
.
requires_out
)
{
out_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
peer
,
this
,
options
.
hwm
);
out_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
peer
,
this
,
options
.
hwm
,
options
.
swap
);
zmq_assert
(
out_pipe
);
zmq_assert
(
out_pipe
);
}
}
...
@@ -234,14 +234,14 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -234,14 +234,14 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
// Create inbound pipe, if required.
if
(
options
.
requires_in
)
{
if
(
options
.
requires_in
)
{
in_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
session
,
options
.
hwm
);
in_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
session
,
options
.
hwm
,
options
.
swap
);
zmq_assert
(
in_pipe
);
zmq_assert
(
in_pipe
);
}
}
// Create outbound pipe, if required.
// Create outbound pipe, if required.
if
(
options
.
requires_out
)
{
if
(
options
.
requires_out
)
{
out_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
session
,
this
,
options
.
hwm
);
out_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
session
,
this
,
options
.
hwm
,
options
.
swap
);
zmq_assert
(
out_pipe
);
zmq_assert
(
out_pipe
);
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment