
google bigquery - Delete bulk rows in a BigQuery table based on a list of unique ids

So I tried to delete some rows in a BigQuery table with a simple query like this:

from google.cloud import bigquery

client = bigquery.Client()
# the id list is interpolated directly into the SQL text
query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN {}".format(
    tuple(primary_key_list)
)
query_job = client.query(query, location="us-west1")

Here primary_key_list is a Python list of Sample_Table unique ids, e.g. [123, 124, 125, ...], so every id ends up spliced into the SQL text itself.

This works fine with the small batches of data I retrieve, but as primary_key_list grows it gives me an error:

The query is too large. The maximum standard SQL query length is 1024.00K characters, including comments and white space characters.

I realize the interpolated query eventually exceeds the maximum query length. After searching on Stack Overflow I found that parameterized queries are the recommended fix, so I changed my code to this:

client = bigquery.Client()
query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN UNNEST(@List_Sample_Table_Id)"
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        # the array travels as a query parameter, so it no longer counts
        # toward the SQL text length
        bigquery.ArrayQueryParameter("List_Sample_Table_Id", "INT64", primary_key_list),
    ]
)
query_job = client.query(query, job_config=job_config)
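For completeness, client.query() only submits the job; to block until the DML actually finishes I'd still call result() afterwards (a minimal sketch using the client's standard blocking call):

results = query_job.result()  # waits for the DELETE to complete
print(query_job.num_dml_affected_rows)  # number of rows deleted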

That stops the maximum-query-length error, but the call now fails with a different exception. Any idea how to bulk-delete rows from BigQuery?

I don't know if this is useful, but I'm running this Python code on Google Cloud Dataproc, and the delete is a function I only recently added; everything worked before I added it. Here's the log I got from running the delete with parameterized queries:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 320, in _send_until_done
    return self.connection.send(data)
  File "/opt/conda/default/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1737, in send
    self._raise_ssl_error(self._ssl, result)
  File "/opt/conda/default/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1639, in _raise_ssl_error
    raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 354, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1244, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1290, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1239, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 331, in sendall
    sent = self._send_until_done(data[total_sent:total_sent + SSL_WRITE_BLOCKSIZE])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 326, in _send_until_done
    raise SocketError(str(e))
OSError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 638, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/util/retry.py", line 368, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 354, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1244, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1290, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1239, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 331, in sendall
    sent = self._send_until_done(data[total_sent:total_sent + SSL_WRITE_BLOCKSIZE])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 326, in _send_until_done
    raise SocketError(str(e))
urllib3.exceptions.ProtocolError: ('Connection aborted.', OSError("(32, 'EPIPE')"))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/a05a15822be04a9abdc1ba05e317bb2f/ItemHistory-get.py", line 92, in <module>
    delete_duplicate_pk()
  File "/tmp/a05a15822be04a9abdc1ba05e317bb2f/ItemHistory-get.py", line 84, in delete_duplicate_pk
    query_job = client.query(query, job_config=job_config, location="asia-southeast2")
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 2893, in query
    query_job._begin(retry=retry, timeout=timeout)
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 1069, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/job/base.py", line 438, in _begin
    timeout=timeout,
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 643, in _call_api
    return call()
  File "/opt/conda/default/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/opt/conda/default/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 434, in api_request
    timeout=timeout,
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 292, in _make_request
    method, url, headers, data, target_object, timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 330, in _do_request
    url=url, method=method, headers=headers, data=data, timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/google/auth/transport/requests.py", line 470, in request
    **kwargs
  File "/opt/conda/default/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/opt/conda/default/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/opt/conda/default/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', OSError("(32, 'EPIPE')"))
question from: https://stackoverflow.com/questions/65890018/delete-bulk-rows-in-big-query-table-based-on-list-of-unique-ids


1 Answer


As @blackbishop mentioned, you can try upgrading requests to the latest version (in my case that fixed the connection error). But since I was updating bulk rows (say 500,000+ rows in BigQuery, each with a unique id), it still timed out on the small machine-type Dataproc cluster I was using (if someone has the resources to try a larger Dataproc cluster and succeeds, feel free to edit this answer).
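If you want to try the upgrade route first, it's just the usual pip upgrade on the machines running the job (assuming pip manages the Python environment your job uses):

pip install --upgrade requests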

So I went with the MERGE statement instead, as documented here: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement

To update existing rows, the script looks like this (assuming the newly retrieved data has already been loaded into Staging_Sample_Dataset.Staging_Sample_Table):

from google.cloud import bigquery

def merge_data():
    client = bigquery.Client()
    query = """MERGE INTO Sample_Dataset.Sample_Table st
            USING Staging_Sample_Dataset.Staging_Sample_Table ss
            ON st.Sample_Table_Id = ss.Sample_Table_Id
            WHEN MATCHED THEN UPDATE SET st.Your_Column = ss.Your_Column -- and the list goes on...
            WHEN NOT MATCHED THEN INSERT ROW
            """
    query_job = client.query(query, location="asia-southeast2")
    results = query_job.result()  # wait for the MERGE job to finish
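For completeness, here's a minimal sketch of getting the newly retrieved rows into the staging table before the MERGE runs (load_staging and its list-of-dicts input are illustrative assumptions, not part of my original job; load_table_from_json is the standard client method):

def load_staging(rows):
    # rows: list of dicts, e.g. [{"Sample_Table_Id": 123, "Your_Column": "..."}]
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_TRUNCATE",  # replace any previous staging contents
        autodetect=True,                     # infer the schema from the JSON rows
    )
    load_job = client.load_table_from_json(
        rows, "Staging_Sample_Dataset.Staging_Sample_Table", job_config=job_config
    )
    load_job.result()  # wait for the load job to finish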

Alternatively, I can bulk-delete the matching rows and call another function to load the new data once this one has finished:

def bulk_delete():
    client = bigquery.Client()
    query = """MERGE INTO Sample_Dataset.Sample_Table st
            USING Staging_Sample_Dataset.Staging_Sample_Table sst
            ON st.Sample_Table_Id = sst.Sample_Table_Id
            WHEN MATCHED THEN DELETE
            """
    query_job = client.query(query, location="asia-southeast2")
    results = query_job.result()  # wait for the delete to complete
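Putting it together, a hypothetical end-to-end flow (load_staging is the sketch above; the final reload into Sample_Table would be a separate load or insert job):

load_staging(new_rows)  # 1. stage the freshly retrieved rows (hypothetical helper)
bulk_delete()           # 2. remove matching rows from the main table
# 3. load/append the staged rows into Sample_Dataset.Sample_Table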
