XRootD
Loading...
Searching...
No Matches
XrdThrottleFileSystemConfig.cc
Go to the documentation of this file.
1
2#include <fcntl.h>
3
5#include "XrdOuc/XrdOuca2x.hh"
6#include "XrdOuc/XrdOucEnv.hh"
8
11
12using namespace XrdThrottle;
13
14#define OFS_NAME "libXrdOfs.so"
15
16/*
17 * Note nothing in this file is thread-safe.
18 */
19
// Load the underlying (wrapped) file system from the shared library `fslib`.
//
//   fslib        library to load; compared against OFS_NAME to choose the
//                entry point that is used.
//   eDest        error router used for messages; its logger is handed to the
//                loaded file system.
//   config_file  configuration file path passed to the loaded file system.
//
// Returns the file system object, or NULL when it could not be obtained.
//
// NOTE(review): this listing appears to have lost two source lines (the
// listing numbers jump 23 -> 25 and 26 -> 28): `fs` is used below without a
// visible declaration, and the three parameter lines at the top of the first
// branch look like the tail of a function prototype whose first line is not
// shown. Confirm against the real source before editing the code.
20static XrdSfsFileSystem *
21LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file){
22 // Load the library
23 XrdSysPlugin ofsLib(&eDest, fslib.c_str(), "fslib", NULL);
25 if (fslib == OFS_NAME)
26 {
28 XrdSysLogger *lp,
29 const char *configfn,
30 XrdOucEnv *EnvInfo);
31
// Default library: call XrdSfsGetDefaultFileSystem directly instead of
// resolving a symbol through ofsLib.
32 if (!(fs = XrdSfsGetDefaultFileSystem(0, eDest.logger(), config_file.c_str(), 0)))
33 {
// Unlike the other branch there is no early return here: the failure is
// logged, Persist() below still runs, and the NULL `fs` is returned.
34 eDest.Emsg("Config", "Unable to load OFS filesystem.");
35 }
36 }
37 else
38 {
// Any other library: resolve its "XrdSfsGetFileSystem" symbol and call it.
39 XrdSfsFileSystem *(*ep)(XrdSfsFileSystem *, XrdSysLogger *, const char *);
40 if (!(ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *,XrdSysLogger *,const char *))
41 ofsLib.getPlugin("XrdSfsGetFileSystem")))
42 return NULL;
43 if (!(fs = (*ep)(0, eDest.logger(), config_file.c_str())))
44 {
45 eDest.Emsg("Config", "Unable to create file system object via", fslib.c_str());
46 return NULL;
47 }
48 }
// NOTE(review): presumably keeps the library loaded after ofsLib goes out of
// scope -- confirm against XrdSysPlugin::Persist().
49 ofsLib.Persist();
50
51 return fs;
52}
53
// Shared implementation behind the exported plugin entry points: obtains the
// throttling FileSystem through FileSystem::Initialize() and returns it.
// The result is NULL when Initialize() leaves `fs` unset.
//
// NOTE(review): the first line(s) of the function signature are missing from
// this listing (numbers jump 54 -> 57); the body shows the parameters are
// native_fs, lp, configfn and envP.
54namespace XrdThrottle {
57 XrdSysLogger *lp,
58 const char *configfn,
59 XrdOucEnv *envP)
60{
61 FileSystem* fs = NULL;
62 FileSystem::Initialize(fs, native_fs, lp, configfn, envP);
63 return fs;
64}
65}
66
67// Export the symbol necessary for this to be dynamically loaded.
// Two C-linkage entry points that both forward to
// XrdSfsGetFileSystem_Internal().
//
// NOTE(review): the line naming each function is missing from this listing
// (numbers jump 68 -> 71 and 76 -> 79); presumably these are
// XrdSfsGetFileSystem and XrdSfsGetFileSystem2 -- confirm against the real
// source.
68extern "C" {
71 XrdSysLogger *lp,
72 const char *configfn)
73{
// Three-argument form: no environment is available, so pass nullptr.
74 return XrdSfsGetFileSystem_Internal(native_fs, lp, configfn, nullptr);
75}
76
79 XrdSysLogger *lp,
80 const char *configfn,
81 XrdOucEnv *envP)
82{
// Four-argument form: hand the caller's environment through unchanged.
83 return XrdSfsGetFileSystem_Internal(native_fs, lp, configfn, envP);
84}
85}
86
89
90FileSystem* FileSystem::m_instance = 0;
91
92FileSystem::FileSystem()
93 : m_eroute(0), m_trace(&m_eroute), m_sfs_ptr(0), m_initialized(false), m_throttle(&m_eroute, &m_trace)
94{
95 myVersion = &XrdVERSIONINFOVAR(XrdSfsGetFileSystem);
96}
97
98FileSystem::~FileSystem() {}
99
100void
101FileSystem::Initialize(FileSystem *&fs,
102 XrdSfsFileSystem *native_fs,
103 XrdSysLogger *lp,
104 const char *configfn,
105 XrdOucEnv *envP)
106{
107 fs = NULL;
108 if (m_instance == NULL && !(m_instance = new FileSystem()))
109 {
110 return;
111 }
112 fs = m_instance;
113 if (!fs->m_initialized)
114 {
115 fs->m_config_file = configfn;
116 fs->m_eroute.logger(lp);
117 fs->m_eroute.Say("Initializing a Throttled file system.");
118 if (fs->Configure(fs->m_eroute, native_fs, envP))
119 {
120 fs->m_eroute.Say("Initialization of throttled file system failed.");
121 fs = NULL;
122 return;
123 }
124 fs->m_throttle.Init();
125 fs->m_initialized = true;
126 }
127}
128
// TS_Xeq(key, func): when the current directive name `var` equals `key`, run
// the member parser `func(Config)` and store its result in NoGo; otherwise
// store 0 in NoGo.
// NOTE(review): because the non-matching case also assigns, every TS_Xeq line
// overwrites the result of the one before it -- only the last TS_Xeq in a
// sequence can leave NoGo non-zero.
129#define TS_Xeq(key, func) NoGo = (strcmp(key, var) == 0) ? func(Config) : 0
// Read the configuration file, apply every throttle.* directive, and set up
// the wrapped file system. Returns 0 on success and 1 on failure.
//
// NOTE(review): the line carrying the function name and parameter list is
// missing from this listing (numbers jump 130 -> 132); the body refers to
// `log`, `native_fs` and `envP`.
130int
132{
133 XrdOucEnv myEnv;
134 XrdOucStream Config(&m_eroute, getenv("XRDINSTANCE"), &myEnv, "(Throttle Config)> ");
135 int cfgFD;
136 if (m_config_file.length() == 0)
137 {
138 log.Say("No filename specified.");
139 return 1;
140 }
141 if ((cfgFD = open(m_config_file.c_str(), O_RDONLY)) < 0)
142 {
143 log.Emsg("Config", errno, "Unable to open configuration file", m_config_file.c_str());
144 return 1;
145 }
// NOTE(review): cfgFD is never closed in this function; presumably the stream
// owns it after Attach() -- confirm against XrdOucStream.
146 Config.Attach(cfgFD);
147 static const char *cvec[] = { "*** throttle (ofs) plugin config:", 0 };
148 Config.Capture(cvec);
149
// Library to load for the wrapped file system unless throttle.fslib overrides it.
150 std::string fslib = OFS_NAME;
151
152 char *var, *val;
153 int NoGo = 0;
154 while( (var = Config.GetMyFirstWord()) )
155 {
156 if (strcmp("throttle.fslib", var) == 0)
157 {
158 val = Config.GetWord();
// A missing value is logged and the directive skipped; fslib keeps its
// previous value.
159 if (!val || !val[0]) {log.Emsg("Config", "fslib not specified."); continue;}
160 fslib = val;
161 }
162 TS_Xeq("throttle.max_open_files", xmaxopen);
163 TS_Xeq("throttle.max_active_connections", xmaxconn);
164 TS_Xeq("throttle.throttle", xthrottle);
165 TS_Xeq("throttle.loadshed", xloadshed);
166 TS_Xeq("throttle.trace", xtrace);
// NOTE(review): a non-zero NoGo is only logged here; the loop continues and
// the function's return value is not affected by it.
167 if (NoGo)
168 {
169 log.Emsg("Config", "Throttle configuration failed.");
170 }
171 }
172
173 // Load the filesystem object.
// A caller-supplied native_fs takes precedence over loading fslib.
174 m_sfs_ptr = native_fs ? native_fs : LoadFS(fslib, m_eroute, m_config_file);
175 if (!m_sfs_ptr) return 1;
176
177 // Overwrite the environment variable saying that throttling is the fslib.
178 XrdOucEnv::Export("XRDOFSLIB", fslib.c_str());
179
180 if (envP)
181 {
// Look up the monitoring stream pointer published in the environment under
// "Throttle.gStream*" (it may be null) and hand it to the throttle manager.
182 auto gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("Throttle.gStream*"));
183 log.Say("Config", "Throttle g-stream has", gstream ? "" : " NOT", " been configured via xrootd.mongstream directive");
184 m_throttle.SetMonitor(gstream);
185 }
186
187
188 return 0;
189}
190
191/******************************************************************************/
192/* x m a x o p e n */
193/******************************************************************************/
194
195/* Function: xmaxopen
196
197 Purpose: Parse the directive: throttle.max_open_files <limit>
198
199 <limit> maximum number of open file handles for a unique entity.
200
201 Output: 0 upon success or !0 upon failure.
202*/
203int
204FileSystem::xmaxopen(XrdOucStream &Config)
205{
206 auto val = Config.GetWord();
207 if (!val || val[0] == '\0')
208 {m_eroute.Emsg("Config", "Max open files not specified! Example usage: throttle.max_open_files 16000");}
209 long long max_open = -1;
210 if (XrdOuca2x::a2sz(m_eroute, "max open files value", val, &max_open, 1)) return 1;
211
212 m_throttle.SetMaxOpen(max_open);
213 return 0;
214}
215
216
217/******************************************************************************/
218/* x m a x c o n n */
219/******************************************************************************/
220
221/* Function: xmaxconn
222
223 Purpose: Parse the directive: throttle.max_active_connections <limit>
224
225 <limit> maximum number of connections with at least one open file for a given entity
226
227 Output: 0 upon success or !0 upon failure.
228*/
229int
230FileSystem::xmaxconn(XrdOucStream &Config)
231{
232 auto val = Config.GetWord();
233 if (!val || val[0] == '\0')
234 {m_eroute.Emsg("Config", "Max active cconnections not specified! Example usage: throttle.max_active_connections 4000");}
235 long long max_conn = -1;
236 if (XrdOuca2x::a2sz(m_eroute, "max active connections value", val, &max_conn, 1)) return 1;
237
238 m_throttle.SetMaxConns(max_conn);
239 return 0;
240}
241
242
243/******************************************************************************/
244/* x t h r o t t l e */
245/******************************************************************************/
246
247/* Function: xthrottle
248
249 Purpose: To parse the directive: throttle [data <drate>] [iops <irate>] [concurrency <climit>] [interval <rint>]
250
251 <drate> maximum bytes per second through the server.
252 <irate> maximum IOPS per second through the server.
253 <climit> maximum number of concurrent IO connections.
254 <rint> minimum interval in milliseconds between throttle re-computing.
255
256 Output: 0 upon success or !0 upon failure.
257*/
258int
259FileSystem::xthrottle(XrdOucStream &Config)
260{
261 long long drate = -1, irate = -1, rint = 1000, climit = -1;
262 char *val;
263
264 while ((val = Config.GetWord()))
265 {
266 if (strcmp("data", val) == 0)
267 {
268 if (!(val = Config.GetWord()))
269 {m_eroute.Emsg("Config", "data throttle limit not specified."); return 1;}
270 if (XrdOuca2x::a2sz(m_eroute,"data throttle value",val,&drate,1)) return 1;
271 }
272 else if (strcmp("iops", val) == 0)
273 {
274 if (!(val = Config.GetWord()))
275 {m_eroute.Emsg("Config", "IOPS throttle limit not specified."); return 1;}
276 if (XrdOuca2x::a2sz(m_eroute,"IOPS throttle value",val,&irate,1)) return 1;
277 }
278 else if (strcmp("rint", val) == 0)
279 {
280 if (!(val = Config.GetWord()))
281 {m_eroute.Emsg("Config", "recompute interval not specified."); return 1;}
282 if (XrdOuca2x::a2sp(m_eroute,"recompute interval value",val,&rint,10)) return 1;
283 }
284 else if (strcmp("concurrency", val) == 0)
285 {
286 if (!(val = Config.GetWord()))
287 {m_eroute.Emsg("Config", "Concurrency limit not specified."); return 1;}
288 if (XrdOuca2x::a2sz(m_eroute,"Concurrency limit value",val,&climit,1)) return 1;
289 }
290 else
291 {
292 m_eroute.Emsg("Config", "Warning - unknown throttle option specified", val, ".");
293 }
294 }
295
296 m_throttle.SetThrottles(drate, irate, climit, static_cast<float>(rint)/1000.0);
297 return 0;
298}
299
300/******************************************************************************/
301/* x l o a d s h e d */
302/******************************************************************************/
303
304/* Function: xloadshed
305
306 Purpose: To parse the directive: loadshed host <hostname> [port <port>] [frequency <freq>]
307
308 <hostname> hostname of server to shed load to. Required
309 <port> port of server to shed load to. Defaults to 1094
310 <freq> A value from 1 to 100 specifying how often to shed load
311 (1 = 1% chance; 100 = 100% chance; defaults to 10).
312
313 Output: 0 upon success or !0 upon failure.
314*/
315int FileSystem::xloadshed(XrdOucStream &Config)
316{
317 long long port = 0, freq = 0;
318 char *val;
319 std::string hostname;
320
321 while ((val = Config.GetWord()))
322 {
323 if (strcmp("host", val) == 0)
324 {
325 if (!(val = Config.GetWord()))
326 {m_eroute.Emsg("Config", "loadshed hostname not specified."); return 1;}
327 hostname = val;
328 }
329 else if (strcmp("port", val) == 0)
330 {
331 if (!(val = Config.GetWord()))
332 {m_eroute.Emsg("Config", "Port number not specified."); return 1;}
333 if (XrdOuca2x::a2sz(m_eroute,"Port number",val,&port,1, 65536)) return 1;
334 }
335 else if (strcmp("frequency", val) == 0)
336 {
337 if (!(val = Config.GetWord()))
338 {m_eroute.Emsg("Config", "Loadshed frequency not specified."); return 1;}
339 if (XrdOuca2x::a2sz(m_eroute,"Loadshed frequency",val,&freq,1,100)) return 1;
340 }
341 else
342 {
343 m_eroute.Emsg("Config", "Warning - unknown loadshed option specified", val, ".");
344 }
345 }
346
347 if (hostname.empty())
348 {
349 m_eroute.Emsg("Config", "must specify hostname for loadshed parameter.");
350 return 1;
351 }
352
353 m_throttle.SetLoadShed(hostname, port, freq);
354 return 0;
355}
356
357/******************************************************************************/
358/* x t r a c e */
359/******************************************************************************/
360
361/* Function: xtrace
362
363 Purpose: To parse the directive: trace <events>
364
365 <events> the blank separated list of events to trace. Trace
366 directives are cummalative.
367
368 Output: 0 upon success or 1 upon failure.
369*/
370
371int FileSystem::xtrace(XrdOucStream &Config)
372{
373 char *val;
374 static const struct traceopts {const char *opname; int opval;} tropts[] =
375 {
376 {"all", TRACE_ALL},
377 {"off", TRACE_NONE},
378 {"none", TRACE_NONE},
379 {"debug", TRACE_DEBUG},
380 {"iops", TRACE_IOPS},
381 {"bandwidth", TRACE_BANDWIDTH},
382 {"ioload", TRACE_IOLOAD},
383 {"files", TRACE_FILES},
384 {"connections",TRACE_CONNS},
385 };
386 int i, neg, trval = 0, numopts = sizeof(tropts)/sizeof(struct traceopts);
387
388 if (!(val = Config.GetWord()))
389 {
390 m_eroute.Emsg("Config", "trace option not specified");
391 return 1;
392 }
393 while (val)
394 {
395 if (!strcmp(val, "off"))
396 {
397 trval = 0;
398 }
399 else
400 {
401 if ((neg = (val[0] == '-' && val[1])))
402 {
403 val++;
404 }
405 for (i = 0; i < numopts; i++)
406 {
407 if (!strcmp(val, tropts[i].opname))
408 {
409 if (neg)
410 {
411 if (tropts[i].opval) trval &= ~tropts[i].opval;
412 else trval = TRACE_ALL;
413 }
414 else if (tropts[i].opval) trval |= tropts[i].opval;
415 else trval = TRACE_NONE;
416 break;
417 }
418 }
419 if (i >= numopts)
420 {
421 m_eroute.Say("Config warning: ignoring invalid trace option '", val, "'.");
422 }
423 }
424 val = Config.GetWord();
425 }
426 m_trace.What = trval;
427 return 0;
428}
429
#define TS_Xeq(x, m)
Definition XrdConfig.cc:157
static XrdSysError eDest(0,"crypto_")
XrdSfsFileSystem * XrdSfsGetDefaultFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *EnvInfo)
Definition XrdOfsFS.cc:49
#define open
Definition XrdPosix.hh:76
static XrdSfsFileSystem * LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file)
XrdSfsFileSystem * XrdSfsGetFileSystem2(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)
XrdSfsFileSystem * XrdSfsGetFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn)
XrdVERSIONINFO(XrdSfsGetFileSystem, FileSystem)
#define TRACE_IOLOAD
#define TRACE_BANDWIDTH
#define TRACE_FILES
#define TRACE_CONNS
#define TRACE_IOPS
#define TRACE_NONE
Definition XrdTrace.hh:34
#define TRACE_DEBUG
Definition XrdTrace.hh:36
#define TRACE_ALL
Definition XrdTrace.hh:35
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
static int a2sp(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:213
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
void * getPlugin(const char *pname, int optional=0)
void * Persist()
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void SetMaxConns(unsigned long max_conns)
virtual int Configure(XrdSysError &, XrdSfsFileSystem *native_fs, XrdOucEnv *envP)
XrdCmsConfig Config
XrdSfsFileSystem * XrdSfsGetFileSystem_Internal(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)