diff options
Diffstat (limited to 'meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch')
-rw-r--r-- | meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch | 494 |
1 files changed, 0 insertions, 494 deletions
diff --git a/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch b/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch deleted file mode 100644 index 42e181bb1..000000000 --- a/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch +++ /dev/null | |||
@@ -1,494 +0,0 @@ | |||
1 | From 23e80b75508187baaa823a68ea019b72e0b2305c Mon Sep 17 00:00:00 2001 | ||
2 | From: Budai Laszlo <lbudai@balabit.hu> | ||
3 | Date: Tue, 12 Nov 2013 13:19:04 +0100 | ||
4 | Subject: [PATCH] afsql: afsql_dd_insert_db() refactor | ||
5 | |||
6 | Upstream-Status: Backport | ||
7 | |||
8 | A lot of the code that was previously in afsql_dd_insert_db() have been | ||
9 | extracted to smaller functions, and afsql_dd_insert_db() was rebuilt on | ||
10 | top of these. At the same time, memory leaks were plugged, and in case | ||
11 | of a transaction error, backlog rewinding has been fixed too, to not | ||
12 | loose messages since the last BEGIN command. | ||
13 | |||
14 | Signed-off-by: Juhasz Viktor <jviktor@balabit.hu> | ||
15 | Signed-off-by: Laszlo Budai <lbudai@balabit.hu> | ||
16 | --- | ||
17 | modules/afsql/afsql.c | 301 ++++++++++++++++++++++++++++++++------------------ | ||
18 | 1 file changed, 192 insertions(+), 109 deletions(-) | ||
19 | |||
20 | diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c | ||
21 | index 12f6aab..a6a8190 100644 | ||
22 | --- a/modules/afsql/afsql.c | ||
23 | +++ b/modules/afsql/afsql.c | ||
24 | @@ -456,24 +456,21 @@ afsql_dd_create_index(AFSqlDestDriver *s | ||
25 | * | ||
26 | * NOTE: This function can only be called from the database thread. | ||
27 | **/ | ||
28 | -static GString * | ||
29 | -afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg) | ||
30 | +static gboolean | ||
31 | +afsql_dd_validate_table(AFSqlDestDriver *self, GString *table) | ||
32 | { | ||
33 | - GString *query_string, *table; | ||
34 | + GString *query_string; | ||
35 | dbi_result db_res; | ||
36 | gboolean success = FALSE; | ||
37 | gint i; | ||
38 | |||
39 | - table = g_string_sized_new(32); | ||
40 | - log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); | ||
41 | - | ||
42 | if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES) | ||
43 | - return table; | ||
44 | + return TRUE; | ||
45 | |||
46 | afsql_dd_check_sql_identifier(table->str, TRUE); | ||
47 | |||
48 | if (g_hash_table_lookup(self->validated_tables, table->str)) | ||
49 | - return table; | ||
50 | + return TRUE; | ||
51 | |||
52 | query_string = g_string_sized_new(32); | ||
53 | g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str); | ||
54 | @@ -544,14 +541,9 @@ afsql_dd_validate_table(AFSqlDestDriver | ||
55 | /* we have successfully created/altered the destination table, record this information */ | ||
56 | g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE)); | ||
57 | } | ||
58 | - else | ||
59 | - { | ||
60 | - g_string_free(table, TRUE); | ||
61 | - table = NULL; | ||
62 | - } | ||
63 | g_string_free(query_string, TRUE); | ||
64 | |||
65 | - return table; | ||
66 | + return success; | ||
67 | } | ||
68 | |||
69 | /** | ||
70 | @@ -581,6 +573,20 @@ afsql_dd_begin_txn(AFSqlDestDriver *self | ||
71 | } | ||
72 | |||
73 | /** | ||
74 | + * afsql_dd_handle_transaction_error: | ||
75 | + * | ||
76 | + * Handle errors inside during a SQL transaction (e.g. INSERT or COMMIT failures). | ||
77 | + * | ||
78 | + * NOTE: This function can only be called from the database thread. | ||
79 | + **/ | ||
80 | +static void | ||
81 | +afsql_dd_handle_transaction_error(AFSqlDestDriver *self) | ||
82 | +{ | ||
83 | + log_queue_rewind_backlog(self->queue); | ||
84 | + self->flush_lines_queued = 0; | ||
85 | +} | ||
86 | + | ||
87 | +/** | ||
88 | * afsql_dd_begin_txn: | ||
89 | * | ||
90 | * Commit SQL transaction. | ||
91 | @@ -596,14 +602,14 @@ afsql_dd_commit_txn(AFSqlDestDriver *sel | ||
92 | if (success) | ||
93 | { | ||
94 | log_queue_ack_backlog(self->queue, self->flush_lines_queued); | ||
95 | + self->flush_lines_queued = 0; | ||
96 | } | ||
97 | else | ||
98 | { | ||
99 | - msg_notice("SQL transaction commit failed, rewinding backlog and starting again", | ||
100 | - NULL); | ||
101 | - log_queue_rewind_backlog(self->queue); | ||
102 | + msg_error("SQL transaction commit failed, rewinding backlog and starting again", | ||
103 | + NULL); | ||
104 | + afsql_dd_handle_transaction_error(self); | ||
105 | } | ||
106 | - self->flush_lines_queued = 0; | ||
107 | return success; | ||
108 | } | ||
109 | |||
110 | @@ -644,12 +650,13 @@ afsql_dd_set_dbd_opt_numeric(gpointer ke | ||
111 | } | ||
112 | |||
113 | static gboolean | ||
114 | -afsql_dd_connect(AFSqlDestDriver *self) | ||
115 | +afsql_dd_ensure_initialized_connection(AFSqlDestDriver *self) | ||
116 | { | ||
117 | if (self->dbi_ctx) | ||
118 | return TRUE; | ||
119 | |||
120 | self->dbi_ctx = dbi_conn_new(self->type); | ||
121 | + | ||
122 | if (!self->dbi_ctx) | ||
123 | { | ||
124 | msg_error("No such DBI driver", | ||
125 | @@ -659,10 +666,12 @@ afsql_dd_connect(AFSqlDestDriver *self) | ||
126 | } | ||
127 | |||
128 | dbi_conn_set_option(self->dbi_ctx, "host", self->host); | ||
129 | + | ||
130 | if (strcmp(self->type, "mysql")) | ||
131 | dbi_conn_set_option(self->dbi_ctx, "port", self->port); | ||
132 | else | ||
133 | dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port)); | ||
134 | + | ||
135 | dbi_conn_set_option(self->dbi_ctx, "username", self->user); | ||
136 | dbi_conn_set_option(self->dbi_ctx, "password", self->password); | ||
137 | dbi_conn_set_option(self->dbi_ctx, "dbname", self->database); | ||
138 | @@ -691,6 +700,7 @@ afsql_dd_connect(AFSqlDestDriver *self) | ||
139 | evt_tag_str("database", self->database), | ||
140 | evt_tag_str("error", dbi_error), | ||
141 | NULL); | ||
142 | + | ||
143 | return FALSE; | ||
144 | } | ||
145 | |||
146 | @@ -713,104 +723,145 @@ afsql_dd_connect(AFSqlDestDriver *self) | ||
147 | return TRUE; | ||
148 | } | ||
149 | |||
150 | -static gboolean | ||
151 | -afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg, | ||
152 | - LogPathOptions *path_options) | ||
153 | +static GString * | ||
154 | +afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg) | ||
155 | { | ||
156 | - if (self->failed_message_counter < self->num_retries - 1) | ||
157 | - { | ||
158 | - log_queue_push_head(self->queue, msg, path_options); | ||
159 | - | ||
160 | - /* database connection status sanity check after failed query */ | ||
161 | - if (dbi_conn_ping(self->dbi_ctx) != 1) | ||
162 | - { | ||
163 | - const gchar *dbi_error; | ||
164 | - | ||
165 | - dbi_conn_error(self->dbi_ctx, &dbi_error); | ||
166 | - msg_error("Error, no SQL connection after failed query attempt", | ||
167 | - evt_tag_str("type", self->type), | ||
168 | - evt_tag_str("host", self->host), | ||
169 | - evt_tag_str("port", self->port), | ||
170 | - evt_tag_str("username", self->user), | ||
171 | - evt_tag_str("database", self->database), | ||
172 | - evt_tag_str("error", dbi_error), | ||
173 | - NULL); | ||
174 | - return FALSE; | ||
175 | - } | ||
176 | + GString *table = g_string_sized_new(32); | ||
177 | + log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); | ||
178 | |||
179 | - self->failed_message_counter++; | ||
180 | - return FALSE; | ||
181 | + if (!afsql_dd_validate_table(self, table)) | ||
182 | + { | ||
183 | + /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ | ||
184 | + msg_error("Error checking table, disconnecting from database, trying again shortly", | ||
185 | + evt_tag_int("time_reopen", self->time_reopen), | ||
186 | + NULL); | ||
187 | + g_string_free(table, TRUE); | ||
188 | + return NULL; | ||
189 | } | ||
190 | |||
191 | - msg_error("Multiple failures while inserting this record into the database, message dropped", | ||
192 | - evt_tag_int("attempts", self->num_retries), | ||
193 | - NULL); | ||
194 | - stats_counter_inc(self->dropped_messages); | ||
195 | - log_msg_drop(msg, path_options); | ||
196 | - self->failed_message_counter = 0; | ||
197 | - return TRUE; | ||
198 | + return table; | ||
199 | } | ||
200 | |||
201 | static GString * | ||
202 | -afsql_dd_construct_query(AFSqlDestDriver *self, GString *table, | ||
203 | - LogMessage *msg) | ||
204 | +afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table) | ||
205 | { | ||
206 | - GString *value; | ||
207 | - GString *query_string; | ||
208 | - gint i; | ||
209 | + GString *insert_command = g_string_sized_new(256); | ||
210 | + GString *value = g_string_sized_new(512); | ||
211 | + gint i, j; | ||
212 | |||
213 | - value = g_string_sized_new(256); | ||
214 | - query_string = g_string_sized_new(512); | ||
215 | + g_string_printf(insert_command, "INSERT INTO %s (", table->str); | ||
216 | |||
217 | - g_string_printf(query_string, "INSERT INTO %s (", table->str); | ||
218 | for (i = 0; i < self->fields_len; i++) | ||
219 | { | ||
220 | - g_string_append(query_string, self->fields[i].name); | ||
221 | - if (i != self->fields_len - 1) | ||
222 | - g_string_append(query_string, ", "); | ||
223 | + if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) | ||
224 | + { | ||
225 | + g_string_append(insert_command, self->fields[i].name); | ||
226 | + | ||
227 | + j = i + 1; | ||
228 | + while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) | ||
229 | + j++; | ||
230 | + | ||
231 | + if (j < self->fields_len) | ||
232 | + g_string_append(insert_command, ", "); | ||
233 | + } | ||
234 | } | ||
235 | - g_string_append(query_string, ") VALUES ("); | ||
236 | + | ||
237 | + g_string_append(insert_command, ") VALUES ("); | ||
238 | |||
239 | for (i = 0; i < self->fields_len; i++) | ||
240 | { | ||
241 | gchar *quoted; | ||
242 | |||
243 | - if (self->fields[i].value == NULL) | ||
244 | - { | ||
245 | - /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */ | ||
246 | - g_string_append(query_string, "DEFAULT"); | ||
247 | - } | ||
248 | - else | ||
249 | + if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) | ||
250 | { | ||
251 | log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value); | ||
252 | - | ||
253 | if (self->null_value && strcmp(self->null_value, value->str) == 0) | ||
254 | { | ||
255 | - g_string_append(query_string, "NULL"); | ||
256 | + g_string_append(insert_command, "NULL"); | ||
257 | } | ||
258 | else | ||
259 | { | ||
260 | dbi_conn_quote_string_copy(self->dbi_ctx, value->str, "ed); | ||
261 | if (quoted) | ||
262 | { | ||
263 | - g_string_append(query_string, quoted); | ||
264 | + g_string_append(insert_command, quoted); | ||
265 | free(quoted); | ||
266 | } | ||
267 | else | ||
268 | { | ||
269 | - g_string_append(query_string, "''"); | ||
270 | + g_string_append(insert_command, "''"); | ||
271 | } | ||
272 | } | ||
273 | - } | ||
274 | |||
275 | - if (i != self->fields_len - 1) | ||
276 | - g_string_append(query_string, ", "); | ||
277 | + j = i + 1; | ||
278 | + while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) | ||
279 | + j++; | ||
280 | + if (j < self->fields_len) | ||
281 | + g_string_append(insert_command, ", "); | ||
282 | + } | ||
283 | } | ||
284 | - g_string_append(query_string, ")"); | ||
285 | + | ||
286 | + g_string_append(insert_command, ")"); | ||
287 | |||
288 | g_string_free(value, TRUE); | ||
289 | |||
290 | - return query_string; | ||
291 | + return insert_command; | ||
292 | +} | ||
293 | + | ||
294 | +static inline gboolean | ||
295 | +afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self) | ||
296 | +{ | ||
297 | + return self->flush_lines_queued != -1; | ||
298 | +} | ||
299 | + | ||
300 | +static inline gboolean | ||
301 | +afsql_dd_should_start_new_transaction(const AFSqlDestDriver *self) | ||
302 | +{ | ||
303 | + return self->flush_lines_queued == 0; | ||
304 | +} | ||
305 | + | ||
306 | +static inline gboolean | ||
307 | +afsql_dd_should_commit_transaction(const AFSqlDestDriver *self) | ||
308 | +{ | ||
309 | + return afsql_dd_is_transaction_handling_enabled(self) && self->flush_lines_queued == self->flush_lines; | ||
310 | +} | ||
311 | + | ||
312 | +static inline gboolean | ||
313 | +afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self, | ||
314 | + LogMessage *msg, | ||
315 | + LogPathOptions *path_options) | ||
316 | +{ | ||
317 | + const gchar *dbi_error, *error_message; | ||
318 | + | ||
319 | + if (dbi_conn_ping(self->dbi_ctx) == 1) | ||
320 | + { | ||
321 | + log_queue_push_head(self->queue, msg, path_options); | ||
322 | + return TRUE; | ||
323 | + } | ||
324 | + | ||
325 | + if (afsql_dd_is_transaction_handling_enabled(self)) | ||
326 | + { | ||
327 | + error_message = "SQL connection lost in the middle of a transaction," | ||
328 | + " rewinding backlog and starting again"; | ||
329 | + afsql_dd_handle_transaction_error(self); | ||
330 | + } | ||
331 | + else | ||
332 | + { | ||
333 | + error_message = "Error, no SQL connection after failed query attempt"; | ||
334 | + log_queue_push_head(self->queue, msg, path_options); | ||
335 | + } | ||
336 | + | ||
337 | + dbi_conn_error(self->dbi_ctx, &dbi_error); | ||
338 | + msg_error(error_message, | ||
339 | + evt_tag_str("type", self->type), | ||
340 | + evt_tag_str("host", self->host), | ||
341 | + evt_tag_str("port", self->port), | ||
342 | + evt_tag_str("username", self->user), | ||
343 | + evt_tag_str("database", self->database), | ||
344 | + evt_tag_str("error", dbi_error), | ||
345 | + NULL); | ||
346 | + | ||
347 | + return FALSE; | ||
348 | } | ||
349 | |||
350 | /** | ||
351 | @@ -824,61 +875,93 @@ afsql_dd_construct_query(AFSqlDestDriver | ||
352 | static gboolean | ||
353 | afsql_dd_insert_db(AFSqlDestDriver *self) | ||
354 | { | ||
355 | - GString *table, *query_string; | ||
356 | + GString *table = NULL; | ||
357 | + GString *insert_command = NULL; | ||
358 | LogMessage *msg; | ||
359 | gboolean success; | ||
360 | LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; | ||
361 | |||
362 | - afsql_dd_connect(self); | ||
363 | + if (!afsql_dd_ensure_initialized_connection(self)) | ||
364 | + return FALSE; | ||
365 | |||
366 | - success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE); | ||
367 | + /* connection established, try to insert a message */ | ||
368 | + success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, self->flags & AFSQL_DDF_EXPLICIT_COMMITS); | ||
369 | if (!success) | ||
370 | return TRUE; | ||
371 | |||
372 | msg_set_context(msg); | ||
373 | |||
374 | - table = afsql_dd_validate_table(self, msg); | ||
375 | + table = afsql_dd_ensure_accessible_database_table(self, msg); | ||
376 | + | ||
377 | if (!table) | ||
378 | { | ||
379 | - /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ | ||
380 | - msg_error("Error checking table, disconnecting from database, trying again shortly", | ||
381 | - evt_tag_int("time_reopen", self->time_reopen), | ||
382 | - NULL); | ||
383 | - msg_set_context(NULL); | ||
384 | - g_string_free(table, TRUE); | ||
385 | - return afsql_dd_insert_fail_handler(self, msg, &path_options); | ||
386 | + success = FALSE; | ||
387 | + goto out; | ||
388 | } | ||
389 | |||
390 | - query_string = afsql_dd_construct_query(self, table, msg); | ||
391 | + if (afsql_dd_should_start_new_transaction(self) && !afsql_dd_begin_txn(self)) | ||
392 | + { | ||
393 | + success = FALSE; | ||
394 | + goto out; | ||
395 | + } | ||
396 | |||
397 | - if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self)) | ||
398 | - return FALSE; | ||
399 | + insert_command = afsql_dd_build_insert_command(self, msg, table); | ||
400 | + success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL); | ||
401 | |||
402 | - success = afsql_dd_run_query(self, query_string->str, FALSE, NULL); | ||
403 | if (success && self->flush_lines_queued != -1) | ||
404 | { | ||
405 | self->flush_lines_queued++; | ||
406 | |||
407 | - if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self)) | ||
408 | - return FALSE; | ||
409 | + if (afsql_dd_should_commit_transaction(self) && !afsql_dd_commit_txn(self)) | ||
410 | + { | ||
411 | + /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_txn() */ | ||
412 | + | ||
413 | + g_string_free(insert_command, TRUE); | ||
414 | + msg_set_context(NULL); | ||
415 | + | ||
416 | + return FALSE; | ||
417 | + } | ||
418 | } | ||
419 | |||
420 | - g_string_free(table, TRUE); | ||
421 | - g_string_free(query_string, TRUE); | ||
422 | + out: | ||
423 | + | ||
424 | + if (table != NULL) | ||
425 | + g_string_free(table, TRUE); | ||
426 | + | ||
427 | + if (insert_command != NULL) | ||
428 | + g_string_free(insert_command, TRUE); | ||
429 | |||
430 | msg_set_context(NULL); | ||
431 | |||
432 | - if (!success) | ||
433 | - return afsql_dd_insert_fail_handler(self, msg, &path_options); | ||
434 | + if (success) | ||
435 | + { | ||
436 | + log_msg_ack(msg, &path_options); | ||
437 | + log_msg_unref(msg); | ||
438 | + step_sequence_number(&self->seq_num); | ||
439 | + self->failed_message_counter = 0; | ||
440 | + } | ||
441 | + else | ||
442 | + { | ||
443 | + if (self->failed_message_counter < self->num_retries - 1) | ||
444 | + { | ||
445 | + if (!afsql_dd_handle_insert_row_error_depending_on_connection_availability(self, msg, &path_options)) | ||
446 | + return FALSE; | ||
447 | |||
448 | - /* we only ACK if each INSERT is a separate transaction */ | ||
449 | - if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0) | ||
450 | - log_msg_ack(msg, &path_options); | ||
451 | - log_msg_unref(msg); | ||
452 | - step_sequence_number(&self->seq_num); | ||
453 | - self->failed_message_counter = 0; | ||
454 | + self->failed_message_counter++; | ||
455 | + } | ||
456 | + else | ||
457 | + { | ||
458 | + msg_error("Multiple failures while inserting this record into the database, message dropped", | ||
459 | + evt_tag_int("attempts", self->num_retries), | ||
460 | + NULL); | ||
461 | + stats_counter_inc(self->dropped_messages); | ||
462 | + log_msg_drop(msg, &path_options); | ||
463 | + self->failed_message_counter = 0; | ||
464 | + success = TRUE; | ||
465 | + } | ||
466 | + } | ||
467 | |||
468 | - return TRUE; | ||
469 | + return success; | ||
470 | } | ||
471 | |||
472 | static void | ||
473 | @@ -895,7 +978,7 @@ afsql_dd_message_became_available_in_the | ||
474 | static void | ||
475 | afsql_dd_wait_for_suspension_wakeup(AFSqlDestDriver *self) | ||
476 | { | ||
477 | - /* we got suspended, probably because of a connection error, | ||
478 | + /* we got suspended, probably because of a connection error, | ||
479 | * during this time we only get wakeups if we need to be | ||
480 | * terminated. */ | ||
481 | if (!self->db_thread_terminate) | ||
482 | @@ -974,8 +1057,7 @@ afsql_dd_database_thread(gpointer arg) | ||
483 | |||
484 | afsql_dd_commit_txn(self); | ||
485 | } | ||
486 | - | ||
487 | - exit: | ||
488 | +exit: | ||
489 | afsql_dd_disconnect(self); | ||
490 | |||
491 | msg_verbose("Database thread finished", | ||
492 | -- | ||
493 | 1.8.4.1 | ||
494 | |||