Zoho Analytics Integration
#Log Integration procedure:
Introduction This document outlines the process for integrating Zoho Analytics with Logstash to collect and process internal logs (Activity, Access, and API logs).
Prerequisites
Client credentials for Zoho Analytics API access:
• Client ID
• Client Secret
• Refresh Token
• Workspace ID
• Views ID (unique for each log table)
Permissions:
• DeviceManagementManagedDevices.Read.All
• DeviceManagementManagedDevices.PrivilgedOperations.All
Configuration:
• Access Token generate, Log Fetching and Processing Script.
Update the script with your Zoho credentials, Logstash details (host and port), and API URLs, WORKSPACE_ID and View IDs.
Code Snippet (Generate access token, Log Fetching and Processing Script)
Python
import requests
import socket
import csv
import json from io import StringIO
import datetime
import os
Constants
CLIENT_ID = '1000.CL8HEMBZNW8LZAADBUNI4ZJBBLB8BA'
CLIENT_SECRET = 'f27949628fa35667f33233854de215bca7ca75be6d'
REFRESH_TOKEN = '1000.b3013542a1eda321f28b692d3dd9c1b8.eb938502b041b8b43dc98cd6184d3ef4' REDIRECT_URI = 'http://www.zoho.com/'
GRANT_TYPE = 'refresh_token'
ORG_ID = '756191860'
BASE_URL = 'https://analyticsapi.zoho.com/restapi/v2/'
WORKSPACE_ID = '2408598000001819004'
View IDs
VIEWS = {
'Activity Log': '2408598000001819448', '
API Logs': '2408598000017048838',
'Access Logs': '2408598000001819010'
}
Logstash configuration
LOGSTASH_HOST = 'localhost' # Change to your Logstash host if different
LOGSTASH_PORT = 12340 # Change to your Logstash port if different
LAST_FETCHED_FILE = "last_fetched_timestamps.json"
def get_access_token():
url = 'https://accounts.zoho.com/oauth/v2/token'
params = {
'refresh_token': REFRESH_TOKEN,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'redirect_uri': REDIRECT_URI,
'grant_type': GRANT_TYPE
}
response = requests.post(url, params=params)
response_data = response.json()
if response.status_code == 200:
return response_data['access_token']
else:
raise Exception(f"Failed to get access token: {response_data}")
def fetch_logs(view_id, access_token):
url = f"{BASE_URL}workspaces/{WORKSPACE_ID}/views/{view_id}/data" headers = { 'Authorization': f'Zoho-oauthtoken {access_token}',
'ZANALYTICS-ORGID': ORG_ID
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.text
else:
response_data = response.json()
raise Exception(f"Failed to fetch logs for view {view_id}: {response_data}")
def parse_csv_data(csv_data):
logs = []
csv_file = StringIO(csv_data)
reader = csv.DictReader(csv_file)
for row in reader:
logs.append(row)
return logs
def send_to_logstash(logs, log_type):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect((LOGSTASH_HOST, LOGSTASH_PORT))
for log in logs:
message = json.dumps({log_type: log})
# message = {log_type: log}
sock.sendall(message.encode('utf-8'))
sock.sendall(b'\n’) except Exception as e:
print(f"Failed to send logs to Logstash: {e}")
def load_last_fetched_timestamps():
if os.path.exists(LAST_FETCHED_FILE):
try:
with open(LAST_FETCHED_FILE, 'r') as file:
data = file.read().strip()
if data:
return json.loads(data)
except json.JSONDecodeError as e:
print(f"Error reading {LAST_FETCHED_FILE}: {e}")
return {}
def save_last_fetched_timestamps(timestamps):
with open(LAST_FETCHED_FILE, 'w') as file:
json.dump(timestamps, file)
def main():
try: access_token = get_access_token()
last_fetched_timestamps = load_last_fetched_timestamps()
if name == "main": main()
Note: Copy the code from Meydan and SPC client.
3. Setup a Logstash pipeline to fetch the logs at input port 12340/tcp and follow further steps.
Last updated