From 23e80b75508187baaa823a68ea019b72e0b2305c Mon Sep 17 00:00:00 2001 From: Budai Laszlo Date: Tue, 12 Nov 2013 13:19:04 +0100 Subject: [PATCH] afsql: afsql_dd_insert_db() refactor Upstream-Status: Backport A lot of the code that was previously in afsql_dd_insert_db() have been extracted to smaller functions, and afsql_dd_insert_db() was rebuilt on top of these. At the same time, memory leaks were plugged, and in case of a transaction error, backlog rewinding has been fixed too, to not loose messages since the last BEGIN command. Signed-off-by: Juhasz Viktor Signed-off-by: Laszlo Budai --- modules/afsql/afsql.c | 301 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 192 insertions(+), 109 deletions(-) diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c index 12f6aab..a6a8190 100644 --- a/modules/afsql/afsql.c +++ b/modules/afsql/afsql.c @@ -456,24 +456,21 @@ afsql_dd_create_index(AFSqlDestDriver *s * * NOTE: This function can only be called from the database thread. **/ -static GString * -afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg) +static gboolean +afsql_dd_validate_table(AFSqlDestDriver *self, GString *table) { - GString *query_string, *table; + GString *query_string; dbi_result db_res; gboolean success = FALSE; gint i; - table = g_string_sized_new(32); - log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); - if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES) - return table; + return TRUE; afsql_dd_check_sql_identifier(table->str, TRUE); if (g_hash_table_lookup(self->validated_tables, table->str)) - return table; + return TRUE; query_string = g_string_sized_new(32); g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str); @@ -544,14 +541,9 @@ afsql_dd_validate_table(AFSqlDestDriver /* we have successfully created/altered the destination table, record this information */ g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE)); } - else - { - g_string_free(table, TRUE); - table = NULL; - } g_string_free(query_string, TRUE); - return table; + return success; } /** @@ -581,6 +573,20 @@ afsql_dd_begin_txn(AFSqlDestDriver *self } /** + * afsql_dd_handle_transaction_error: + * + * Handle errors inside during a SQL transaction (e.g. INSERT or COMMIT failures). + * + * NOTE: This function can only be called from the database thread. + **/ +static void +afsql_dd_handle_transaction_error(AFSqlDestDriver *self) +{ + log_queue_rewind_backlog(self->queue); + self->flush_lines_queued = 0; +} + +/** * afsql_dd_begin_txn: * * Commit SQL transaction. @@ -596,14 +602,14 @@ afsql_dd_commit_txn(AFSqlDestDriver *sel if (success) { log_queue_ack_backlog(self->queue, self->flush_lines_queued); + self->flush_lines_queued = 0; } else { - msg_notice("SQL transaction commit failed, rewinding backlog and starting again", - NULL); - log_queue_rewind_backlog(self->queue); + msg_error("SQL transaction commit failed, rewinding backlog and starting again", + NULL); + afsql_dd_handle_transaction_error(self); } - self->flush_lines_queued = 0; return success; } @@ -644,12 +650,13 @@ afsql_dd_set_dbd_opt_numeric(gpointer ke } static gboolean -afsql_dd_connect(AFSqlDestDriver *self) +afsql_dd_ensure_initialized_connection(AFSqlDestDriver *self) { if (self->dbi_ctx) return TRUE; self->dbi_ctx = dbi_conn_new(self->type); + if (!self->dbi_ctx) { msg_error("No such DBI driver", @@ -659,10 +666,12 @@ afsql_dd_connect(AFSqlDestDriver *self) } dbi_conn_set_option(self->dbi_ctx, "host", self->host); + if (strcmp(self->type, "mysql")) dbi_conn_set_option(self->dbi_ctx, "port", self->port); else dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port)); + dbi_conn_set_option(self->dbi_ctx, "username", self->user); dbi_conn_set_option(self->dbi_ctx, "password", self->password); dbi_conn_set_option(self->dbi_ctx, "dbname", self->database); @@ -691,6 +700,7 @@ afsql_dd_connect(AFSqlDestDriver *self) evt_tag_str("database", self->database), evt_tag_str("error", dbi_error), NULL); + return FALSE; } @@ -713,104 +723,145 @@ afsql_dd_connect(AFSqlDestDriver *self) return TRUE; } -static gboolean -afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg, - LogPathOptions *path_options) +static GString * +afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg) { - if (self->failed_message_counter < self->num_retries - 1) - { - log_queue_push_head(self->queue, msg, path_options); - - /* database connection status sanity check after failed query */ - if (dbi_conn_ping(self->dbi_ctx) != 1) - { - const gchar *dbi_error; - - dbi_conn_error(self->dbi_ctx, &dbi_error); - msg_error("Error, no SQL connection after failed query attempt", - evt_tag_str("type", self->type), - evt_tag_str("host", self->host), - evt_tag_str("port", self->port), - evt_tag_str("username", self->user), - evt_tag_str("database", self->database), - evt_tag_str("error", dbi_error), - NULL); - return FALSE; - } + GString *table = g_string_sized_new(32); + log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); - self->failed_message_counter++; - return FALSE; + if (!afsql_dd_validate_table(self, table)) + { + /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ + msg_error("Error checking table, disconnecting from database, trying again shortly", + evt_tag_int("time_reopen", self->time_reopen), + NULL); + g_string_free(table, TRUE); + return NULL; } - msg_error("Multiple failures while inserting this record into the database, message dropped", - evt_tag_int("attempts", self->num_retries), - NULL); - stats_counter_inc(self->dropped_messages); - log_msg_drop(msg, path_options); - self->failed_message_counter = 0; - return TRUE; + return table; } static GString * -afsql_dd_construct_query(AFSqlDestDriver *self, GString *table, - LogMessage *msg) +afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table) { - GString *value; - GString *query_string; - gint i; + GString *insert_command = g_string_sized_new(256); + GString *value = g_string_sized_new(512); + gint i, j; - value = g_string_sized_new(256); - query_string = g_string_sized_new(512); + g_string_printf(insert_command, "INSERT INTO %s (", table->str); - g_string_printf(query_string, "INSERT INTO %s (", table->str); for (i = 0; i < self->fields_len; i++) { - g_string_append(query_string, self->fields[i].name); - if (i != self->fields_len - 1) - g_string_append(query_string, ", "); + if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) + { + g_string_append(insert_command, self->fields[i].name); + + j = i + 1; + while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) + j++; + + if (j < self->fields_len) + g_string_append(insert_command, ", "); + } } - g_string_append(query_string, ") VALUES ("); + + g_string_append(insert_command, ") VALUES ("); for (i = 0; i < self->fields_len; i++) { gchar *quoted; - if (self->fields[i].value == NULL) - { - /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */ - g_string_append(query_string, "DEFAULT"); - } - else + if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) { log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value); - if (self->null_value && strcmp(self->null_value, value->str) == 0) { - g_string_append(query_string, "NULL"); + g_string_append(insert_command, "NULL"); } else { dbi_conn_quote_string_copy(self->dbi_ctx, value->str, "ed); if (quoted) { - g_string_append(query_string, quoted); + g_string_append(insert_command, quoted); free(quoted); } else { - g_string_append(query_string, "''"); + g_string_append(insert_command, "''"); } } - } - if (i != self->fields_len - 1) - g_string_append(query_string, ", "); + j = i + 1; + while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) + j++; + if (j < self->fields_len) + g_string_append(insert_command, ", "); + } } - g_string_append(query_string, ")"); + + g_string_append(insert_command, ")"); g_string_free(value, TRUE); - return query_string; + return insert_command; +} + +static inline gboolean +afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self) +{ + return self->flush_lines_queued != -1; +} + +static inline gboolean +afsql_dd_should_start_new_transaction(const AFSqlDestDriver *self) +{ + return self->flush_lines_queued == 0; +} + +static inline gboolean +afsql_dd_should_commit_transaction(const AFSqlDestDriver *self) +{ + return afsql_dd_is_transaction_handling_enabled(self) && self->flush_lines_queued == self->flush_lines; +} + +static inline gboolean +afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self, + LogMessage *msg, + LogPathOptions *path_options) +{ + const gchar *dbi_error, *error_message; + + if (dbi_conn_ping(self->dbi_ctx) == 1) + { + log_queue_push_head(self->queue, msg, path_options); + return TRUE; + } + + if (afsql_dd_is_transaction_handling_enabled(self)) + { + error_message = "SQL connection lost in the middle of a transaction," + " rewinding backlog and starting again"; + afsql_dd_handle_transaction_error(self); + } + else + { + error_message = "Error, no SQL connection after failed query attempt"; + log_queue_push_head(self->queue, msg, path_options); + } + + dbi_conn_error(self->dbi_ctx, &dbi_error); + msg_error(error_message, + evt_tag_str("type", self->type), + evt_tag_str("host", self->host), + evt_tag_str("port", self->port), + evt_tag_str("username", self->user), + evt_tag_str("database", self->database), + evt_tag_str("error", dbi_error), + NULL); + + return FALSE; } /** @@ -824,61 +875,93 @@ afsql_dd_construct_query(AFSqlDestDriver static gboolean afsql_dd_insert_db(AFSqlDestDriver *self) { - GString *table, *query_string; + GString *table = NULL; + GString *insert_command = NULL; LogMessage *msg; gboolean success; LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; - afsql_dd_connect(self); + if (!afsql_dd_ensure_initialized_connection(self)) + return FALSE; - success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE); + /* connection established, try to insert a message */ + success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, self->flags & AFSQL_DDF_EXPLICIT_COMMITS); if (!success) return TRUE; msg_set_context(msg); - table = afsql_dd_validate_table(self, msg); + table = afsql_dd_ensure_accessible_database_table(self, msg); + if (!table) { - /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ - msg_error("Error checking table, disconnecting from database, trying again shortly", - evt_tag_int("time_reopen", self->time_reopen), - NULL); - msg_set_context(NULL); - g_string_free(table, TRUE); - return afsql_dd_insert_fail_handler(self, msg, &path_options); + success = FALSE; + goto out; } - query_string = afsql_dd_construct_query(self, table, msg); + if (afsql_dd_should_start_new_transaction(self) && !afsql_dd_begin_txn(self)) + { + success = FALSE; + goto out; + } - if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self)) - return FALSE; + insert_command = afsql_dd_build_insert_command(self, msg, table); + success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL); - success = afsql_dd_run_query(self, query_string->str, FALSE, NULL); if (success && self->flush_lines_queued != -1) { self->flush_lines_queued++; - if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self)) - return FALSE; + if (afsql_dd_should_commit_transaction(self) && !afsql_dd_commit_txn(self)) + { + /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_txn() */ + + g_string_free(insert_command, TRUE); + msg_set_context(NULL); + + return FALSE; + } } - g_string_free(table, TRUE); - g_string_free(query_string, TRUE); + out: + + if (table != NULL) + g_string_free(table, TRUE); + + if (insert_command != NULL) + g_string_free(insert_command, TRUE); msg_set_context(NULL); - if (!success) - return afsql_dd_insert_fail_handler(self, msg, &path_options); + if (success) + { + log_msg_ack(msg, &path_options); + log_msg_unref(msg); + step_sequence_number(&self->seq_num); + self->failed_message_counter = 0; + } + else + { + if (self->failed_message_counter < self->num_retries - 1) + { + if (!afsql_dd_handle_insert_row_error_depending_on_connection_availability(self, msg, &path_options)) + return FALSE; - /* we only ACK if each INSERT is a separate transaction */ - if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0) - log_msg_ack(msg, &path_options); - log_msg_unref(msg); - step_sequence_number(&self->seq_num); - self->failed_message_counter = 0; + self->failed_message_counter++; + } + else + { + msg_error("Multiple failures while inserting this record into the database, message dropped", + evt_tag_int("attempts", self->num_retries), + NULL); + stats_counter_inc(self->dropped_messages); + log_msg_drop(msg, &path_options); + self->failed_message_counter = 0; + success = TRUE; + } + } - return TRUE; + return success; } static void @@ -895,7 +978,7 @@ afsql_dd_message_became_available_in_the static void afsql_dd_wait_for_suspension_wakeup(AFSqlDestDriver *self) { - /* we got suspended, probably because of a connection error, + /* we got suspended, probably because of a connection error, * during this time we only get wakeups if we need to be * terminated. */ if (!self->db_thread_terminate) @@ -974,8 +1057,7 @@ afsql_dd_database_thread(gpointer arg) afsql_dd_commit_txn(self); } - - exit: +exit: afsql_dd_disconnect(self); msg_verbose("Database thread finished", -- 1.8.4.1