I’m attempting to create a script using the Catalog API to reassign ownership of objects. I can traverse the catalog, but owner information is not in the response. For example, I can see the owner of virtual datasets in the UI, but I can’t seem to get the information from the API.
How can I retrieve owner information using the API?
Here is the sample Python code I’ve been experimenting with. I’m new to Python as well.
import requests
import logging
import json
import argparse
import urllib3
# The requests in this script pass verify=False; silence only the
# InsecureRequestWarning category from urllib3, nothing else.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configure logging at INFO with "timestamp - level - message" lines and
# take the root logger as the script-wide logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()
def get_user_id(api_url, api_key, username):
    """Look up the ID of a user by username.

    Args:
        api_url: Base URL of the server; the API path is appended to it.
        api_key: Token sent as a Bearer credential in the Authorization header.
        username: Name of the user to resolve.

    Returns:
        The ``id`` field of the JSON response, or ``None`` when the request
        fails, the body is not valid JSON, or the response has no ``id``.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    logger.info(f'Fetching user ID for username: {username}')
    headers = {'Authorization': f'Bearer {api_key}'}
    # /user/{id} expects a user ID; resolving a *name* goes through the
    # by-name route. The name is percent-encoded because usernames may
    # contain characters such as '@', '/' or spaces.
    encoded_name = quote(username, safe='')
    url = f'{api_url}/api/v3/user/by-name/{encoded_name}'
    try:
        # verify=False: TLS certificate validation is deliberately skipped,
        # matching the other requests in this script.
        response = requests.get(url, headers=headers, verify=False)
        response.raise_for_status()  # Raise an exception for HTTP errors
        user_data = response.json()
        return user_data.get('id')
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        logger.error(f'Failed to fetch user ID for username {username}: {e}')
        return None
def get_catalog_object(api_url, api_key, object_id):
    """Fetch a single catalog entry by ID.

    Args:
        api_url: Base URL of the server.
        api_key: Token sent as a Bearer credential.
        object_id: Identifier appended to the ``/api/v3/catalog/`` path.

    Returns:
        The decoded JSON body, or ``None`` (after logging the error) when
        the request raises a ``requests`` exception.
    """
    endpoint = f'{api_url}/api/v3/catalog/{object_id}'
    auth_headers = {'Authorization': f'Bearer {api_key}'}
    try:
        # Certificate verification is intentionally disabled.
        reply = requests.get(endpoint, headers=auth_headers, verify=False)
        reply.raise_for_status()
        payload = reply.json()
    except requests.exceptions.RequestException as err:
        logger.error(f'Failed to fetch catalog object {object_id}: {err}')
        payload = None
    return payload
def get_all_catalog_objects(api_url, api_key):
    """Return the entries listed under the ``data`` key of the catalog root.

    Args:
        api_url: Base URL of the server.
        api_key: Token sent as a Bearer credential.

    Returns:
        A new list with the items of the response's ``data`` field (empty
        when the key is absent), or an empty list when the request raises
        a ``requests`` exception, which is logged.
    """
    logger.info(f'Fetching all catalog objects from {api_url}')
    auth_headers = {'Authorization': f'Bearer {api_key}'}
    try:
        reply = requests.get(
            f'{api_url}/api/v3/catalog',
            headers=auth_headers,
            verify=False,  # certificate verification intentionally disabled
        )
        reply.raise_for_status()
        return list(reply.json().get('data', []))
    except requests.exceptions.RequestException as err:
        logger.error(f'Failed to fetch catalog objects: {err}')
        return []
def get_objects_owned_by_users(api_url, api_key, users):
    """Collect catalog objects whose owner is one of the given users.

    Usernames are resolved to user IDs first. Top-level SPACE and FOLDER
    entries are then expanded through ``drill_down_objects``; any other
    top-level entry is matched directly on ``owner['ownerId']``.

    Args:
        api_url: Base URL of the server.
        api_key: Token sent as a Bearer credential.
        users: Iterable of usernames to search for.

    Returns:
        A list of catalog entry dicts owned by the resolved users. Empty
        when no username could be resolved or nothing matched.
    """
    logger.info(f'Fetching objects owned by users: {users} from {api_url}')

    # Map lower-cased user ID -> lower-cased username; the keys are what
    # owner IDs are compared against.
    user_ids = {}
    for user in users:
        user_id = get_user_id(api_url, api_key, user)
        if user_id:
            user_ids[str(user_id).lower()] = user.lower()
        else:
            logger.error(f'Could not retrieve user ID for user {user}')
    if not user_ids:
        # Nothing can match, so skip walking the catalog altogether.
        logger.error('None of the requested users could be resolved; nothing to search for.')
        return []

    all_objects = get_all_catalog_objects(api_url, api_key)
    total_objects = len(all_objects)
    objects = []

    # Iterate over the catalog and check the owner for each object
    for obj_index, obj in enumerate(all_objects):
        obj_name = obj.get('path', 'N/A')
        obj_owner = obj.get('owner', 'N/A')
        logger.info(f'Checking object: Name={obj_name}, Owner={obj_owner}')

        container_type = obj.get('containerType')
        if container_type in ('SPACE', 'FOLDER'):
            # get_catalog_object returns None on failure (already logged);
            # skip that container instead of crashing on None.
            container = get_catalog_object(api_url, api_key, obj['id'])
            if container:
                children = container.get('children', [])
                objects.extend(
                    drill_down_objects(api_url, api_key, children, user_ids, total_objects)
                )
        else:
            owner = obj.get('owner')
            owner_id = owner.get('ownerId') if isinstance(owner, dict) else None
            # An owner without a string ownerId cannot match any user.
            if isinstance(owner_id, str) and owner_id.lower() in user_ids:
                objects.append(obj)

        # Log progress
        if obj_index % 10 == 0:  # Log every 10 objects
            progress = (obj_index + 1) / total_objects * 100
            logger.info(f'Checked {obj_index + 1}/{total_objects} objects ({progress:.2f}%)')
    return objects
def _owner_id_matches(obj, user_ids):
    """Return True when obj's owner (ID string or dict with 'ownerId') is in user_ids."""
    owner = obj.get('owner')
    if isinstance(owner, dict):
        owner = owner.get('ownerId')
    if not isinstance(owner, str):
        # Missing owner, missing ownerId, or an unexpected type: no match.
        return False
    # Exact match is kept for callers passing un-normalised keys; the
    # lower-cased match covers keys normalised with .lower().
    return owner in user_ids or owner.lower() in user_ids


