Skip to content
Merged
13 changes: 13 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,19 @@ wanted staleness value. For example:
Note that the set option will be dropped when the connection is returned
back to the pool.

Request priority
~~~~~~~~~~~~~~~~~~~~~
In order to use Request Priorities feature in Cloud Spanner, SQLAlchemy provides an ``execution_options`` parameter:

.. code:: python

from google.cloud.spanner_v1 import RequestOptions

with engine.connect().execution_options(
request_priority=RequestOptions.Priority.PRIORITY_MEDIUM
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()

DDL and transactions
~~~~~~~~~~~~~~~~~~~~

Expand Down
2 changes: 1 addition & 1 deletion create_test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def create_test_instance():
configs = list(CLIENT.list_instance_configs())
if not USE_EMULATOR:
# Filter out non "us" locations
configs = [config for config in configs if "europe-north1" in config.name]
configs = [config for config in configs if "us-south1" in config.name]

instance_config = configs[0].name
create_time = str(int(time.time()))
Expand Down
8 changes: 6 additions & 2 deletions google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,18 @@ def pre_exec(self):
"""
super(SpannerExecutionContext, self).pre_exec()

read_only = self.execution_options.get("read_only", None)
read_only = self.execution_options.get("read_only")
if read_only is not None:
self._dbapi_connection.connection.read_only = read_only

staleness = self.execution_options.get("staleness", None)
staleness = self.execution_options.get("staleness")
if staleness is not None:
self._dbapi_connection.connection.staleness = staleness

priority = self.execution_options.get("request_priority")
if priority is not None:
self._dbapi_connection.connection.request_priority = priority


class SpannerIdentifierPreparer(IdentifierPreparer):
"""Identifiers compiler.
Expand Down
18 changes: 17 additions & 1 deletion test/test_suite_13.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import time
from unittest import mock

from google.cloud.spanner_v1 import RequestOptions

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import inspect
Expand Down Expand Up @@ -1644,7 +1646,7 @@ def test_offset_only(self):
list(connection.execute(self._table.select().offset(offset)).fetchall())


class ExecutionOptionsStalenessTest(fixtures.TestBase):
class ExecutionOptionsTest(fixtures.TestBase):
"""
Check that `execution_options()` method correctly
sets parameters on the underlying DB API connection.
Expand Down Expand Up @@ -1680,6 +1682,20 @@ def test_staleness(self):
with engine.connect() as connection:
pass

def test_request_priority(self):
PRIORITY = RequestOptions.Priority.PRIORITY_MEDIUM
with self._engine.connect().execution_options(
request_priority=PRIORITY
) as connection:
connection.execute(select(["*"], from_obj=self._table)).fetchall()

with self._engine.connect() as connection:
assert connection.connection.request_priority is None

engine = create_engine("sqlite:///database")
with engine.connect() as connection:
pass


class TemporaryTableTest(fixtures.TestBase):
"""
Expand Down
33 changes: 33 additions & 0 deletions test/test_suite_14.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import pkg_resources
import pytest
import random
import time
from unittest import mock

from google.cloud.spanner_v1 import RequestOptions

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import inspect
Expand Down Expand Up @@ -2160,3 +2163,33 @@ def test_round_trip_none_as_json_null(self):
)
def test_round_trip_none_as_sql_null(self):
pass


class ExecutionOptionsRequestPriorotyTest(fixtures.TestBase):
def setUp(self):
self._engine = create_engine(get_db_url(), pool_size=1)
metadata = MetaData(bind=self._engine)

self._table = Table(
"execution_options2",
metadata,
Column("opt_id", Integer, primary_key=True),
Column("opt_name", String(16), nullable=False),
)

metadata.create_all(self._engine)
time.sleep(1)

def test_request_priority(self):
PRIORITY = RequestOptions.Priority.PRIORITY_MEDIUM
with self._engine.connect().execution_options(
request_priority=PRIORITY
) as connection:
connection.execute(select(["*"], from_obj=self._table)).fetchall()

with self._engine.connect() as connection:
assert connection.connection.request_priority is None

engine = create_engine("sqlite:///database")
with engine.connect() as connection:
pass