9 #include <sys/socket.h>
34 assert(msg_typed_ != NULL);
46 assert(msg_rpc_.IsInitialized());
47 if (msg_typed_ == NULL)
54 : owns_msg_typed_(false)
59 , is_msg_out_of_band_(false) { }
63 : owns_msg_typed_(false)
68 , is_msg_out_of_band_(false) { }
75 assert(msg_rpc_.IsInitialized());
76 if (msg_typed_ == NULL)
78 return is_msg_out_of_band_;
83 msg_rpc_.CheckTypeAndMergeFrom(other.
msg_rpc_);
84 owns_msg_typed_ =
true;
94 bool retval = msg_rpc_.ParseFromArray(buffer, size);
99 owns_msg_typed_ =
true;
108 msg_rpc_.release_msg_refcount_req();
109 msg_rpc_.release_msg_refcount_reply();
110 msg_rpc_.release_msg_read_req();
111 msg_rpc_.release_msg_read_reply();
112 msg_rpc_.release_msg_object_info_req();
113 msg_rpc_.release_msg_object_info_reply();
114 msg_rpc_.release_msg_store_req();
115 msg_rpc_.release_msg_store_abort_req();
116 msg_rpc_.release_msg_store_reply();
117 msg_rpc_.release_msg_handshake();
118 msg_rpc_.release_msg_handshake_ack();
119 msg_rpc_.release_msg_quit();
120 msg_rpc_.release_msg_ioctl();
121 msg_rpc_.release_msg_info_req();
122 msg_rpc_.release_msg_info_reply();
123 msg_rpc_.release_msg_shrink_req();
124 msg_rpc_.release_msg_shrink_reply();
125 msg_rpc_.release_msg_list_req();
126 msg_rpc_.release_msg_list_reply();
127 msg_rpc_.release_msg_detach();
128 msg_rpc_.release_msg_breadcrumb_store_req();
129 msg_rpc_.release_msg_breadcrumb_load_req();
130 msg_rpc_.release_msg_breadcrumb_reply();
136 att_size_ = original_att_size;
138 is_msg_out_of_band_ =
false;
141 owns_msg_typed_ =
false;
146 if (msg_typed_->GetTypeName() ==
"cvmfs.MsgHandshake") {
147 msg_rpc_.set_allocated_msg_handshake(
148 reinterpret_cast<cvmfs::MsgHandshake *>(msg_typed_));
149 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgHandshakeAck") {
150 msg_rpc_.set_allocated_msg_handshake_ack(
151 reinterpret_cast<cvmfs::MsgHandshakeAck *>(msg_typed_));
152 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgQuit") {
153 msg_rpc_.set_allocated_msg_quit(
154 reinterpret_cast<cvmfs::MsgQuit *>(msg_typed_));
155 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgIoctl") {
156 msg_rpc_.set_allocated_msg_ioctl(
157 reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed_));
158 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgRefcountReq") {
159 msg_rpc_.set_allocated_msg_refcount_req(
160 reinterpret_cast<cvmfs::MsgRefcountReq *>(msg_typed_));
161 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgRefcountReply") {
162 msg_rpc_.set_allocated_msg_refcount_reply(
163 reinterpret_cast<cvmfs::MsgRefcountReply *>(msg_typed_));
164 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgObjectInfoReq") {
165 msg_rpc_.set_allocated_msg_object_info_req(
166 reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed_));
167 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgObjectInfoReply") {
168 msg_rpc_.set_allocated_msg_object_info_reply(
169 reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg_typed_));
170 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgReadReq") {
171 msg_rpc_.set_allocated_msg_read_req(
172 reinterpret_cast<cvmfs::MsgReadReq *>(msg_typed_));
173 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgReadReply") {
174 msg_rpc_.set_allocated_msg_read_reply(
175 reinterpret_cast<cvmfs::MsgReadReply *>(msg_typed_));
176 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgStoreReq") {
177 msg_rpc_.set_allocated_msg_store_req(
178 reinterpret_cast<cvmfs::MsgStoreReq *>(msg_typed_));
179 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgStoreAbortReq") {
180 msg_rpc_.set_allocated_msg_store_abort_req(
181 reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed_));
182 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgStoreReply") {
183 msg_rpc_.set_allocated_msg_store_reply(
184 reinterpret_cast<cvmfs::MsgStoreReply *>(msg_typed_));
185 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgInfoReq") {
186 msg_rpc_.set_allocated_msg_info_req(
187 reinterpret_cast<cvmfs::MsgInfoReq *>(msg_typed_));
188 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgInfoReply") {
189 msg_rpc_.set_allocated_msg_info_reply(
190 reinterpret_cast<cvmfs::MsgInfoReply *>(msg_typed_));
191 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgShrinkReq") {
192 msg_rpc_.set_allocated_msg_shrink_req(
193 reinterpret_cast<cvmfs::MsgShrinkReq *>(msg_typed_));
194 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgShrinkReply") {
195 msg_rpc_.set_allocated_msg_shrink_reply(
196 reinterpret_cast<cvmfs::MsgShrinkReply *>(msg_typed_));
197 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgListReq") {
198 msg_rpc_.set_allocated_msg_list_req(
199 reinterpret_cast<cvmfs::MsgListReq *>(msg_typed_));
200 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgListReply") {
201 msg_rpc_.set_allocated_msg_list_reply(
202 reinterpret_cast<cvmfs::MsgListReply *>(msg_typed_));
203 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgBreadcrumbStoreReq") {
204 msg_rpc_.set_allocated_msg_breadcrumb_store_req(
205 reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed_));
206 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgBreadcrumbLoadReq") {
207 msg_rpc_.set_allocated_msg_breadcrumb_load_req(
208 reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed_));
209 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgBreadcrumbReply") {
210 msg_rpc_.set_allocated_msg_breadcrumb_reply(
211 reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(msg_typed_));
212 }
else if (msg_typed_->GetTypeName() ==
"cvmfs.MsgDetach") {
213 msg_rpc_.set_allocated_msg_detach(
214 reinterpret_cast<cvmfs::MsgDetach *>(msg_typed_));
215 is_msg_out_of_band_ =
true;
225 if (msg_rpc_.has_msg_handshake()) {
226 msg_typed_ = msg_rpc_.mutable_msg_handshake();
227 }
else if (msg_rpc_.has_msg_handshake_ack()) {
228 msg_typed_ = msg_rpc_.mutable_msg_handshake_ack();
229 }
else if (msg_rpc_.has_msg_quit()) {
230 msg_typed_ = msg_rpc_.mutable_msg_quit();
231 }
else if (msg_rpc_.has_msg_ioctl()) {
232 msg_typed_ = msg_rpc_.mutable_msg_ioctl();
233 }
else if (msg_rpc_.has_msg_refcount_req()) {
234 msg_typed_ = msg_rpc_.mutable_msg_refcount_req();
235 }
else if (msg_rpc_.has_msg_refcount_reply()) {
236 msg_typed_ = msg_rpc_.mutable_msg_refcount_reply();
237 }
else if (msg_rpc_.has_msg_object_info_req()) {
238 msg_typed_ = msg_rpc_.mutable_msg_object_info_req();
239 }
else if (msg_rpc_.has_msg_object_info_reply()) {
240 msg_typed_ = msg_rpc_.mutable_msg_object_info_reply();
241 }
else if (msg_rpc_.has_msg_read_req()) {
242 msg_typed_ = msg_rpc_.mutable_msg_read_req();
243 }
else if (msg_rpc_.has_msg_read_reply()) {
244 msg_typed_ = msg_rpc_.mutable_msg_read_reply();
245 }
else if (msg_rpc_.has_msg_store_req()) {
246 msg_typed_ = msg_rpc_.mutable_msg_store_req();
247 }
else if (msg_rpc_.has_msg_store_abort_req()) {
248 msg_typed_ = msg_rpc_.mutable_msg_store_abort_req();
249 }
else if (msg_rpc_.has_msg_store_reply()) {
250 msg_typed_ = msg_rpc_.mutable_msg_store_reply();
251 }
else if (msg_rpc_.has_msg_info_req()) {
252 msg_typed_ = msg_rpc_.mutable_msg_info_req();
253 }
else if (msg_rpc_.has_msg_info_reply()) {
254 msg_typed_ = msg_rpc_.mutable_msg_info_reply();
255 }
else if (msg_rpc_.has_msg_shrink_req()) {
256 msg_typed_ = msg_rpc_.mutable_msg_shrink_req();
257 }
else if (msg_rpc_.has_msg_shrink_reply()) {
258 msg_typed_ = msg_rpc_.mutable_msg_shrink_reply();
259 }
else if (msg_rpc_.has_msg_list_req()) {
260 msg_typed_ = msg_rpc_.mutable_msg_list_req();
261 }
else if (msg_rpc_.has_msg_list_reply()) {
262 msg_typed_ = msg_rpc_.mutable_msg_list_reply();
263 }
else if (msg_rpc_.has_msg_breadcrumb_store_req()) {
264 msg_typed_ = msg_rpc_.mutable_msg_breadcrumb_store_req();
265 }
else if (msg_rpc_.has_msg_breadcrumb_load_req()) {
266 msg_typed_ = msg_rpc_.mutable_msg_breadcrumb_load_req();
267 }
else if (msg_rpc_.has_msg_breadcrumb_reply()) {
268 msg_typed_ = msg_rpc_.mutable_msg_breadcrumb_reply();
269 }
else if (msg_rpc_.has_msg_detach()) {
270 msg_typed_ = msg_rpc_.mutable_msg_detach();
271 is_msg_out_of_band_ =
true;
289 : fd_connection_(fd_connection), flags_(flags) {
295 cvmfs::MsgHash *msg_hash) {
298 msg_hash->set_algorithm(cvmfs::HASH_SHA1);
301 msg_hash->set_algorithm(cvmfs::HASH_RIPEMD160);
304 msg_hash->set_algorithm(cvmfs::HASH_SHAKE128);
314 cvmfs::EnumObjectType *wire_type) {
315 *wire_type = cvmfs::OBJECT_REGULAR;
317 *wire_type = cvmfs::OBJECT_CATALOG;
319 *wire_type = cvmfs::OBJECT_VOLATILE;
325 switch (msg_hash.algorithm()) {
326 case cvmfs::HASH_SHA1:
329 case cvmfs::HASH_RIPEMD160:
332 case cvmfs::HASH_SHAKE128:
339 if (msg_hash.digest().length() != digest_size)
341 memcpy(hash->
digest, msg_hash.digest().data(), digest_size);
350 case cvmfs::OBJECT_REGULAR:
352 case cvmfs::OBJECT_CATALOG:
355 case cvmfs::OBJECT_VOLATILE:
367 bool retval =
RecvHeader(&size, &has_attachment);
373 buffer = alloca(size);
375 buffer = smalloc(size);
377 if ((nbytes < 0) || (static_cast<uint32_t>(nbytes) != size)) {
384 uint32_t msg_size =
size;
385 if (has_attachment) {
394 msg_size = (*
reinterpret_cast<unsigned char *
>(buffer))
395 + ((*(
reinterpret_cast<unsigned char *
>(buffer) + 1)) << 8);
404 void *ptr_msg = has_attachment
415 if (has_attachment) {
417 if (frame->
att_size() < attachment_size) {
425 memcpy(frame->
attachment(), ptr_attachment, attachment_size);
440 if ((nbytes < 0) || (static_cast<unsigned>(nbytes) !=
kHeaderSize))
445 *size = header[1] + (header[2] << 8) + (header[3] << 16);
454 uint32_t total_size = msg_size + att_size
460 "sending message of size %u to cache transport", total_size);
464 header[1] = (total_size & 0x000000FF);
465 header[2] = (total_size & 0x0000FF00) >> 8;
466 header[3] = (total_size & 0x00FF0000) >> 16;
472 iov[0].iov_base = header;
476 inner_header[0] = (msg_size & 0x000000FF);
477 inner_header[1] = (msg_size & 0x0000FF00) >> 8;
478 iov[1].iov_base = inner_header;
480 iov[2].iov_base = message;
481 iov[2].iov_len = msg_size;
482 iov[3].iov_base = attachment;
483 iov[3].iov_len = att_size;
485 iov[1].iov_base = message;
486 iov[1].iov_len = msg_size;
496 "failed to write to external cache transport (%d), aborting", errno);
502 unsigned total_size = 0;
503 for (
unsigned i = 0; i < iovcnt; ++i)
504 total_size += iov[i].iov_len;
505 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(alloca(total_size));
508 for (
unsigned i = 0; i < iovcnt; ++i) {
509 memcpy(buffer + pos, iov[i].iov_base, iov[i].iov_len);
510 pos += iov[i].iov_len;
513 int retval = send(
fd_connection_, buffer, total_size, MSG_DONTWAIT);
515 assert(errno != EMSGSIZE);
518 "failed to write to external cache transport (%d), aborting",
526 cvmfs::MsgRpc *msg_rpc = frame->
GetMsgRpc();
527 int32_t
size = msg_rpc->ByteSize();
530 void *buffer = smalloc(size);
532 void *buffer = alloca(size);
534 bool retval = msg_rpc->SerializeToArray(buffer, size);
static const unsigned kMaxStackAlloc
static const int kLabelCatalog
static const unsigned char kWireProtocolVersion
void SendNonBlocking(struct iovec *iov, unsigned iovcnt)
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
void FillObjectType(int object_flags, cvmfs::EnumObjectType *wire_type)
unsigned char digest[digest_size_]
static const uint32_t kFlagSendIgnoreFailure
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
void MergeFrom(const Frame &other)
static const uint32_t kMaxMsgSize
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
void * attachment() const
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
cvmfs::MsgRpc * GetMsgRpc()
uint32_t att_size() const
void set_att_size(uint32_t size)
static const unsigned char kFlagHasAttachment
google::protobuf::MessageLite * GetMsgTyped()
void SendData(void *message, uint32_t msg_size, void *attachment=NULL, uint32_t att_size=0)
static const int kLabelVolatile
bool ParseMsgRpc(void *buffer, uint32_t size)
bool RecvFrame(Frame *frame)
static const unsigned kInnerHeaderSize
void Reset(uint32_t original_att_size)
const unsigned kDigestSizes[]
static const unsigned kHeaderSize
int fd_connection() const
bool SafeWriteV(int fd, struct iovec *iov, unsigned iovcnt)
static const char * kEnvReadyNotifyFd
static const uint32_t kFlagSendNonBlocking
bool ParseObjectType(cvmfs::EnumObjectType wire_type, int *object_flags)
CacheTransport(int fd_connection)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
bool RecvHeader(uint32_t *size, bool *has_attachment)