-
Notifications
You must be signed in to change notification settings - Fork 5
/
universal-sub-push.c
134 lines (106 loc) · 3.76 KB
/
universal-sub-push.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/* This software is the client component of the ND-OV system.
* Each multipart message it receives from the ND-OV system
* consisting of an envelope and its data is rebroadcasted
* to all connected clients of the serviceprovider.
*
* Requirements: zeromq2 or zeromq3.2
* gcc -lzmq -o universal-sub-push universal-sub-push.c
*
* Changes:
* - Initial version <[email protected]>
* - zeromq 3.2 compatibility added,
* pubsub binding bugfix <[email protected]>
*/
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include "zmq.h"
int main (int argc, char *argv[]) {
/* Our process ID and Session ID */
pid_t pid;
if (argc < 3) {
printf("%s [subscriber] (filter1 filter2 filterN) [pull]\n\nEx.\n" \
"%s tcp://127.0.0.1:7827 tcp://127.0.0.1:7817\n",
argv[0], argv[0]);
exit(-1);
}
/* Fork off the parent process */
pid = fork();
/* If we got a good PID, then we can exit the parent process. */
if (pid > 0) {
exit(EXIT_SUCCESS);
}
/* If forking actually didn't work */
if (pid < 0) {
/* Close out the standard file descriptors */
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
}
chdir("/var/empty");
void *context = zmq_init (1);
void *push = zmq_socket (context, ZMQ_PUSH);
/* Apply a high water mark at the PubSub */
uint64_t hwm = 8192;
zmq_setsockopt (push, ZMQ_SNDHWM, &hwm, sizeof(hwm));
zmq_setsockopt (push, ZMQ_RCVHWM, &hwm, sizeof(hwm));
/* Check if we are root */
if (getuid() == 0 || geteuid() == 0) {
uid_t puid = 65534; /* just use the traditional value */
gid_t pgid = 65534;
/* Now we chroot to this directory, preventing any write access outside it */
chroot("/var/empty");
/* After we bind to the socket and chrooted, we drop priviledges to nobody */
setuid(puid);
setgid(pgid);
}
zmq_connect (push, argv[argc - 1]);
zmq_pollitem_t items[1];
init:
items[0].socket = zmq_socket (context, ZMQ_SUB);
items[0].events = ZMQ_POLLIN;
/* Apply filters to the subscription from the remote source */
if (argc > 3) {
unsigned int i;
for (i = 2; i < (argc - 1); i++) {
zmq_setsockopt (items[0].socket, ZMQ_SUBSCRIBE, argv[i], strlen(argv[i]));
}
} else {
zmq_setsockopt (items[0].socket, ZMQ_SUBSCRIBE, "", 0);
}
zmq_connect (items[0].socket, argv[1]);
int rc;
size_t more_size = sizeof(more_t);
/* Ensure that every 60s there is data */
while ((rc = zmq_poll (items, 1, 60 * 1000 * ZMQ_POLL_MSEC)) >= 0) {
if (rc > 0) {
more_t more;
do {
/* Create an empty 0MQ message to hold the message part */
zmq_msg_t part;
rc = zmq_msg_init (&part);
assert (rc == 0);
/* Block until a message is available to be received from the socket */
rc = zmq_msg_recv (&part, items[0].socket, 0);
assert (rc != -1);
/* Determine if more message parts are to follow */
rc = zmq_getsockopt (items[0].socket, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
/* Send the message, when more is set, apply the flag, otherwise don't */
zmq_msg_send (&part, push, (more ? ZMQ_SNDMORE : 0));
zmq_msg_close (&part);
} while (more);
} else {
zmq_close (items[0].socket);
sleep (1);
goto init;
}
}
zmq_close (items[0].socket);
zmq_close (push);
zmq_ctx_destroy (context);
return rc;
}