Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
install:
uv sync --all-extras

update:
rm -f uv.lock
uv sync

test:
uv run --all-extras pytest

Expand Down
132 changes: 90 additions & 42 deletions datashield/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def close(self, save: str = None) -> None:
for conn in self.conns:
try:
if save:
conn.save_workspace(f"{conn.name}:{save}")
conn.save_workspace(f"{conn.get_name()}:{save}")
conn.disconnect()
except DSError:
# silently fail
Expand All @@ -162,7 +162,7 @@ def get_connection_names(self) -> list[str]:
:return: The list of opened connection names
"""
if self.conns:
return [conn.name for conn in self.conns]
return [conn.get_name() for conn in self.conns]
else:
return []

Expand Down Expand Up @@ -194,7 +194,53 @@ def tables(self) -> dict:
"""
rval = {}
for conn in self.conns:
rval[conn.name] = conn.list_tables()
rval[conn.get_name()] = conn.list_tables()
return rval

def variables(self, table: str = None, tables: dict = None) -> dict:
"""
List available variables from the data repository, for a given table.

:param table: The default name of the table to list variables for
:param tables: The name of the table to list variables for, per server name. If not defined, 'table' is used.
:return: The available variables from the data repository, for a given table, per remote server name
"""
rval = {}
for conn in self.conns:
name = table
if tables and conn.get_name() in tables:
name = tables[conn.get_name()]
if name:
rval[conn.get_name()] = conn.list_table_variables(name)
else:
rval[conn.get_name()] = None
return rval

def taxonomies(self) -> dict:
"""
List available taxonomies from the data repository. A taxonomy is a hierarchical structure of vocabulary
terms that can be used to annotate variables in the data repository.
Depending on the data repository's capabilities, taxonomies can be used to perform structured
queries when searching for variables.

:return: The available taxonomies from the data repository, per remote server name
"""
rval = {}
for conn in self.conns:
rval[conn.get_name()] = conn.list_taxonomies()
return rval

def search_variables(self, query: str) -> dict:
"""
Search for variable names matching a given query across all tables in the data repository.

:param query: The query to search for in variable names, e.g., a full-text search and/or structured
query (based on taxonomy terms), depending on the data repository's capabilities
:return: The matching variable names from the data repository, per remote server name
"""
rval = {}
for conn in self.conns:
rval[conn.get_name()] = conn.search_variables(query)
return rval

def resources(self) -> dict:
Expand All @@ -205,7 +251,7 @@ def resources(self) -> dict:
"""
rval = {}
for conn in self.conns:
rval[conn.name] = conn.list_resources()
rval[conn.get_name()] = conn.list_resources()
return rval

def profiles(self) -> dict:
Expand All @@ -216,7 +262,7 @@ def profiles(self) -> dict:
"""
rval = {}
for conn in self.conns:
rval[conn.name] = conn.list_profiles()
rval[conn.get_name()] = conn.list_profiles()
return rval

def packages(self) -> dict:
Expand All @@ -227,7 +273,7 @@ def packages(self) -> dict:
"""
rval = {}
for conn in self.conns:
rval[conn.name] = conn.list_packages()
rval[conn.get_name()] = conn.list_packages()
return rval

def methods(self, type: str = "aggregate") -> dict:
Expand All @@ -239,7 +285,7 @@ def methods(self, type: str = "aggregate") -> dict:
"""
rval = {}
for conn in self.conns:
rval[conn.name] = conn.list_methods(type)
rval[conn.get_name()] = conn.list_methods(type)
return rval

#
Expand All @@ -254,7 +300,7 @@ def workspaces(self) -> dict:
"""
rval = {}
for conn in self.conns:
rval[conn.name] = conn.list_workspaces()
rval[conn.get_name()] = conn.list_workspaces()
return rval

def workspace_save(self, name: str) -> None:
Expand All @@ -264,7 +310,7 @@ def workspace_save(self, name: str) -> None:
:param name: The name of the workspace
"""
for conn in self.conns:
conn.save_workspace(f"{conn.name}:{name}")
conn.save_workspace(f"{conn.get_name()}:{name}")

