int
hb_main(const char* conninfo)
{
PGconn *conn;
PGresult *res;
PGnotify *notify;
int nnotifies;
int checked;
int result;
char number[5];
char notify_buf[1024];
srand((unsigned)time(NULL));
/* Make a connection to the database */
conn = PQconnectdb(conninfo);
/* Check to see that the backend connection was successfully made */
if (PQstatus(conn) != CONNECTION_OK)
{
elog(WARNING, "Connection to database failed: %s",
PQerrorMessage(conn));
exit_nicely(conn);
}
/*
* Issue LISTEN command to enable notifications from the rule's NOTIFY.
*/
res = PQexec(conn, "LISTEN HB_SV");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
elog(WARNING, "LISTEN command failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
}
/*
* should PQclear PGresult whenever it is no longer needed to avoid memory
* leaks
*/
PQclear(res);
/* Set Secret Number */
memset(number, 0x00, 5);
create_random_number(number);
elog(LOG , "hb_worker: set secret number=%s\n", number);
/* Quit after four notifies are received. */
nnotifies = 0;
while (1)
{
/*
* Sleep until something happens on the connection. We use select(2)
* to wait for input, but you could also use poll() or similar
* facilities.
*/
int sock;
fd_set input_mask;
sock = PQsocket(conn);
if (sock < 0)
break; /* shouldn't happen */
FD_ZERO(&input_mask);
FD_SET(sock, &input_mask);
if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
{
elog(WARNING, "select() failed: %s\n", strerror(errno));
exit_nicely(conn);
}
/* Now check for input */
PQconsumeInput(conn);
while ((notify = PQnotifies(conn)) != NULL)
{
checked = check_number(notify->extra);
switch (checked) {
case NUMBER_COMMAND:
result = compare_numbers(number, notify->extra);
if (GET_HITS(result) == 4) {
// Notify Game Clear, and Set new number.
elog(LOG, "hb_worker: NOTIFY HB_CL,'4 Hit! Conguratulatoins!, next new game.'\n");
strcpy(notify_buf, "NOTIFY HB_CL,'4 Hit! Conguratulatoins!, next new game.'");
PQexec(conn, notify_buf);
create_random_number(number);
elog(LOG, "hb_worker: set secret number=%s\n", number);
} else {
// Notify Hit&blow
elog(LOG, "NOTIFY HB_CL,'%d Hit / %d Blow.'",
GET_HITS(result), GET_BLOWS(result));
sprintf(notify_buf, "NOTIFY HB_CL,'%d Hit / %d Blow.'",
GET_HITS(result), GET_BLOWS(result));
PQexec(conn, notify_buf);
}
break;
case START_COMMAND:
//......... (part of the code is omitted here) .........
/*
 * Classify a libpq result into one of the MYPG_* return codes.
 *
 *   res - the result handle to inspect.
 *
 * Returns MYPG_SUCCESS when the result holds at least one row,
 * MYPG_FATAL_ERROR when the status is PGRES_FATAL_ERROR, and
 * MYPG_NO_RESULT in every other case.  Informational text (the command
 * status or the result's error message) is passed to IDL_Message() with
 * IDL_MSG_INFO along the way.
 */
int
pgsql_query_checkstatus(PGresult *res)
{
    int state;
    int field_count;
    long long row_count;

    state = PQresultStatus(res);

    switch (state)
    {
        case PGRES_COMMAND_OK:
            /* The command completed but produced no data. */
            IDL_Message(IDL_M_NAMED_GENERIC, IDL_MSG_INFO, PQcmdStatus(res));
            return(MYPG_NO_RESULT);

        case PGRES_TUPLES_OK:
            /* Row data may be present; inspect the counts below. */
            break;

        default:
            /* Anything else: report the result's message, then classify. */
            IDL_Message(IDL_M_NAMED_GENERIC, IDL_MSG_INFO,
                        PQresultErrorMessage(res));

            if (state == PGRES_FATAL_ERROR)
                return(MYPG_FATAL_ERROR);

            if (state == PGRES_COPY_IN ||
                state == PGRES_COPY_OUT ||
                state == PGRES_BAD_RESPONSE ||
                state == PGRES_NONFATAL_ERROR)
                return(MYPG_NO_RESULT);

            /* Remaining statuses fall through to the row/field check. */
            break;
    }

    field_count = PQnfields(res);
    row_count = PQntuples(res);

    if (row_count != 0)
        return(MYPG_SUCCESS);

    /*
     * No rows.  With no fields either, the statement was not one that
     * returns data, so show the command status to clarify.  With fields
     * but no rows, presumably nothing matched the query.
     */
    if (field_count == 0)
        IDL_Message(IDL_M_NAMED_GENERIC, IDL_MSG_INFO, PQcmdStatus(res));

    return(MYPG_NO_RESULT);
}
/*
 * Establish the connection to the primary server for XLOG streaming.
 *
 *   conninfo   - libpq connection string for the primary; " replication=true"
 *                is appended to it before connecting.
 *   startpoint - WAL position sent in the START_REPLICATION command.
 *
 * Stores the new connection in streamConn and sets justconnected and
 * ThisTimeLineID (all declared outside this function).  Returns true on
 * success; every failure is reported through ereport(ERROR).
 *
 * Each PGresult is cleared before ereport(ERROR) is called, since control
 * is not expected to come back from an ERROR-level report.  Any string that
 * lives inside the PGresult and is needed for the message must therefore be
 * copied out before PQclear().
 */
static bool
libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
{
    char        conninfo_repl[MAXCONNINFO + 18];
    char       *primary_sysid;
    char        standby_sysid[32];
    TimeLineID  primary_tli;
    TimeLineID  standby_tli;
    PGresult   *res;
    char        cmd[64];

    /* Connect using deliberately undocumented parameter: replication */
    snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);

    streamConn = PQconnectdb(conninfo_repl);
    if (PQstatus(streamConn) != CONNECTION_OK)
        ereport(ERROR,
                (errmsg("could not connect to the primary server: %s",
                        PQerrorMessage(streamConn))));

    /*
     * Get the system identifier and timeline ID as a DataRow message from the
     * primary server.
     */
    res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        PQclear(res);
        ereport(ERROR,
                (errmsg("could not receive database system identifier and timeline ID from "
                        "the primary server: %s",
                        PQerrorMessage(streamConn))));
    }
    if (PQnfields(res) != 2 || PQntuples(res) != 1)
    {
        /* Grab the counts first; they are unavailable once res is cleared. */
        int         ntuples = PQntuples(res);
        int         nfields = PQnfields(res);

        PQclear(res);
        ereport(ERROR,
                (errmsg("invalid response from primary server"),
                 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
                           ntuples, nfields)));
    }
    /* primary_sysid points into res and is only valid until PQclear(res). */
    primary_sysid = PQgetvalue(res, 0, 0);
    primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);

    /*
     * Confirm that the system identifier of the primary is the same as ours.
     */
    snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
             GetSystemIdentifier());
    if (strcmp(primary_sysid, standby_sysid) != 0)
    {
        /*
         * Copy the primary's identifier out of the result before freeing
         * it; otherwise the error message would read freed memory.
         */
        char        primary_sysid_copy[64];

        snprintf(primary_sysid_copy, sizeof(primary_sysid_copy), "%s",
                 primary_sysid);
        PQclear(res);
        ereport(ERROR,
                (errmsg("database system identifier differs between the primary and standby"),
                 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                           primary_sysid_copy, standby_sysid)));
    }

    /*
     * Confirm that the current timeline of the primary is the same as the
     * recovery target timeline.
     */
    standby_tli = GetRecoveryTargetTLI();
    PQclear(res);
    if (primary_tli != standby_tli)
        ereport(ERROR,
                (errmsg("timeline %u of the primary does not match recovery target timeline %u",
                        primary_tli, standby_tli)));
    ThisTimeLineID = primary_tli;

    /* Start streaming from the point requested by startup process */
    snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
             startpoint.xlogid, startpoint.xrecoff);
    res = libpqrcv_PQexec(cmd);
    if (PQresultStatus(res) != PGRES_COPY_OUT)
    {
        PQclear(res);
        ereport(ERROR,
                (errmsg("could not start WAL streaming: %s",
                        PQerrorMessage(streamConn))));
    }
    PQclear(res);

    justconnected = true;
    ereport(LOG,
            (errmsg("streaming replication successfully connected to primary")));

    return true;
}
int pgQueryThread::execute()
{
rowsInserted = -1L;
if (!conn->conn)
return(raiseEvent(0));
wxCharBuffer queryBuf = query.mb_str(*conn->conv);
if (!queryBuf && !query.IsEmpty())
{
conn->SetLastResultError(NULL, _("The query could not be converted to the required encoding."));
return(raiseEvent(0));
}
if (!PQsendQuery(conn->conn, queryBuf))
{
conn->SetLastResultError(NULL);
conn->IsAlive();
return(raiseEvent(0));
}
int resultsRetrieved = 0;
PGresult *lastResult = 0;
while (true)
{
if (TestDestroy())
{
if (rc != -3)
{
if (!PQrequestCancel(conn->conn)) // could not abort; abort failed.
return(raiseEvent(-1));
rc = -3;
}
}
if (!PQconsumeInput(conn->conn))
return(raiseEvent(0));
if (PQisBusy(conn->conn))
{
Yield();
this->Sleep(10);
continue;
}
// If resultToRetrieve is given, the nth result will be returned,
// otherwise the last result set will be returned.
// all others are discarded
PGresult *res = PQgetResult(conn->conn);
if (!res)
break;
resultsRetrieved++;
if (resultsRetrieved == resultToRetrieve)
{
result = res;
insertedOid = PQoidValue(res);
if (insertedOid && insertedOid != (OID) - 1)
appendMessage(wxString::Format(_("Query inserted one row with OID %d.\n"), insertedOid));
else
appendMessage(wxString::Format(wxPLURAL("Query result with %d row will be returned.\n", "Query result with %d rows will be returned.\n",
PQntuples(result)), PQntuples(result)));
continue;
}
if (lastResult)
{
if (PQntuples(lastResult))
appendMessage(wxString::Format(wxPLURAL("Query result with %d row discarded.\n", "Query result with %d rows discarded.\n",
PQntuples(lastResult)), PQntuples(lastResult)));
PQclear(lastResult);
}
lastResult = res;
}
if (!result)
result = lastResult;
conn->SetLastResultError(result);
appendMessage(wxT("\n"));
rc = PQresultStatus(result);
insertedOid = PQoidValue(result);
if (insertedOid == (OID) - 1)
insertedOid = 0;
if (rc == PGRES_TUPLES_OK)
{
dataSet = new pgSet(result, conn, *conn->conv, conn->needColQuoting);
dataSet->MoveFirst();
}
else if (rc == PGRES_COMMAND_OK)
{
char *s = PQcmdTuples(result);
if (*s)
rowsInserted = atol(s);
}
else if (rc == PGRES_FATAL_ERROR)
{
appendMessage(conn->GetLastError() + wxT("\n"));
}
return(raiseEvent(1));
//......... (part of the code is omitted here) .........
/*************************************************************************
*
* Function: sql_query
*
* Purpose: Issue a query to the database
*
*************************************************************************/
static sql_rcode_t sql_query(rlm_sql_handle_t * handle, UNUSED rlm_sql_config_t *config, char const *query)
{
rlm_sql_postgres_conn_t *conn = handle->conn;
int numfields = 0;
char *errorcode;
char *errormsg;
if (!conn->db) {
ERROR("rlm_sql_postgresql: Socket not connected");
return RLM_SQL_RECONNECT;
}
conn->result = PQexec(conn->db, query);
/*
* Returns a PGresult pointer or possibly a null pointer.
* A non-null pointer will generally be returned except in
* out-of-memory conditions or serious errors such as inability
* to send the command to the server. If a null pointer is
* returned, it should be treated like a PGRES_FATAL_ERROR
* result.
*/
if (!conn->result)
{
ERROR("rlm_sql_postgresql: PostgreSQL Query failed Error: %s",
PQerrorMessage(conn->db));
/* As this error COULD be a connection error OR an out-of-memory
* condition return value WILL be wrong SOME of the time regardless!
* Pick your poison....
*/
return RLM_SQL_RECONNECT;
} else {
ExecStatusType status = PQresultStatus(conn->result);
DEBUG("rlm_sql_postgresql: Status: %s", PQresStatus(status));
switch (status){
case PGRES_COMMAND_OK:
/*Successful completion of a command returning no data.*/
/*affected_rows function only returns
the number of affected rows of a command
returning no data...
*/
conn->affected_rows = affected_rows(conn->result);
DEBUG("rlm_sql_postgresql: query affected rows = %i", conn->affected_rows);
return 0;
break;
case PGRES_TUPLES_OK:
/*Successful completion of a command returning data (such as a SELECT or SHOW).*/
conn->cur_row = 0;
conn->affected_rows = PQntuples(conn->result);
numfields = PQnfields(conn->result); /*Check row storing functions..*/
DEBUG("rlm_sql_postgresql: query affected rows = %i , fields = %i", conn->affected_rows, numfields);
return 0;
break;
case PGRES_BAD_RESPONSE:
/*The server's response was not understood.*/
DEBUG("rlm_sql_postgresql: Bad Response From Server!!");
return -1;
break;
case PGRES_NONFATAL_ERROR:
/*A nonfatal error (a notice or warning) occurred. Possibly never returns*/
return -1;
break;
case PGRES_FATAL_ERROR:
#if defined(PG_DIAG_SQLSTATE) && defined(PG_DIAG_MESSAGE_PRIMARY)
/*A fatal error occurred.*/
errorcode = PQresultErrorField(conn->result, PG_DIAG_SQLSTATE);
errormsg = PQresultErrorField(conn->result, PG_DIAG_MESSAGE_PRIMARY);
DEBUG("rlm_sql_postgresql: Error %s", errormsg);
return check_fatal_error(errorcode);
#endif
break;
default:
/* FIXME: An unhandled error occurred.*/
/* PGRES_EMPTY_QUERY PGRES_COPY_OUT PGRES_COPY_IN */
return -1;
//......... (part of the code is omitted here) .........
/*
* ExecQueryUsingCursor: run a SELECT-like query using a cursor
*
* This feature allows result sets larger than RAM to be dealt with.
*
* Returns true if the query executed successfully, false otherwise.
*
* If pset.timing is on, total query time (exclusive of result-printing) is
* stored into *elapsed_msec.
*/
static bool
ExecQueryUsingCursor(const char *query, double *elapsed_msec)
{
bool OK = true;
PGresult *results;
PQExpBufferData buf;
printQueryOpt my_popt = pset.popt;
FILE *queryFout_copy = pset.queryFout;
bool queryFoutPipe_copy = pset.queryFoutPipe;
bool started_txn = false;
bool did_pager = false;
int ntuples;
int fetch_count;
char fetch_cmd[64];
instr_time before,
after;
int flush_error;
*elapsed_msec = 0;
/* initialize print options for partial table output */
my_popt.topt.start_table = true;
my_popt.topt.stop_table = false;
my_popt.topt.prior_records = 0;
if (pset.timing)
INSTR_TIME_SET_CURRENT(before);
/* if we're not in a transaction, start one */
if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
{
results = PQexec(pset.db, "BEGIN");
OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
PQclear(results);
if (!OK)
return false;
started_txn = true;
}
/* Send DECLARE CURSOR */
initPQExpBuffer(&buf);
appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
query);
results = PQexec(pset.db, buf.data);
OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
PQclear(results);
termPQExpBuffer(&buf);
if (!OK)
goto cleanup;
if (pset.timing)
{
INSTR_TIME_SET_CURRENT(after);
INSTR_TIME_SUBTRACT(after, before);
*elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
}
/*
* In \gset mode, we force the fetch count to be 2, so that we will throw
* the appropriate error if the query returns more than one row.
*/
if (pset.gset_prefix)
fetch_count = 2;
else
fetch_count = pset.fetch_count;
snprintf(fetch_cmd, sizeof(fetch_cmd),
"FETCH FORWARD %d FROM _psql_cursor",
fetch_count);
/* prepare to write output to \g argument, if any */
if (pset.gfname)
{
/* keep this code in sync with PrintQueryTuples */
pset.queryFout = stdout; /* so it doesn't get closed */
/* open file/pipe */
if (!setQFout(pset.gfname))
{
pset.queryFout = queryFout_copy;
pset.queryFoutPipe = queryFoutPipe_copy;
OK = false;
goto cleanup;
}
}
/* clear any pre-existing error indication on the output stream */
//......... (part of the code is omitted here) .........
/*
 * Generate a new buffer id supposed to be unique for a given layer name.
 *
 *   o          - service context; o->layers must be set.
 *   layer_name - name of the layer to generate an id for; a layer with this
 *                name and a storage entry must exist (asserted).
 *
 * Returns a newly initialised buffer holding the id.  Three sources are
 * tried in order:
 *   1. the next value of the layer's primary-key sequence, if it has one;
 *   2. six bytes read from /dev/urandom, each rendered as three decimal
 *      digits;
 *   3. rand(), as a last resort (does not prevent collisions).
 */
buffer *ows_psql_generate_id(ows * o, buffer * layer_name)
{
    ows_layer_node *ln;
    buffer * id, *sql_id;
    FILE *fp;
    PGresult * res;
    int i, seed_len, byte;
    char * seed = NULL;

    assert(o);
    assert(o->layers);
    assert(layer_name);

    /* Retrieve layer node pointer */
    for (ln = o->layers->first ; ln ; ln = ln->next) {
        if (ln->layer->name && ln->layer->storage
                && !strcmp(ln->layer->name->buf, layer_name->buf)) break;
    }
    assert(ln);

    id = buffer_init();

    /* If PK have a sequence in PostgreSQL database,
     * retrieve next available sequence value
     */
    if (ln->layer->storage->pkey_sequence) {
        sql_id = buffer_init();
        buffer_add_str(sql_id, "SELECT nextval('");
        buffer_copy(sql_id, ln->layer->storage->pkey_sequence);
        buffer_add_str(sql_id, "');");
        res = ows_psql_exec(o, sql_id->buf);
        buffer_free(sql_id);

        if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1) {
            buffer_add_str(id, (char *) PQgetvalue(res, 0, 0));
            PQclear(res);
            return id;
        }

        /* FIXME: Shouldn't we return an error there instead ? */
        PQclear(res);
    }

    /*
     * If we don't have a PostgreSQL Sequence, we will try to
     * generate a pseudo random keystring using /dev/urandom.
     * This only works on systems that provide that device.
     */
    seed_len = 6;
    /* Three decimal digits per byte, plus the terminating NUL. */
    seed = malloc(sizeof(char) * (seed_len * 3 + 1));
    assert(seed);
    seed[0] = '\0';

    fp = fopen("/dev/urandom","r");
    if (fp) {
        for (i = 0 ; i < seed_len ; i++) {
            byte = fgetc(fp);
            if (byte == EOF) break; /* short read: do not use a partial seed */

            /*
             * Write each byte at its own offset.  Passing seed as both the
             * destination and a "%s" source of sprintf() is undefined
             * behaviour.
             */
            snprintf(seed + i * 3, 4, "%03d", byte);
        }
        fclose(fp);

        if (i == seed_len) {
            buffer_add_str(id, seed);
            free(seed);
            return id;
        }
    }
    free(seed);

    /* Case where we not using PostgreSQL sequence,
     * and /dev/urandom is missing or unreadable.
     * This case don't prevent to produce ID collision
     * Don't use it unless really no others choices !!!
     */
    srand((int) (time(NULL) ^ rand() % 1000) + 42);
    srand((rand() % 1000 ^ rand() % 1000) + 42);
    buffer_add_int(id, rand());

    return id;
}
/*
* PSQLexecWatch
*
* This function is used for \watch command to send the query to
* the server and print out the results.
*
* Returns 1 if the query executed successfully, 0 if it cannot be repeated,
* e.g., because of the interrupt, -1 on error.
*/
int
PSQLexecWatch(const char *query, const printQueryOpt *opt)
{
PGresult *res;
double elapsed_msec = 0;
instr_time before;
instr_time after;
if (!pset.db)
{
psql_error("You are currently not connected to a database.\n");
return 0;
}
SetCancelConn();
if (pset.timing)
INSTR_TIME_SET_CURRENT(before);
res = PQexec(pset.db, query);
ResetCancelConn();
if (!AcceptResult(res))
{
PQclear(res);
return 0;
}
if (pset.timing)
{
INSTR_TIME_SET_CURRENT(after);
INSTR_TIME_SUBTRACT(after, before);
elapsed_msec = INSTR_TIME_GET_MILLISEC(after);
}
/*
* If SIGINT is sent while the query is processing, the interrupt
* will be consumed. The user's intention, though, is to cancel
* the entire watch process, so detect a sent cancellation request and
* exit in this case.
*/
if (cancel_pressed)
{
PQclear(res);
return 0;
}
switch (PQresultStatus(res))
{
case PGRES_TUPLES_OK:
printQuery(res, opt, pset.queryFout, pset.logfile);
break;
case PGRES_COMMAND_OK:
fprintf(pset.queryFout, "%s\n%s\n\n", opt->title, PQcmdStatus(res));
break;
case PGRES_EMPTY_QUERY:
psql_error(_("\\watch cannot be used with an empty query\n"));
PQclear(res);
return -1;
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
psql_error(_("\\watch cannot be used with COPY\n"));
PQclear(res);
return -1;
default:
psql_error(_("unexpected result status for \\watch\n"));
PQclear(res);
return -1;
}
PQclear(res);
fflush(pset.queryFout);
/* Possible microtiming output */
if (pset.timing)
printf(_("Time: %.3f ms\n"), elapsed_msec);
return 1;
}
请发表评论