from datetime import datetime, timedelta, timezone import tiktoken from modules.chat_processor.helpers import update_user_info, convert_to_dict def get_time(): return datetime.now(timezone.utc).isoformat() async def check_user_cooldown( user_info, current_time, COOLDOWN_TIME, TOKENS_LEFT, REGEN_TIME ): # # Check if no tokens left tokens_left = user_info.metadata.get("tokens_left", 0) if tokens_left > 0 and not user_info.metadata.get("in_cooldown", False): return False, None user_info = convert_to_dict(user_info) last_message_time_str = user_info["metadata"].get("last_message_time") # Convert from ISO format string to datetime object and ensure UTC timezone last_message_time = datetime.fromisoformat(last_message_time_str).replace( tzinfo=timezone.utc ) current_time = datetime.fromisoformat(current_time).replace(tzinfo=timezone.utc) # Calculate the elapsed time elapsed_time = current_time - last_message_time elapsed_time_in_seconds = elapsed_time.total_seconds() # Calculate when the cooldown period ends cooldown_end_time = last_message_time + timedelta(seconds=COOLDOWN_TIME) cooldown_end_time_iso = cooldown_end_time.isoformat() # Check if the user is still in cooldown if elapsed_time_in_seconds < COOLDOWN_TIME: return True, cooldown_end_time_iso # Return in ISO 8601 format user_info["metadata"]["in_cooldown"] = False # If not in cooldown, regenerate tokens await reset_tokens_for_user(user_info, TOKENS_LEFT, REGEN_TIME) return False, None async def reset_tokens_for_user(user_info, TOKENS_LEFT, REGEN_TIME): user_info = convert_to_dict(user_info) last_message_time_str = user_info["metadata"].get("last_message_time") last_message_time = datetime.fromisoformat(last_message_time_str).replace( tzinfo=timezone.utc ) current_time = datetime.fromisoformat(get_time()).replace(tzinfo=timezone.utc) # Calculate the elapsed time since the last message elapsed_time_in_seconds = (current_time - last_message_time).total_seconds() # Current token count (can be negative) current_tokens = user_info["metadata"].get("tokens_left_at_last_message", 0) current_tokens = min(current_tokens, TOKENS_LEFT) # Maximum tokens that can be regenerated max_tokens = user_info["metadata"].get("max_tokens", TOKENS_LEFT) # Calculate how many tokens should have been regenerated proportionally if current_tokens < max_tokens: # Calculate the regeneration rate per second based on REGEN_TIME for full regeneration # If current_tokens is close to 0, then the regeneration rate is relatively high, and if current_tokens is close to max_tokens, then the regeneration rate is relatively low regeneration_rate_per_second = ( max_tokens - max(current_tokens, 0) ) / REGEN_TIME # Calculate how many tokens should have been regenerated based on the elapsed time tokens_to_regenerate = int( elapsed_time_in_seconds * regeneration_rate_per_second ) # Ensure the new token count does not exceed max_tokens new_token_count = min(current_tokens + tokens_to_regenerate, max_tokens) # Update the user's token count user_info["metadata"]["tokens_left"] = new_token_count await update_user_info(user_info) def get_num_tokens(text, model): encoding = tiktoken.encoding_for_model(model) tokens = encoding.encode(text) return len(tokens)