As of January 1, 2020 this library no longer supports Python 2 on the latest released version.
Library versions released prior to that date will continue to be available. For more information please
visit Python 2 support on Google Cloud.
Source code for google.cloud.spanner_v1.session
# Copyright 2016 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrapper for Cloud Spanner Session objects."""
from functools import total_ordering
import random
import time
from google.api_core.exceptions import Aborted
from google.api_core.exceptions import GoogleAPICallError
from google.api_core.exceptions import NotFound
from google.api_core.gapic_v1 import method
from google.rpc.error_details_pb2 import RetryInfo
from google.cloud.spanner_v1 import ExecuteSqlRequest
from google.cloud.spanner_v1 import CreateSessionRequest
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.transaction import Transaction
DEFAULT_RETRY_TIMEOUT_SECS = 30
"""Default timeout used by :meth:`Session.run_in_transaction`."""
[docs]@total_ordering
class Session(object):
"""Representation of a Cloud Spanner Session.
We can use a :class:`Session` to:
* :meth:`create` the session
* Use :meth:`exists` to check for the existence of the session
* :meth:`drop` the session
:type database: :class:`~google.cloud.spanner_v1.database.Database`
:param database: The database to which the session is bound.
:type labels: dict (str -> str)
:param labels: (Optional) User-assigned labels for the session.
"""
_session_id = None
_transaction = None
def __init__(self, database, labels=None):
self._database = database
if labels is None:
labels = {}
self._labels = labels
def __lt__(self, other):
return self._session_id < other._session_id
@property
def session_id(self):
"""Read-only ID, set by the back-end during :meth:`create`."""
return self._session_id
@property
def labels(self):
"""User-assigned labels for the session.
:rtype: dict (str -> str)
:returns: the labels dict (empty if no labels were assigned.
"""
return self._labels
@property
def name(self):
"""Session name used in requests.
.. note::
This property will not change if ``session_id`` does not, but the
return value is not cached.
The session name is of the form
``"projects/../instances/../databases/../sessions/{session_id}"``
:rtype: str
:returns: The session name.
:raises ValueError: if session is not yet created
"""
if self._session_id is None:
raise ValueError("No session ID set by back-end")
return self._database.name + "/sessions/" + self._session_id
[docs] def create(self):
"""Create this session, bound to its database.
See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.CreateSession
:raises ValueError: if :attr:`session_id` is already set.
"""
if self._session_id is not None:
raise ValueError("Session ID already set by back-end")
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
request = CreateSessionRequest(database=self._database.name)
if self._labels:
request.session.labels = self._labels
with trace_call("CloudSpanner.CreateSession", self, self._labels):
session_pb = api.create_session(request=request, metadata=metadata,)
self._session_id = session_pb.name.split("/")[-1]
[docs] def exists(self):
"""Test for the existence of this session.
See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.GetSession
:rtype: bool
:returns: True if the session exists on the back-end, else False.
"""
if self._session_id is None:
return False
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
with trace_call("CloudSpanner.GetSession", self) as span:
try:
api.get_session(name=self.name, metadata=metadata)
if span:
span.set_attribute("session_found", True)
except NotFound:
if span:
span.set_attribute("session_found", False)
return False
return True
[docs] def delete(self):
"""Delete this session.
See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.GetSession
:raises ValueError: if :attr:`session_id` is not already set.
:raises NotFound: if the session does not exist
"""
if self._session_id is None:
raise ValueError("Session ID not set by back-end")
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
with trace_call("CloudSpanner.DeleteSession", self):
api.delete_session(name=self.name, metadata=metadata)
[docs] def ping(self):
"""Ping the session to keep it alive by executing "SELECT 1".
:raises ValueError: if :attr:`session_id` is not already set.
"""
if self._session_id is None:
raise ValueError("Session ID not set by back-end")
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
request = ExecuteSqlRequest(session=self.name, sql="SELECT 1")
api.execute_sql(request=request, metadata=metadata)
[docs] def snapshot(self, **kw):
"""Create a snapshot to perform a set of reads with shared staleness.
See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly
:type kw: dict
:param kw: Passed through to
:class:`~google.cloud.spanner_v1.snapshot.Snapshot` ctor.
:rtype: :class:`~google.cloud.spanner_v1.snapshot.Snapshot`
:returns: a snapshot bound to this session
:raises ValueError: if the session has not yet been created.
"""
if self._session_id is None:
raise ValueError("Session has not been created.")
return Snapshot(self, **kw)
[docs] def read(self, table, columns, keyset, index="", limit=0):
"""Perform a ``StreamingRead`` API request for rows in a table.
:type table: str
:param table: name of the table from which to fetch data
:type columns: list of str
:param columns: names of columns to be retrieved
:type keyset: :class:`~google.cloud.spanner_v1.keyset.KeySet`
:param keyset: keys / ranges identifying rows to be retrieved
:type index: str
:param index: (Optional) name of index to use, rather than the
table's primary key
:type limit: int
:param limit: (Optional) maximum number of rows to return
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().read(table, columns, keyset, index, limit)
[docs] def execute_sql(
self,
sql,
params=None,
param_types=None,
query_mode=None,
query_options=None,
request_options=None,
retry=method.DEFAULT,
timeout=method.DEFAULT,
):
"""Perform an ``ExecuteStreamingSql`` API request.
:type sql: str
:param sql: SQL query statement
:type params: dict, {str -> column value}
:param params: values for parameter replacement. Keys must match
the names used in ``sql``.
:type param_types:
dict, {str -> :class:`~google.spanner.v1.types.TypeCode`}
:param param_types: (Optional) explicit types for one or more param
values; overrides default type detection on the
back-end.
:type query_mode:
:class:`~google.spanner.v1.types.ExecuteSqlRequest.QueryMode`
:param query_mode: Mode governing return of results / query plan. See:
`QueryMode <https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode>`_.
:type query_options:
:class:`~google.cloud.spanner_v1.types.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options: (Optional) Options that are provided for query plan stability.
:type request_options:
:class:`google.cloud.spanner_v1.types.RequestOptions`
:param request_options:
(Optional) Common options for this request.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().execute_sql(
sql,
params,
param_types,
query_mode,
query_options=query_options,
request_options=request_options,
retry=retry,
timeout=timeout,
)
[docs] def batch(self):
"""Factory to create a batch for this session.
:rtype: :class:`~google.cloud.spanner_v1.batch.Batch`
:returns: a batch bound to this session
:raises ValueError: if the session has not yet been created.
"""
if self._session_id is None:
raise ValueError("Session has not been created.")
return Batch(self)
[docs] def transaction(self):
"""Create a transaction to perform a set of reads with shared staleness.
:rtype: :class:`~google.cloud.spanner_v1.transaction.Transaction`
:returns: a transaction bound to this session
:raises ValueError: if the session has not yet been created.
"""
if self._session_id is None:
raise ValueError("Session has not been created.")
if self._transaction is not None:
self._transaction.rolled_back = True
del self._transaction
txn = self._transaction = Transaction(self)
return txn
[docs] def run_in_transaction(self, func, *args, **kw):
"""Perform a unit of work in a transaction, retrying on abort.
:type func: callable
:param func: takes a required positional argument, the transaction,
and additional positional / keyword arguments as supplied
by the caller.
:type args: tuple
:param args: additional positional arguments to be passed to ``func``.
:type kw: dict
:param kw: (Optional) keyword arguments to be passed to ``func``.
If passed:
"timeout_secs" will be removed and used to
override the default retry timeout which defines maximum timestamp
to continue retrying the transaction.
"commit_request_options" will be removed and used to set the
request options for the commit request.
:rtype: Any
:returns: The return value of ``func``.
:raises Exception:
reraises any non-ABORT exceptions raised by ``func``.
"""
deadline = time.time() + kw.pop("timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS)
commit_request_options = kw.pop("commit_request_options", None)
transaction_tag = kw.pop("transaction_tag", None)
attempts = 0
while True:
if self._transaction is None:
txn = self.transaction()
txn.transaction_tag = transaction_tag
else:
txn = self._transaction
if txn._transaction_id is None:
txn.begin()
try:
attempts += 1
return_value = func(txn, *args, **kw)
except Aborted as exc:
del self._transaction
_delay_until_retry(exc, deadline, attempts)
continue
except GoogleAPICallError:
del self._transaction
raise
except Exception:
txn.rollback()
raise
try:
txn.commit(
return_commit_stats=self._database.log_commit_stats,
request_options=commit_request_options,
)
except Aborted as exc:
del self._transaction
_delay_until_retry(exc, deadline, attempts)
except GoogleAPICallError:
del self._transaction
raise
else:
if self._database.log_commit_stats and txn.commit_stats:
self._database.logger.info(
"CommitStats: {}".format(txn.commit_stats),
extra={"commit_stats": txn.commit_stats},
)
return return_value
# Rational: this function factors out complex shared deadline / retry
# handling from two `except:` clauses.
def _delay_until_retry(exc, deadline, attempts):
"""Helper for :meth:`Session.run_in_transaction`.
Detect retryable abort, and impose server-supplied delay.
:type exc: :class:`google.api_core.exceptions.Aborted`
:param exc: exception for aborted transaction
:type deadline: float
:param deadline: maximum timestamp to continue retrying the transaction.
:type attempts: int
:param attempts: number of call retries
"""
cause = exc.errors[0]
now = time.time()
if now >= deadline:
raise
delay = _get_retry_delay(cause, attempts)
if delay is not None:
if now + delay > deadline:
raise
time.sleep(delay)
def _get_retry_delay(cause, attempts):
"""Helper for :func:`_delay_until_retry`.
:type exc: :class:`grpc.Call`
:param exc: exception for aborted transaction
:rtype: float
:returns: seconds to wait before retrying the transaction.
:type attempts: int
:param attempts: number of call retries
"""
metadata = dict(cause.trailing_metadata())
retry_info_pb = metadata.get("google.rpc.retryinfo-bin")
if retry_info_pb is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_pb)
nanos = retry_info.retry_delay.nanos
return retry_info.retry_delay.seconds + nanos / 1.0e9
return 2 ** attempts + random.random()