Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ convention = "google"
"E402", # Module level import not at top of file
"F401", # Imported but unused
]
"tests/manual/*.py" = [
"INP001", # Manual scripts don't need __init__.py
]

[tool.ruff.lint.pyupgrade]
# Preserve types, even if a file imports `from __future__ import annotations`.
Expand Down
40 changes: 40 additions & 0 deletions roboflow/adapters/rfapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,46 @@ def get_search_export(api_key: str, workspace_url: str, export_id: str, session:
return response.json()


def workspace_search(
    api_key: str,
    workspace_url: str,
    query: str,
    page_size: int = 50,
    fields: Optional[List[str]] = None,
    continuation_token: Optional[str] = None,
) -> dict:
    """POST a RoboQL query to a workspace's search endpoint and return the decoded reply.

    Args:
        api_key: Roboflow API key; sent as the ``api_key`` query-string parameter.
        workspace_url: Workspace slug/url used as the path segment before ``/search/v1``.
        query: RoboQL search query (e.g. ``"tag:review"``, ``"project:false"``).
        page_size: Number of results requested per page (default 50); sent as ``pageSize``.
        fields: Fields to include in each result. Left out of the request body when ``None``.
        continuation_token: Token for fetching the next page. Left out of the request
            body when ``None``.

    Returns:
        The JSON-decoded response body, expected to carry ``results``, ``total``
        and ``continuationToken``.

    Raises:
        RoboflowError: If the response status code is anything other than 200; the
            error message is the raw response text.
    """
    endpoint = f"{API_URL}/{workspace_url}/search/v1?api_key={api_key}"

    # Mandatory keys first; optional keys are only added when the caller supplied them.
    body: Dict[str, Union[str, int, List[str]]] = {"query": query, "pageSize": page_size}
    if fields is not None:
        body.update(fields=fields)
    if continuation_token is not None:
        body.update(continuationToken=continuation_token)

    reply = requests.post(endpoint, json=body)
    if reply.status_code == 200:
        return reply.json()
    raise RoboflowError(reply.text)


def upload_image(
api_key,
project_url,
Expand Down
85 changes: 84 additions & 1 deletion roboflow/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
import sys
import time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Generator, List, Optional

import requests
from PIL import Image
Expand Down Expand Up @@ -666,6 +666,89 @@ def _upload_zip(
except Exception as e:
print(f"An error occured when uploading the model: {e}")

def search(
    self,
    query: str,
    page_size: int = 50,
    fields: Optional[List[str]] = None,
    continuation_token: Optional[str] = None,
) -> dict:
    """Run a single-page RoboQL search over every image in this workspace.

    Delegates to ``rfapi.workspace_search`` using this workspace's API key and url.

    Args:
        query: RoboQL search query (e.g. ``"tag:review"``, ``"project:false"``
            for orphan images, or free-text for semantic CLIP search).
        page_size: Number of results per page (default 50).
        fields: Fields to include in each result. When ``None``,
            ``["tags", "projects", "filename"]`` is requested.
        continuation_token: Token returned by a previous call, used to fetch
            the page that follows it.

    Returns:
        Dict with ``results`` (list), ``total`` (int), and
        ``continuationToken`` (str or None).

    Example:
        >>> ws = rf.workspace()
        >>> page = ws.search("tag:review", page_size=10)
        >>> print(page["total"])
        >>> for img in page["results"]:
        ...     print(img["filename"])
    """
    # A fresh default list is built on every call, so callers never share one.
    requested_fields = ["tags", "projects", "filename"] if fields is None else fields

    return rfapi.workspace_search(
        api_key=self.__api_key,
        workspace_url=self.url,
        query=query,
        page_size=page_size,
        fields=requested_fields,
        continuation_token=continuation_token,
    )

def search_all(
    self,
    query: str,
    page_size: int = 50,
    fields: Optional[List[str]] = None,
) -> Generator[List[dict], None, None]:
    """Iterate over every page of a workspace-wide search.

    Calls ``self.search`` repeatedly, feeding each response's
    ``continuationToken`` into the next request. Iteration ends as soon as a
    page has no results or a response carries no continuation token.

    Args:
        query: RoboQL search query.
        page_size: Number of results per page (default 50).
        fields: Fields to include in each result. Passed through unchanged to
            ``search``, which falls back to ``["tags", "projects", "filename"]``
            when it is ``None``.

    Yields:
        The ``results`` list of each non-empty page, in the order received.

    Example:
        >>> ws = rf.workspace()
        >>> for page in ws.search_all("tag:review"):
        ...     for img in page:
        ...         print(img["filename"])
    """
    next_token = None
    has_more = True
    while has_more:
        page = self.search(
            query=query,
            page_size=page_size,
            fields=fields,
            continuation_token=next_token,
        )
        batch = page.get("results", [])
        if not batch:
            # An empty (or missing) result list ends iteration without yielding.
            return
        yield batch
        next_token = page.get("continuationToken")
        has_more = bool(next_token)

def search_export(
self,
query: str,
Expand Down
42 changes: 42 additions & 0 deletions tests/manual/demo_workspace_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Manual demo for workspace-level search (DATAMAN-163).

Usage:
python tests/manual/demo_workspace_search.py

Uses staging credentials from CLAUDE.md.
"""

import os

import roboflow

# Set ROBOFLOW_CONFIG_DIR to the data/.config folder that sits beside this script
# before any Roboflow object is created.
script_dir = os.path.dirname(os.path.abspath(__file__))
os.environ["ROBOFLOW_CONFIG_DIR"] = f"{script_dir}/data/.config"

WORKSPACE = "model-evaluation-workspace"

rf = roboflow.Roboflow()
ws = rf.workspace(WORKSPACE)

# --- Single page search ---
# Request one page of five results and print its summary fields and filenames.
print("=== Single page search ===")
first_page = ws.search("project:false", page_size=5)
print(f"Total results: {first_page['total']}")
first_page_results = first_page["results"]
print(f"Results in this page: {len(first_page_results)}")
print(f"Continuation token: {first_page.get('continuationToken')}")
for img in first_page_results:
    print(f" - {img.get('filename', 'N/A')}")

# --- Paginated search_all ---
print("\n=== Paginated search_all (page_size=3, max 2 pages) ===")
count = 0
for page_results in ws.search_all("*", page_size=3):
count += 1
print(f"Page {count}: {len(page_results)} results")
for img in page_results:
print(f" - {img.get('filename', 'N/A')}")

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High test

This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.

Copilot Autofix

AI 17 days ago

General approach: Ensure that no method that is likely to be logged (__str__/__repr__ or demo printouts) exposes secrets like api_key. We do not need to change how the key is used in HTTP requests; only remove or mask it from any string/log representations.

Best single fix without changing functionality:
The direct, real leak is Roboflow.__str__ in roboflow/__init__.py, which currently returns a JSON blob containing the raw api_key. That method can be called implicitly by print(rf) or loggers, thus logging the key in clear text. We should change __str__ so that it no longer includes the API key, or at minimum masks it. To avoid altering behavior elsewhere, we’ll keep the same structure but replace the key with a masked version (e.g., show last 4 characters only). This preserves usefulness for debugging while preventing secret exposure.

Concretely:

  • In roboflow/__init__.py, update Roboflow.__str__ (lines 272–275 in the snippet) so that:
    • It computes a masked_api_key string that does not reveal the full key (e.g., "****" if empty, or "****" + last_4_chars).
    • It sets json_value = {"api_key": masked_api_key, "workspace": self.workspace} instead of using self.api_key directly.
  • Leave the rest of the class unchanged; no change to how HTTP requests are made.
  • The other printed value in tests/manual/demo_workspace_search.py (filename) does not involve the API key and can remain as-is, so no change is necessary there for secrecy.

No new imports or helper methods are needed; masking logic can be implemented inline in __str__.


Suggested changeset 1
roboflow/__init__.py
Outside changed files

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/roboflow/__init__.py b/roboflow/__init__.py
--- a/roboflow/__init__.py
+++ b/roboflow/__init__.py
@@ -271,5 +271,11 @@
 
     def __str__(self):
         """to string function"""
-        json_value = {"api_key": self.api_key, "workspace": self.workspace}
+        # Avoid exposing the full API key when this object is printed or logged.
+        api_key = self.api_key or ""
+        if len(api_key) > 4:
+            masked_api_key = ("*" * (len(api_key) - 4)) + api_key[-4:]
+        else:
+            masked_api_key = "*" * len(api_key)
+        json_value = {"api_key": masked_api_key, "workspace": self.workspace}
         return json.dumps(json_value, indent=2)
EOF
@@ -271,5 +271,11 @@

     def __str__(self):
         """to string function"""
-        json_value = {"api_key": self.api_key, "workspace": self.workspace}
+        # Avoid exposing the full API key when this object is printed or logged.
+        api_key = self.api_key or ""
+        if len(api_key) > 4:
+            masked_api_key = ("*" * (len(api_key) - 4)) + api_key[-4:]
+        else:
+            masked_api_key = "*" * len(api_key)
+        json_value = {"api_key": masked_api_key, "workspace": self.workspace}
         return json.dumps(json_value, indent=2)
Copilot is powered by AI and may make mistakes. Always verify output.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just for local run

if count >= 2:
print("(stopping after 2 pages for demo)")
break

print("\nDone.")
138 changes: 138 additions & 0 deletions tests/test_workspace_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import json
import unittest

import responses

from roboflow.adapters.rfapi import RoboflowError
from roboflow.config import API_URL


class TestWorkspaceSearch(unittest.TestCase):
    """Tests for ``Workspace.search`` and ``Workspace.search_all``.

    Every test registers mocked POST replies for ``SEARCH_URL`` through the
    ``responses`` library and then inspects either the returned data or the
    JSON body that was sent.
    """

    API_KEY = "test_key"
    WORKSPACE = "test-ws"
    SEARCH_URL = f"{API_URL}/{WORKSPACE}/search/v1?api_key={API_KEY}"

    # --- helpers ---

    def _make_workspace(self):
        """Build a ``Workspace`` with no projects or members, using the test credentials."""
        from roboflow.core.workspace import Workspace

        workspace_info = {
            "name": "Test",
            "url": self.WORKSPACE,
            "projects": [],
            "members": [],
        }
        return Workspace(
            {"workspace": workspace_info},
            api_key=self.API_KEY,
            default_workspace=self.WORKSPACE,
            model_format="yolov8",
        )

    def _stub_search(self, body, status=200):
        """Register one mocked POST reply for the search endpoint."""
        responses.add(responses.POST, self.SEARCH_URL, json=body, status=status)

    @staticmethod
    def _sent_payload(call_index=0):
        """Decode the JSON body of the recorded request at ``call_index``."""
        return json.loads(responses.calls[call_index].request.body)

    # --- search() tests ---

    @responses.activate
    def test_search_basic(self):
        self._stub_search(
            {
                "results": [{"filename": "a.jpg"}, {"filename": "b.jpg"}],
                "total": 2,
                "continuationToken": None,
            }
        )

        result = self._make_workspace().search("tag:review")

        self.assertEqual(result["total"], 2)
        self.assertEqual(len(result["results"]), 2)
        self.assertIsNone(result["continuationToken"])

        # The request must carry the query, the default page size and default fields,
        # and must not mention a continuation token.
        sent = self._sent_payload()
        self.assertEqual(sent["query"], "tag:review")
        self.assertEqual(sent["pageSize"], 50)
        self.assertEqual(sent["fields"], ["tags", "projects", "filename"])
        self.assertNotIn("continuationToken", sent)

    @responses.activate
    def test_search_with_continuation_token(self):
        self._stub_search({"results": [{"filename": "c.jpg"}], "total": 3, "continuationToken": None})

        self._make_workspace().search("*", continuation_token="tok_abc")

        self.assertEqual(self._sent_payload()["continuationToken"], "tok_abc")

    @responses.activate
    def test_search_custom_fields(self):
        self._stub_search({"results": [], "total": 0, "continuationToken": None})

        self._make_workspace().search("*", fields=["filename", "embedding"])

        self.assertEqual(self._sent_payload()["fields"], ["filename", "embedding"])

    @responses.activate
    def test_search_api_error(self):
        self._stub_search({"error": "unauthorized"}, status=401)

        workspace = self._make_workspace()
        with self.assertRaises(RoboflowError):
            workspace.search("tag:review")

    # --- search_all() tests ---

    @responses.activate
    def test_search_all_single_page(self):
        self._stub_search(
            {
                "results": [{"filename": "a.jpg"}, {"filename": "b.jpg"}],
                "total": 2,
                "continuationToken": None,
            }
        )

        pages = list(self._make_workspace().search_all("*"))

        self.assertEqual(len(pages), 1)
        self.assertEqual(len(pages[0]), 2)

    @responses.activate
    def test_search_all_multiple_pages(self):
        # Replies are served in registration order: first page, then second page.
        self._stub_search({"results": [{"filename": "a.jpg"}], "total": 2, "continuationToken": "tok_page2"})
        self._stub_search({"results": [{"filename": "b.jpg"}], "total": 2, "continuationToken": None})

        pages = list(self._make_workspace().search_all("*", page_size=1))

        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0][0]["filename"], "a.jpg")
        self.assertEqual(pages[1][0]["filename"], "b.jpg")

        # The second request must reuse the token handed back with the first page.
        self.assertEqual(self._sent_payload(1)["continuationToken"], "tok_page2")

    @responses.activate
    def test_search_all_empty_results(self):
        self._stub_search({"results": [], "total": 0, "continuationToken": None})

        pages = list(self._make_workspace().search_all("*"))

        self.assertEqual(len(pages), 0)


if __name__ == "__main__":
    unittest.main()