Zoho Analytics Integration

## Log Integration Procedure

Introduction

This document outlines the process for integrating Zoho Analytics with Logstash to collect and process internal logs (Activity, Access, and API logs).

Prerequisites

Client credentials for Zoho Analytics API access:

• Client ID

• Client Secret

• Refresh Token

• Workspace ID

• Views ID (unique for each log table)

Permissions:

• DeviceManagementManagedDevices.Read.All

• DeviceManagementManagedDevices.PrivilegedOperations.All

Configuration:

• Generate an access token, then run the log fetching and processing script.

Update the script with your Zoho credentials, Logstash details (host and port), API URLs, workspace ID, and view IDs.

Code Snippet (Generate access token, Log Fetching and Processing Script)

Python

# Standard-library and third-party imports.
# NOTE: the pasted source fused "import json" and "from io import StringIO"
# onto one (invalid) line; they are split here.
import csv
import datetime
import json
import os
import socket
from io import StringIO

import requests

# --- Zoho OAuth credentials ---
# Replace these placeholders with your own values; never commit real secrets.
CLIENT_ID = '1000.CL8HEMBZNW8LZAADBUNI4ZJBBLB8BA'
CLIENT_SECRET = 'f27949628fa35667f33233854de215bca7ca75be6d'
REFRESH_TOKEN = '1000.b3013542a1eda321f28b692d3dd9c1b8.eb938502b041b8b43dc98cd6184d3ef4'
REDIRECT_URI = 'http://www.zoho.com/'
GRANT_TYPE = 'refresh_token'
ORG_ID = '756191860'

# Zoho Analytics REST API v2 base URL and target workspace.
BASE_URL = 'https://analyticsapi.zoho.com/restapi/v2/'
WORKSPACE_ID = '2408598000001819004'

# View IDs: one Zoho Analytics view (table) per log type.
VIEWS = {
    'Activity Log': '2408598000001819448',
    'API Logs': '2408598000017048838',
    'Access Logs': '2408598000001819010',
}

# Logstash TCP input configuration.
LOGSTASH_HOST = 'localhost'  # Change to your Logstash host if different
LOGSTASH_PORT = 12340        # Change to your Logstash port if different

# File used to persist the last-seen timestamp per log type between runs.
LAST_FETCHED_FILE = "last_fetched_timestamps.json"

def get_access_token():
    """Exchange the stored refresh token for a short-lived Zoho access token.

    Returns:
        str: the OAuth access token for subsequent Analytics API calls.

    Raises:
        Exception: if the token endpoint returns a non-200 response; the
            message includes Zoho's JSON error payload.
    """
    url = 'https://accounts.zoho.com/oauth/v2/token'
    params = {
        'refresh_token': REFRESH_TOKEN,
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'redirect_uri': REDIRECT_URI,
        'grant_type': GRANT_TYPE,
    }
    # Zoho's token endpoint accepts these as query parameters on a POST.
    response = requests.post(url, params=params)
    response_data = response.json()
    if response.status_code == 200:
        return response_data['access_token']
    raise Exception(f"Failed to get access token: {response_data}")

def fetch_logs(view_id, access_token):
    """Fetch the data of one Analytics view as exported text (CSV).

    Args:
        view_id: Zoho Analytics view (table) ID, e.g. a value from VIEWS.
        access_token: OAuth token obtained from get_access_token().

    Returns:
        str: the raw response body (CSV text, parsed later by parse_csv_data).

    Raises:
        Exception: on any non-200 response, with Zoho's JSON error payload.
    """
    url = f"{BASE_URL}workspaces/{WORKSPACE_ID}/views/{view_id}/data"
    headers = {
        'Authorization': f'Zoho-oauthtoken {access_token}',
        'ZANALYTICS-ORGID': ORG_ID,
    }
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.text
    response_data = response.json()
    raise Exception(f"Failed to fetch logs for view {view_id}: {response_data}")

def parse_csv_data(csv_data):
    """Parse CSV text into a list of row dicts keyed by the header line.

    Args:
        csv_data: CSV content as a single string (first line is the header).

    Returns:
        list[dict]: one dict per data row; empty list for header-only input.
    """
    return list(csv.DictReader(StringIO(csv_data)))