def workspace_restore(self, name: str) -> None:
"""
Expand All @@ -274,7 +320,7 @@ def workspace_restore(self, name: str) -> None:
:param name: The name of the workspace
"""
for conn in self.conns:
conn.restore_workspace(f"{conn.name}:{name}")
conn.restore_workspace(f"{conn.get_name()}:{name}")

def workspace_rm(self, name: str) -> None:
"""
Expand All @@ -284,7 +330,7 @@ def workspace_rm(self, name: str) -> None:
:param name: The name of the workspace
"""
for conn in self.conns:
conn.rm_workspace(f"{conn.name}:{name}")
conn.rm_workspace(f"{conn.get_name()}:{name}")

#
# R session
Expand Down Expand Up @@ -321,17 +367,17 @@ def sessions(self) -> dict:
if not conn.has_session():
conn.start_session(asynchronous=True)
except Exception as e:
logging.warning(f"Failed to start session: {conn.name} - {e}")
excluded_conns.append(conn.name)
logging.warning(f"Failed to start session: {conn.get_name()} - {e}")
excluded_conns.append(conn.get_name())

# check for session status and wait until all are started
for conn in [c for c in self.conns if c.name not in excluded_conns]:
for conn in [c for c in self.conns if c.get_name() not in excluded_conns]:
try:
if conn.is_session_started():
started_conns.append(conn.name)
started_conns.append(conn.get_name())
except Exception as e:
logging.warning(f"Failed to check session status: {conn.name} - {e}")
excluded_conns.append(conn.name)
logging.warning(f"Failed to check session status: {conn.get_name()} - {e}")
excluded_conns.append(conn.get_name())

# wait until all sessions are started, excluding those that have failed to start or check status
start_time = time.time()
Expand All @@ -340,23 +386,25 @@ def sessions(self) -> dict:
raise DSError("Timed out waiting for R sessions to start")
time.sleep(self.start_delay)
remaining_conns = [
conn for conn in self.conns if conn.name not in started_conns and conn.name not in excluded_conns
conn
for conn in self.conns
if conn.get_name() not in started_conns and conn.get_name() not in excluded_conns
]
for conn in remaining_conns:
try:
if conn.is_session_started():
started_conns.append(conn.name)
started_conns.append(conn.get_name())
except Exception as e:
logging.warning(f"Failed to check session status: {conn.name} - {e}")
excluded_conns.append(conn.name)
logging.warning(f"Failed to check session status: {conn.get_name()} - {e}")
excluded_conns.append(conn.get_name())

# at this point, all sessions that could be started have been started, and those that failed to start or check status have been excluded
for conn in self.conns:
if conn.name in started_conns:
rval[conn.name] = conn.get_session()
if conn.get_name() in started_conns:
rval[conn.get_name()] = conn.get_session()
if len(excluded_conns) > 0:
logging.error(f"Some sessions have been excluded due to errors: {', '.join(excluded_conns)}")
self.conns = [conn for conn in self.conns if conn.name not in excluded_conns]
self.conns = [conn for conn in self.conns if conn.get_name() not in excluded_conns]
if len(self.conns) == 0:
raise DSError("No sessions could be started successfully.")
return rval
Expand All @@ -372,10 +420,10 @@ def ls(self) -> dict:
rval = {}
for conn in self.conns:
try:
rval[conn.name] = conn.list_symbols()
rval[conn.get_name()] = conn.list_symbols()
except Exception as e:
self._append_error(conn, e)
rval[conn.name] = None
rval[conn.get_name()] = None
self._check_errors()
return rval

