Motr  M0
service.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
24 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
25 #include "lib/errno.h"
26 #include "lib/trace.h"
27 #include "lib/tlist.h"
28 #include "lib/time.h" /* m0_time_from_now */
29 #include "lib/bob.h"
30 #include "lib/memory.h" /* m0_alloc() */
31 #include "rpc/link.h"
32 #include "rpc/service.h"
33 #include "rpc/rev_conn.h"
34 #include "rpc/rpc_internal.h"
35 #include "reqh/reqh.h"
36 #include "reqh/reqh_service.h"
37 #include "ha/note_fops.h" /* m0_ha_state_fop_init */
38 
45 M0_TL_DESCR_DEFINE(rev_conn, "Reverse Connections", static,
46  struct m0_reverse_connection, rcf_linkage, rcf_magic,
48 M0_TL_DEFINE(rev_conn, static, struct m0_reverse_connection);
49 
50 static const struct m0_bob_type rpc_svc_bob = {
51  .bt_name = "rpc service",
52  .bt_magix_offset = offsetof(struct m0_rpc_service, rps_magix),
53  .bt_magix = M0_RPC_SERVICE_MAGIC,
54  .bt_check = NULL
55 };
57 
58 static bool rev_conn_disconnected_cb(struct m0_clink *link);
59 
61 {
62  struct m0_rpc_service *svc;
63 
65  rev_conn_tlist_init(&svc->rps_rev_conns);
66 
67  return 0;
68 }
69 
71 {
72  struct m0_rpc_service *svc;
73 
76  rev_conn_tlist_fini(&svc->rps_rev_conns);
77 }
78 
80 {
81  struct m0_rpc_service *svc;
82 
84  m0_rpc_service_bob_fini(svc);
85  m0_free(svc);
86 }
87 
88 static int
90 {
91  return 0;
92 }
93 
94 static const struct m0_reqh_service_ops rpc_ops = {
96  .rso_stop = rpc_service_stop,
97  .rso_fini = rpc_service_fini,
98  .rso_fop_accept = rpc_service_fop_accept
99 };
100 
102  const struct m0_reqh_service_type *stype)
103 {
104  struct m0_rpc_service *svc;
105 
106  M0_PRE(stype != NULL && service != NULL);
107 
108  M0_ALLOC_PTR(svc);
109  if (svc == NULL)
110  return M0_ERR(-ENOMEM);
111 
112  m0_rpc_service_bob_init(svc);
113  svc->rps_svc.rs_ops = &rpc_ops;
114  *service = &svc->rps_svc;
115  return 0;
116 }
117 
120 };
121 
123  .rst_name = "rpcservice",
124  .rst_ops = &rpc_service_type_ops,
125  .rst_level = M0_RPC_SVC_LEVEL,
126  .rst_keep_alive = true,
127 };
128 M0_EXPORTED(m0_rpc_service_type);
129 
130 M0_INTERNAL int m0_rpc_service_register(void)
131 {
134 }
135 
136 M0_INTERNAL void m0_rpc_service_unregister(void)
137 {
140 }
141 
142 M0_INTERNAL struct m0_rpc_session *
144  const struct m0_rpc_item *item)
145 {
146 
147  struct m0_reverse_connection *revc;
148  struct m0_rpc_service *svc;
149  const struct m0_rpc_item_header2 *header;
150 
152  header = &item->ri_header;
153  svc = bob_of(service, struct m0_rpc_service, rps_svc, &rpc_svc_bob);
154  if (header->osr_sender_id == SENDER_ID_INVALID)
155  revc = m0_tl_find(rev_conn, revc, &svc->rps_rev_conns,
157  &header->osr_uuid) == 0);
158  else
159  revc = m0_tl_find(rev_conn, revc, &svc->rps_rev_conns,
161  header->osr_sender_id);
162  return revc == NULL ? NULL : &revc->rcf_rlink.rlk_sess;
163 }
164 
165 M0_INTERNAL int
167  const struct m0_rpc_item *item,
168  struct m0_clink *clink,
169  struct m0_rpc_session **session)
170 {
171  int rc;
172  const char *rem_ep;
173  struct m0_reverse_connection *revc;
174  struct m0_rpc_service *svc;
175 
176  M0_ENTRY();
178 
181 
182  M0_ALLOC_PTR(revc);
183  if (revc != NULL) {
185  rc = m0_rpc_link_init(&revc->rcf_rlink,
186  item->ri_rmachine, NULL, rem_ep,
188  if (rc == 0) {
191  clink);
192  *session = &revc->rcf_rlink.rlk_sess;
193  rev_conn_tlink_init_at_tail(revc, &svc->rps_rev_conns);
194  }
195  } else
196  rc = M0_ERR(-ENOMEM);
197  return M0_RC(rc);
198 }
199 
200 static void rev_conn_free(struct m0_sm_group *grp, struct m0_sm_ast *ast)
201 {
202  struct m0_reverse_connection *revc = ast->sa_datum;
203 
204  M0_ENTRY("revc=%p", revc);
205  m0_rpc_link_fini(&revc->rcf_rlink);
206  rev_conn_tlink_del_fini(revc);
207  m0_free(revc);
208  M0_LEAVE("revc=%p", revc);
209 }
210 
211 static bool rev_conn_disconnected_cb(struct m0_clink *link)
212 {
213  struct m0_reverse_connection *revc;
214 
215  revc = container_of(link, struct m0_reverse_connection, rcf_disc_wait);
216  revc->rcf_free_ast = (struct m0_sm_ast){
217  .sa_cb = &rev_conn_free,
218  .sa_datum = revc,
219  };
220  m0_sm_ast_post(m0_locality0_get()->lo_grp, &revc->rcf_free_ast);
221  return true;
222 }
223 
224 M0_INTERNAL void
226 {
227  struct m0_rpc_link *rlk;
228  struct m0_reverse_connection *revc;
229 
230  M0_ENTRY();
231 
232  rlk = container_of(sess, struct m0_rpc_link, rlk_sess);
233  revc = container_of(rlk, struct m0_reverse_connection, rcf_rlink);
234 
235  if (revc->rcf_rlink.rlk_connected) {
240  revc->rcf_disc_wait.cl_is_oneshot = true;
243  &revc->rcf_disc_wait);
244  }
245 
246  M0_LEAVE();
247 }
248 
249 M0_INTERNAL void
251 {
252  struct m0_rpc_service *svc;
253  struct m0_reverse_connection *revc;
254 
255  M0_ENTRY();
257 
258  /*
259  * We take the lock here on locality0 to avoid races with
260  * rev_conn_free() AST which runs in locality0.
261  */
263  svc = bob_of(service, struct m0_rpc_service, rps_svc, &rpc_svc_bob);
264  m0_tl_for(rev_conn, &svc->rps_rev_conns, revc) {
266  false);
267  } m0_tlist_endfor;
268  m0_tl_teardown(rev_conn, &svc->rps_rev_conns, revc) {
269  if (revc->rcf_disc_wait.cl_group != NULL) {
270  m0_chan_wait(&revc->rcf_disc_wait);
272  }
273  m0_rpc_link_fini(&revc->rcf_rlink);
274  rev_conn_tlink_fini(revc);
275  m0_free(revc);
276  }
278 
279  M0_LEAVE();
280 }
281 
283 {
284  return container_of(session, struct m0_rpc_link, rlk_sess)->rlk_rc;
285 }
286 
287 M0_INTERNAL struct m0_reqh_service *
289 {
291 }
292 
293 M0_INTERNAL int m0_rpc_service_start(struct m0_reqh *reqh)
294 {
295  return reqh->rh_rpc_service == NULL ?
298  0;
299 }
300 
301 M0_INTERNAL void m0_rpc_service_stop(struct m0_reqh *reqh)
302 {
303  if (m0_reqh_rpc_mach_tlist_is_empty(&reqh->rh_rpc_machines)) {
306  }
307 }
308 
309 #undef M0_TRACE_SUBSYSTEM
310 
312 /*
313  * Local variables:
314  * c-indentation-style: "K&R"
315  * c-basic-offset: 8
316  * tab-width: 8
317  * fill-column: 80
318  * scroll-step: 1
319  * End:
320  */
M0_INTERNAL int m0_uint128_cmp(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
Definition: misc.c:45
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
#define M0_PRE(cond)
static void rev_conn_free(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: service.c:200
int(* rso_start)(struct m0_reqh_service *service)
Definition: reqh_service.h:361
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
struct chs_dev disc
Definition: net_test.c:96
M0_INTERNAL void m0_ha_state_fop_fini(void)
Definition: note_fops.c:37
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
static int rpc_service_allocate(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: service.c:101
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_LEAVE()
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL int m0_ha_state_fop_init(void)
Definition: note_fops.c:44
static void rpc_service_fini(struct m0_reqh_service *service)
Definition: service.c:79
M0_INTERNAL void m0_rpc_service_reverse_session_put(struct m0_rpc_session *sess, bool disc)
Definition: service.c:225
M0_TL_DESCR_DEFINE(dopr, "dtm0_process", static, struct dtm0_process, dop_link, dop_magic, M0_DTM0_PROC_MAGIC, M0_DTM0_PROC_HEAD_MAGIC)
static const struct m0_reqh_service_type_ops rpc_service_type_ops
Definition: service.c:118
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
Definition: sm.h:504
#define container_of(ptr, type, member)
Definition: misc.h:33
static struct m0_rpc_session session
Definition: formation2.c:38
static struct m0_rpc_item * item
Definition: item.c:56
struct m0_reqh_service * rh_rpc_service
Definition: reqh.h:140
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:473
const char * bt_name
Definition: bob.h:73
M0_INTERNAL void m0_rpc_service_reverse_sessions_cleanup(struct m0_reqh_service *service)
Definition: service.c:250
static void rpc_service_stop(struct m0_reqh_service *service)
Definition: service.c:70
return M0_RC(rc)
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
static const struct m0_reqh_service_ops rpc_ops
Definition: service.c:94
return M0_ERR(-EOPNOTSUPP)
void * sa_datum
Definition: sm.h:508
uint64_t c_sender_id
Definition: conn.h:269
M0_INTERNAL struct m0_reqh_service * m0_reqh_rpc_service_find(struct m0_reqh *reqh)
Definition: service.c:288
static const struct socktype stype[]
Definition: sock.c:1156
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
M0_INTERNAL void m0_reqh_service_quit(struct m0_reqh_service *svc)
Definition: reqh_service.c:588
M0_INTERNAL int m0_rpc_session_status(struct m0_rpc_session *session)
Definition: service.c:282
struct m0_rpc_item_header2 ri_header
Definition: item.h:193
const char * rst_name
Definition: reqh_service.h:448
#define bob_of(ptr, type, field, bt)
Definition: bob.h:140
struct m0_tl rh_rpc_machines
Definition: reqh.h:135
static bool rev_conn_disconnected_cb(struct m0_clink *link)
Definition: service.c:211
M0_INTERNAL void m0_rpc_service_stop(struct m0_reqh *reqh)
Definition: service.c:301
M0_INTERNAL struct m0_rpc_session * m0_rpc_service_reverse_session_lookup(struct m0_reqh_service *service, const struct m0_rpc_item *item)
Definition: service.c:143
Definition: reqh.h:94
static const struct m0_bob_type rpc_svc_bob
Definition: service.c:50
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_reqh_service_setup(struct m0_reqh_service **out, struct m0_reqh_service_type *stype, struct m0_reqh *reqh, struct m0_reqh_context *rctx, const struct m0_fid *fid)
Definition: reqh_service.c:565
struct m0_reqh_service rps_svc
Definition: service.h:39
M0_TL_DEFINE(dopr, static, struct dtm0_process)
struct m0_uint128 c_uuid
Definition: conn.h:272
struct m0_rpc_link rcf_rlink
Definition: rev_conn.h:45
M0_BOB_DEFINE(static, &dtm0_service_bob, m0_dtm0_service)
M0_INTERNAL void m0_rpc_service_unregister(void)
Definition: service.c:136
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
Definition: reqh_service.c:538
struct m0_reqh reqh
Definition: rm_foms.c:48
int(* rsto_service_allocate)(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: reqh_service.h:436
struct m0_reqh_service_type m0_rpc_service_type
Definition: service.c:122
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
Definition: locality.c:169
const struct m0_reqh_service_type * rs_type
Definition: reqh_service.h:228
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
M0_INTERNAL int m0_rpc_service_reverse_session_get(struct m0_reqh_service *service, const struct m0_rpc_item *item, struct m0_clink *clink, struct m0_rpc_session **session)
Definition: service.c:166
static struct m0_net_test_service svc
Definition: service.c:34
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static struct m0_fop * fop
Definition: item.c:57
#define m0_tlist_endfor
Definition: tlist.h:448
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
static int rpc_service_fop_accept(struct m0_reqh_service *service, struct m0_fop *fop)
Definition: service.c:89
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
struct m0_clink rcf_disc_wait
Definition: rev_conn.h:48
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
struct m0_sm_ast rcf_free_ast
Definition: rev_conn.h:49
void m0_free(void *data)
Definition: memory.c:146
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:490
M0_INTERNAL int m0_rpc_service_start(struct m0_reqh *reqh)
Definition: service.c:293
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
Definition: long_lock_ut.c:46
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_rpc_service_register(void)
Definition: service.c:130
#define offsetof(typ, memb)
Definition: misc.h:29
static int rpc_service_start(struct m0_reqh_service *service)
Definition: service.c:60
Definition: fop.h:80
M0_INTERNAL const char * m0_rpc_item_remote_ep_addr(const struct m0_rpc_item *item)
Definition: item.c:1188