def drill_down_objects(api_url, api_key, objects, user_ids, total_objects):
    """Recursively collect non-container entries owned by the given users.

    Entries with ``containerType == 'FOLDER'`` are fetched and their
    ``children`` searched recursively. Entries without a ``containerType``
    are kept when their owner matches a key of ``user_ids``. Entries with
    any other container type are skipped.

    Args:
        api_url: Base URL of the server.
        api_key: Token sent as a Bearer credential.
        objects: Iterable of catalog entry dicts to inspect.
        user_ids: Container of user IDs to match owners against.
        total_objects: Unused; kept so existing callers keep working.

    Returns:
        A list of the matching entry dicts, in traversal order.
    """
    # NOTE(review): this relies on entries carrying an 'owner' field. If child
    # summaries do not include it, each entry may need to be fetched by
    # ID first - confirm against the Catalog API documentation.
    owned_objects = []
    for obj in objects:
        container_type = obj.get('containerType')
        if container_type == 'FOLDER':
            folder = get_catalog_object(api_url, api_key, obj['id'])
            if folder is None:
                # Fetch failed (get_catalog_object returns None on error);
                # skip this folder rather than abort the whole traversal.
                continue
            owned_objects.extend(
                drill_down_objects(
                    api_url, api_key, folder.get('children', []), user_ids, total_objects
                )
            )
        elif container_type is None and _owner_id_matches(obj, user_ids):
            owned_objects.append(obj)
    return owned_objects
def update_object_ownership(api_url, api_key, object_id, new_owner):
    """Send a PUT request that sets the ``owner`` field of a catalog object.

    The outcome is only logged: nothing is returned and ``requests``
    errors are caught rather than propagated.

    NOTE(review): whether this endpoint accepts a body containing only
    ``owner`` is not visible from this file - confirm against the API docs.

    Args:
        api_url: Base URL of the server.
        api_key: Token sent as a Bearer credential.
        object_id: Identifier appended to the ``/api/v3/catalog/`` path.
        new_owner: Value serialised as the ``owner`` field of the JSON body.
    """
    logger.info(f'Updating ownership of object {object_id} to {new_owner} at {api_url}')
    request_headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    payload = json.dumps({'owner': new_owner})
    target = f'{api_url}/api/v3/catalog/{object_id}'
    try:
        # Certificate verification is intentionally disabled.
        reply = requests.put(target, headers=request_headers, data=payload, verify=False)
        reply.raise_for_status()
    except requests.exceptions.RequestException as err:
        logger.error(f'Failed to update ownership of object {object_id}: {err}')
    else:
        logger.info(f'Successfully updated ownership of object {object_id} to {new_owner}')
def preview_changes(objects):
    """Log one summary line per object that is about to be changed.

    Args:
        objects: Iterable of catalog entry dicts. The ``id``, ``path``,
            ``containerType`` and ``owner`` keys are read; a missing key
            is shown as ``N/A``.
    """
    logger.info('Objects to be changed:')
    for obj in objects:
        obj_id = obj.get('id', 'N/A')
        obj_name = obj.get('path', 'N/A')
        # Same camelCase key the traversal code reads; the lower-case
        # spelling never matched, so Type was always 'N/A'.
        obj_type = obj.get('containerType', 'N/A')
        obj_owner = obj.get('owner', 'N/A')
        logger.info(f'ID: {obj_id}, Name: {obj_name}, Type: {obj_type}, Owner: {obj_owner}')
def main():
    """Parse arguments, preview the matching objects, and reassign them.

    Flow: find the objects owned by ``--users``, log a preview, ask for an
    interactive ``yes`` confirmation, then update each object's owner to
    ``--new_owner``. Returns early when nothing matches or the user
    declines.
    """
    parser = argparse.ArgumentParser(description='Update ownership of Dremio objects.')
    parser.add_argument('--api_url', required=True, help='Dremio API URL')
    parser.add_argument('--api_key', required=True, help='Dremio API Key')
    parser.add_argument('--users', required=True, nargs='+', help='List of users to find objects for')
    parser.add_argument('--new_owner', required=True, help='New owner for the objects')
    args = parser.parse_args()

    # The API key is a credential, so it is deliberately left out of the log.
    logger.info(
        f'Starting ownership update process with API URL: {args.api_url}, '
        f'API Key: <redacted>, Users: {args.users}, New Owner: {args.new_owner}'
    )

    objects = get_objects_owned_by_users(args.api_url, args.api_key, args.users)
    if not objects:
        logger.info('No objects found for the specified users.')
        return

    preview_changes(objects)
    confirm = input('Do you want to proceed with updating the ownership of these objects? (yes/no): ')
    # Strip surrounding whitespace so "yes " is not treated as a refusal.
    if confirm.strip().lower() != 'yes':
        logger.info('Operation cancelled by user.')
        return

    for obj in objects:
        update_object_ownership(args.api_url, args.api_key, obj['id'], args.new_owner)


if __name__ == '__main__':
    main()