def send_to_logstash(logs, log_type):
    """Ship log records to Logstash as newline-delimited JSON over TCP.

    Each record is wrapped as {log_type: record} so the pipeline can route
    by log type. Best-effort by design: any failure is printed, not raised,
    so one bad shipment does not abort the whole run.

    Args:
        logs: iterable of dict records (rows from parse_csv_data).
        log_type: label used as the JSON wrapper key, e.g. 'Access Logs'.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((LOGSTASH_HOST, LOGSTASH_PORT))
            for log in logs:
                message = json.dumps({log_type: log})
                sock.sendall(message.encode('utf-8'))
                sock.sendall(b'\n')  # record delimiter for a json_lines-style input
    except Exception as e:
        # Deliberate best-effort: log shipping must not crash the collector.
        print(f"Failed to send logs to Logstash: {e}")

def load_last_fetched_timestamps():
    """Load the per-log-type last-fetched timestamps persisted by a prior run.

    Returns:
        dict: mapping of log type -> timestamp string, or {} when the state
        file is missing, empty, or contains invalid JSON (logged and ignored
        so the collector falls back to a full fetch).
    """
    if os.path.exists(LAST_FETCHED_FILE):
        try:
            with open(LAST_FETCHED_FILE, 'r') as file:
                data = file.read().strip()
                if data:
                    return json.loads(data)
        except json.JSONDecodeError as e:
            print(f"Error reading {LAST_FETCHED_FILE}: {e}")
    return {}

def save_last_fetched_timestamps(timestamps):
    """Persist the per-log-type timestamps dict as JSON, overwriting the file.

    Args:
        timestamps: dict of log type -> timestamp string, in the same format
            consumed by load_last_fetched_timestamps().
    """
    with open(LAST_FETCHED_FILE, 'w') as file:
        json.dump(timestamps, file)

# Per-log-type (CSV timestamp column, strptime/strftime format).
# Column names carry a leading '\ufeff' because Zoho's CSV export begins with
# a UTF-8 BOM and csv.DictReader keeps it on the first header field.
# NOTE(review): that is only correct while the timestamp column is the FIRST
# column of each export — confirm against the actual view layout.
_LOG_FORMATS = {
    "Activity Log": ("\ufeffDate & Time", '%d %b, %Y %H:%M:%S'),
    "API Logs": ("\ufeffDate", '%d %b, %Y'),
    "Access Logs": ("\ufeffAccessed Time", '%Y-%m-%d %H:%M:%S'),
}


def main():
    """Fetch each configured view, keep only rows newer than the last run,
    ship them to Logstash, and persist the newest timestamp per log type.

    Any exception (auth, fetch, parse) is caught and printed so a scheduled
    run exits quietly instead of crashing.
    """
    try:
        access_token = get_access_token()
        last_fetched_timestamps = load_last_fetched_timestamps()

        for log_type, view_id in VIEWS.items():
            csv_data = fetch_logs(view_id, access_token)
            logs = parse_csv_data(csv_data)

            field, fmt = _LOG_FORMATS[log_type]

            def parse_ts(row):
                # Timestamp of one CSV row, using this log type's column/format.
                return datetime.datetime.strptime(row.get(field), fmt)

            # Drop rows at or before the timestamp recorded by the last run.
            last_fetched_timestamp = last_fetched_timestamps.get(log_type)
            if last_fetched_timestamp:
                last_fetched_dt = datetime.datetime.strptime(last_fetched_timestamp, fmt)
                logs = [log for log in logs if parse_ts(log) > last_fetched_dt]

            if logs:
                # Newest first; remember the newest timestamp for the next run.
                logs.sort(key=parse_ts, reverse=True)
                last_fetched_timestamps[log_type] = parse_ts(logs[0]).strftime(fmt)

            send_to_logstash(logs, log_type)
            save_last_fetched_timestamps(last_fetched_timestamps)
    except Exception as e:
        print(e)

# Run only when executed as a script (the pasted source lost the dunders,
# which would raise NameError on 'name').
if __name__ == "__main__":
    main()

Note: Copy the code from Meydan and SPC client.

3. Set up a Logstash pipeline that receives the logs on TCP input port 12340, then follow the remaining steps.

Last updated