Expand Down Expand Up @@ -418,12 +466,12 @@ def assign_table(
cmd = {}
for conn in self.conns:
name = table
if tables and conn.name in tables:
name = tables[conn.name]
if tables and conn.get_name() in tables:
name = tables[conn.get_name()]
if name:
try:
res = conn.assign_table(symbol, name, variables, missings, identifiers, id_name, asynchronous)
cmd[conn.name] = res
cmd[conn.get_name()] = res
except Exception as e:
self._append_error(conn, e)
self._do_wait(cmd)
Expand All @@ -445,12 +493,12 @@ def assign_resource(
cmd = {}
for conn in self.conns:
name = resource
if resources and conn.name in resources:
name = resources[conn.name]
if resources and conn.get_name() in resources:
name = resources[conn.get_name()]
if name:
try:
res = conn.assign_resource(symbol, name, asynchronous)
cmd[conn.name] = res
cmd[conn.get_name()] = res
except Exception as e:
self._append_error(conn, e)
self._do_wait(cmd)
Expand All @@ -470,7 +518,7 @@ def assign_expr(self, symbol: str, expr: str, asynchronous: bool = True) -> None
for conn in self.conns:
try:
res = conn.assign_expr(symbol, expr, asynchronous)
cmd[conn.name] = res
cmd[conn.get_name()] = res
except Exception as e:
self._append_error(conn, e)
self._do_wait(cmd)
Expand All @@ -492,10 +540,10 @@ def aggregate(self, expr: str, asynchronous: bool = True) -> dict:
for conn in self.conns:
try:
res = conn.aggregate(expr, asynchronous)
cmd[conn.name] = res
cmd[conn.get_name()] = res
except Exception as e:
self._append_error(conn, e)
rval[conn.name] = None
rval[conn.get_name()] = None
rval = self._do_wait(cmd)
self._check_errors()
return rval
Expand All @@ -511,15 +559,15 @@ def _do_wait(self, cmd: dict) -> dict:
rval = {}
while cmd:
for conn in self.conns:
if conn.name in cmd:
res = cmd[conn.name]
# print(f"..checking {conn.name} -> {res.is_completed()}")
if conn.get_name() in cmd:
res = cmd[conn.get_name()]
# print(f"..checking {conn.get_name()} -> {res.is_completed()}")
if res.is_completed():
try:
rval[conn.name] = res.fetch()
rval[conn.get_name()] = res.fetch()
except Exception as e:
self._append_error(conn, e)
cmd.pop(conn.name, None)
cmd.pop(conn.get_name(), None)
else:
conn.keep_alive()
time.sleep(0.1)
Expand All @@ -535,8 +583,8 @@ def _append_error(self, conn: DSConnection, error: Exception) -> None:
"""
Append an error.
"""
logging.error(f"[{conn.name}] {error}")
self.errors[conn.name] = error
logging.error(f"[{conn.get_name()}] {error}")
self.errors[conn.get_name()] = error

def _check_errors(self) -> None:
"""
Expand Down
38 changes: 38 additions & 0 deletions datashield/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ class DSConnection:
Connection class to a DataSHIELD server.
"""

def get_name(self) -> str:
"""
Get the name of the connection, which is typically the name of the server or data repository.

:return: The name of the connection
"""
raise NotImplementedError("DSConnection function not available")

#
# Content listing
#
Expand All @@ -215,6 +223,36 @@ def has_table(self, name: str) -> bool:
"""
raise NotImplementedError("DSConnection function not available")

def list_table_variables(self, table: str) -> list:
"""
List available variables for a given table from the data repository.

:param table: The name of the table to list variables for
:return: The list of available variables for the given table
"""
raise NotImplementedError("DSConnection function not available")

def list_taxonomies(self) -> list:
"""
List available taxonomies from the data repository. A taxonomy is a hierarchical structure of vocabulary
terms that can be used to annotate variables in the data repository.
Depending on the data repository's capabilities, taxonomies can be used to perform structured
queries when searching for variables.

:return: The list of available taxonomy names
"""
raise NotImplementedError("DSConnection function not available")

def search_variables(self, query: str) -> dict:
"""
Search for variable names matching a given query across all tables in the data repository.

:param query: The query to search for in variable names, e.g., a full-text search and/or structured
query (based on taxonomy terms), depending on the data repository's capabilities
:return: The search result for variables matching the given query across all tables
"""
raise NotImplementedError("DSConnection function not available")

def list_resources(self) -> list:
"""
List available resource names from the data repository.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "datashield"
version = "0.3.0"
version = "0.4.0"
description = "DataSHIELD Client Interface in Python."
authors = [
{name = "Yannick Marcon", email = "yannick.marcon@obiba.org"}
Expand Down
Loading
Loading