453 lines
16 KiB
Python
453 lines
16 KiB
Python
"""
|
|
Digital Dojo Bot v3 - Radar-based detection
|
|
============================================
|
|
Problem: scan returns (0,0) when no enemy nearby = useless for navigation.
|
|
Fix: Use RADAR (0.25s cooldown) to get actual enemy positions, log the raw
|
|
structure so we can understand the API, and chase accordingly.
|
|
|
|
Usage:
|
|
pip install httpx
|
|
python dojo_bot.py --token YOUR_API_KEY
|
|
"""
|
|
|
|
import asyncio, argparse, logging, random, sys, time
|
|
import httpx
|
|
|
|
logging.basicConfig(level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")
|
|
log = logging.getLogger("DojoBot")
|
|
|
|
BASE = "https://game-dd.countit.at"
|
|
|
|
# direction: 0=NORTH(-Y), 1=EAST(+X), 2=SOUTH(+Y), 3=WEST(-X)
|
|
NAMES = {0:"N", 1:"E", 2:"S", 3:"W"}
|
|
|
|
CD = {
|
|
"move": 0.27,
|
|
"hit": 0.27,
|
|
"radar": 0.27,
|
|
"special": 5.1,
|
|
"scan": 2.1,
|
|
"dash": 3.1,
|
|
"shoot": 1.1,
|
|
"peek": 1.1,
|
|
}
|
|
|
|
def dir_toward(dx, dy):
|
|
if abs(dx) >= abs(dy): return 1 if dx > 0 else 3
|
|
return 2 if dy > 0 else 0
|
|
|
|
|
|
class Bot:
|
|
def __init__(self, token, multiplayer=False):
|
|
self.token = token
|
|
self.multiplayer = multiplayer
|
|
self.http = httpx.AsyncClient(timeout=8.0)
|
|
self._last = {k: 0.0 for k in CD}
|
|
self.health = 10.0
|
|
self.kills = 0
|
|
self.deaths = 0
|
|
self.target_dx = None
|
|
self.target_dy = None
|
|
self.target_last_seen = 0.0
|
|
self.target_ttl = 1.8
|
|
self.last_known_dir = random.randint(0, 3)
|
|
self.radar_seen = False # have we printed raw radar yet?
|
|
self.explore_dir = random.randint(0, 3)
|
|
self.explore_steps = 0
|
|
|
|
def ready(self, a): return (time.monotonic() - self._last[a]) >= CD[a]
|
|
def mark(self, a): self._last[a] = time.monotonic()
|
|
|
|
def _set_target(self, dx, dy):
|
|
self.target_dx, self.target_dy = dx, dy
|
|
self.target_last_seen = time.monotonic()
|
|
self.last_known_dir = dir_toward(dx, dy)
|
|
|
|
def _expire_target(self):
|
|
if self.has_target and (time.monotonic() - self.target_last_seen) > self.target_ttl:
|
|
self.target_dx, self.target_dy = None, None
|
|
|
|
@property
|
|
def has_target(self): return self.target_dx is not None
|
|
|
|
@property
|
|
def dist(self):
|
|
if not self.has_target: return 999
|
|
return abs(self.target_dx) + abs(self.target_dy)
|
|
|
|
def _ci_get(self, data, *keys, default=None):
|
|
"""Case-insensitive dict getter for unstable API field casing."""
|
|
if not isinstance(data, dict):
|
|
return default
|
|
for key in keys:
|
|
if key in data:
|
|
return data[key]
|
|
lowered = {str(k).lower(): v for k, v in data.items()}
|
|
for key in keys:
|
|
hit = lowered.get(str(key).lower())
|
|
if hit is not None:
|
|
return hit
|
|
return default
|
|
|
|
@staticmethod
|
|
def _as_int(value, default=0):
|
|
try:
|
|
if value is None:
|
|
return default
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
@staticmethod
|
|
def _as_bool(value):
|
|
if isinstance(value, bool):
|
|
return value
|
|
if isinstance(value, (int, float)):
|
|
return value != 0
|
|
if isinstance(value, str):
|
|
return value.strip().lower() in {"true", "1", "yes"}
|
|
return False
|
|
|
|
async def GET(self, path):
|
|
for attempt in range(2):
|
|
try:
|
|
r = await self.http.get(f"{BASE}{path}")
|
|
if r.status_code == 200:
|
|
try:
|
|
return r.json()
|
|
except ValueError:
|
|
log.warning(f"GET {path}: invalid JSON body")
|
|
return None
|
|
|
|
if r.status_code >= 500 and attempt == 0:
|
|
await asyncio.sleep(0.15)
|
|
continue
|
|
|
|
hint = (r.text or "")[:180].replace("\n", " ")
|
|
log.warning(f"GET {path}: status={r.status_code} body={hint}")
|
|
return None
|
|
except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e:
|
|
if attempt == 0:
|
|
await asyncio.sleep(0.15)
|
|
continue
|
|
log.warning(f"GET {path}: {e}")
|
|
return None
|
|
except Exception as e:
|
|
log.warning(f"GET {path}: {e}")
|
|
return None
|
|
return None
|
|
|
|
async def POST(self, path):
|
|
for attempt in range(2):
|
|
try:
|
|
r = await self.http.post(f"{BASE}{path}")
|
|
if r.status_code == 200:
|
|
try:
|
|
return r.json()
|
|
except ValueError:
|
|
log.warning(f"POST {path}: invalid JSON body")
|
|
return None
|
|
|
|
if r.status_code >= 500 and attempt == 0:
|
|
await asyncio.sleep(0.15)
|
|
continue
|
|
|
|
hint = (r.text or "")[:180].replace("\n", " ")
|
|
log.warning(f"POST {path}: status={r.status_code} body={hint}")
|
|
return None
|
|
except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e:
|
|
if attempt == 0:
|
|
await asyncio.sleep(0.15)
|
|
continue
|
|
log.warning(f"POST {path}: {e}")
|
|
return None
|
|
except Exception as e:
|
|
log.warning(f"POST {path}: {e}")
|
|
return None
|
|
return None
|
|
|
|
# ── Game ──────────────────────────────────────────────────────────────────
|
|
|
|
async def start(self):
|
|
if not self.multiplayer:
|
|
r = await self.POST(f"/api/game/{self.token}/create")
|
|
log.info(f"Game: {r}")
|
|
|
|
async def stop(self):
|
|
if not self.multiplayer: await self.POST(f"/api/game/{self.token}/close")
|
|
await self.http.aclose()
|
|
log.info("Done.")
|
|
|
|
# ── Sense ─────────────────────────────────────────────────────────────────
|
|
|
|
async def radar(self):
|
|
if not self.ready("radar"): return
|
|
res = await self.GET(f"/api/player/{self.token}/radar")
|
|
if not res: return
|
|
self.mark("radar")
|
|
|
|
raw = self._ci_get(res, "RadarResults", "radarResults", default={}) or {}
|
|
|
|
# Log full radar structure once to understand server payload shape
|
|
if not self.radar_seen:
|
|
log.info(f"=== RADAR STRUCTURE ===\n{res}\n=======================")
|
|
self.radar_seen = True
|
|
|
|
nearest = self._parse_radar(raw)
|
|
if nearest is not None:
|
|
self._set_target(*nearest)
|
|
|
|
def _parse_radar(self, raw):
|
|
"""Extract nearest enemy hint from RadarResults."""
|
|
if not raw:
|
|
return None
|
|
|
|
# Documented format: directional counts in radar sectors.
|
|
if isinstance(raw, dict) and all(isinstance(v, (int, float, str)) for v in raw.values()):
|
|
buckets = {
|
|
0: ["0", "n", "north", "up"],
|
|
1: ["1", "e", "east", "right"],
|
|
2: ["2", "s", "south", "down"],
|
|
3: ["3", "w", "west", "left"],
|
|
}
|
|
counts = {}
|
|
lowered = {str(k).lower(): self._as_int(v, 0) for k, v in raw.items()}
|
|
for direction, names in buckets.items():
|
|
counts[direction] = max((lowered.get(n, 0) for n in names), default=0)
|
|
|
|
best_dir = max(counts, key=counts.get)
|
|
if counts.get(best_dir, 0) > 0:
|
|
return [(0, -2), (2, 0), (0, 2), (-2, 0)][best_dir]
|
|
return None
|
|
|
|
# Fallback: try coordinate-like structures if API payload differs.
|
|
candidates = []
|
|
if isinstance(raw, dict):
|
|
values = list(raw.values())
|
|
elif isinstance(raw, list):
|
|
values = raw
|
|
else:
|
|
values = []
|
|
|
|
for v in values:
|
|
if not isinstance(v, dict):
|
|
continue
|
|
x = self._ci_get(v, "x", "posX", "X", "PosX")
|
|
y = self._ci_get(v, "y", "posY", "Y", "PosY")
|
|
if x is None or y is None:
|
|
continue
|
|
try:
|
|
xi, yi = int(x), int(y)
|
|
except (TypeError, ValueError):
|
|
continue
|
|
candidates.append((xi, yi))
|
|
|
|
if not candidates:
|
|
return None
|
|
return min(candidates, key=lambda c: abs(c[0]) + abs(c[1]))
|
|
|
|
async def scan(self):
|
|
"""Backup: scan returns relative delta to nearest player."""
|
|
if not self.ready("scan"): return
|
|
res = await self.GET(f"/api/player/{self.token}/scan")
|
|
if not res: return
|
|
self.mark("scan")
|
|
|
|
diff = self._ci_get(res, "DifferenceToNearestPlayer", default={}) or {}
|
|
x = self._as_int(self._ci_get(diff, "X", "x"), 0)
|
|
y = self._as_int(self._ci_get(diff, "Y", "y"), 0)
|
|
if x != 0 or y != 0:
|
|
self._set_target(x, y)
|
|
log.info(f"Scan: enemy at delta=({x:+d},{y:+d})")
|
|
|
|
async def peek_all(self):
|
|
"""Peek all 4 directions to spot enemies."""
|
|
for d in range(4):
|
|
if not self.ready("peek"): break
|
|
res = await self.GET(f"/api/player/{self.token}/peek/{d}")
|
|
if not res:
|
|
continue
|
|
self.mark("peek")
|
|
|
|
seen = self._as_int(self._ci_get(res, "PlayersInSight", default=0), 0)
|
|
if seen > 0:
|
|
dist = max(1, self._as_int(self._ci_get(res, "SightedPlayerDistance", default=1), 1))
|
|
dx, dy = [(0, -dist), (dist, 0), (0, dist), (-dist, 0)][d]
|
|
self._set_target(dx, dy)
|
|
log.info(f"Peek {NAMES[d]}: enemy dist={dist}")
|
|
return True
|
|
return False
|
|
|
|
async def stats(self):
|
|
res = await self.GET(f"/api/player/{self.token}/stats")
|
|
if not res: return
|
|
health = self._ci_get(res, "Health", default={}) or {}
|
|
self.health = float(self._ci_get(health, "Currenthealth", "CurrentHealth", "currenthealth", default=self.health))
|
|
|
|
s = self._ci_get(res, "Stats", default={}) or {}
|
|
k = self._as_int(self._ci_get(s, "Kills", "kills", default=self.kills), self.kills)
|
|
if k > self.kills: log.info(f"KILL! Total: {k}")
|
|
self.kills = k
|
|
self.deaths = self._as_int(self._ci_get(s, "Deaths", "deaths", default=self.deaths), self.deaths)
|
|
|
|
# ── Act ───────────────────────────────────────────────────────────────────
|
|
|
|
async def move(self, d):
|
|
if not self.ready("move"): return False
|
|
res = await self.POST(f"/api/player/{self.token}/move/{d}")
|
|
if not res: return False
|
|
self.mark("move")
|
|
|
|
moved = self._as_bool(self._ci_get(res, "Move", default=False))
|
|
if moved and self.has_target:
|
|
# Update relative target position as we move
|
|
deltas = [(0,1),(-1,0),(0,-1),(1,0)]
|
|
self.target_dx = (self.target_dx or 0) + deltas[d][0]
|
|
self.target_dy = (self.target_dy or 0) + deltas[d][1]
|
|
return moved
|
|
|
|
async def hit(self, d):
|
|
if not self.ready("hit"): return False
|
|
res = await self.POST(f"/api/player/{self.token}/hit/{d}")
|
|
if not res: return False
|
|
self.mark("hit")
|
|
|
|
dmg = self._as_int(self._ci_get(res, "Hit", default=0), 0)
|
|
crit = self._as_int(self._ci_get(res, "Critical", default=0), 0)
|
|
if dmg or crit:
|
|
log.info(f"Hit {NAMES[d]}! dmg={dmg} crit={crit}")
|
|
return self._as_bool(self._ci_get(res, "Executed", default=False))
|
|
|
|
async def special(self):
|
|
if not self.ready("special"): return False
|
|
res = await self.POST(f"/api/player/{self.token}/specialattack")
|
|
if not res: return False
|
|
self.mark("special")
|
|
|
|
h = self._as_int(self._ci_get(res, "Hitcount", "HitCount", default=0), 0)
|
|
if h: log.info(f"Special hit {h}!")
|
|
return self._as_bool(self._ci_get(res, "Executed", default=False))
|
|
|
|
async def dash(self, d):
|
|
if not self.ready("dash"): return False
|
|
res = await self.POST(f"/api/player/{self.token}/dash/{d}")
|
|
if not res: return False
|
|
self.mark("dash")
|
|
|
|
executed = self._as_bool(self._ci_get(res, "Executed", default=False))
|
|
if executed and self.has_target:
|
|
blocks = max(0, self._as_int(self._ci_get(res, "BlocksDashed", default=0), 0))
|
|
deltas = [(0, blocks), (-blocks, 0), (0, -blocks), (blocks, 0)]
|
|
self.target_dx = (self.target_dx or 0) + deltas[d][0]
|
|
self.target_dy = (self.target_dy or 0) + deltas[d][1]
|
|
return executed
|
|
|
|
async def shoot(self, d):
|
|
if not self.ready("shoot"): return False
|
|
res = await self.POST(f"/api/player/{self.token}/shoot/{d}")
|
|
if not res: return False
|
|
self.mark("shoot")
|
|
|
|
if self._as_bool(self._ci_get(res, "HitSomeone", default=False)):
|
|
log.info("Shoot HIT!")
|
|
return self._as_bool(self._ci_get(res, "Executed", default=False))
|
|
|
|
# ── Strategy ──────────────────────────────────────────────────────────────
|
|
|
|
async def engage(self):
|
|
d = dir_toward(self.target_dx, self.target_dy)
|
|
|
|
dist = self.dist
|
|
if dist <= 1:
|
|
await self.special()
|
|
await self.hit(d)
|
|
await self.hit((d + 1) % 4)
|
|
await self.hit((d + 3) % 4)
|
|
await self.move(d)
|
|
return
|
|
|
|
if dist <= 3:
|
|
await self.special()
|
|
await self.shoot(d)
|
|
await self.hit(d)
|
|
if self.ready("dash"):
|
|
await self.dash(d)
|
|
else:
|
|
await self.move(d)
|
|
return
|
|
|
|
await self.shoot(d)
|
|
if self.ready("dash"):
|
|
await self.dash(d)
|
|
await self.move(d)
|
|
|
|
async def explore(self):
|
|
"""Aggressive sweep while probing and pushing forward."""
|
|
self.explore_steps += 1
|
|
if self.explore_steps >= 3:
|
|
self.explore_dir = (self.explore_dir + 1) % 4
|
|
self.explore_steps = 0
|
|
|
|
# Bias exploration toward the most recent enemy direction.
|
|
preferred = self.last_known_dir if random.random() < 0.7 else self.explore_dir
|
|
|
|
moved = await self.move(preferred)
|
|
if not moved:
|
|
self.explore_dir = (self.explore_dir + 1) % 4
|
|
self.explore_steps = 0
|
|
await self.move(self.explore_dir)
|
|
else:
|
|
self.explore_dir = preferred
|
|
await self.move(preferred)
|
|
|
|
await self.shoot(preferred)
|
|
await self.special()
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────
|
|
|
|
async def run(self):
|
|
await self.start()
|
|
log.info("Bot v3 running (aggressive mode). Ctrl+C to stop.")
|
|
|
|
tick = 0
|
|
try:
|
|
while True:
|
|
tick += 1
|
|
|
|
await self.radar() # primary detection (0.27s cooldown)
|
|
await self.scan() # backup detection (2s cooldown)
|
|
self._expire_target()
|
|
|
|
# Reacquire quickly when no target is known.
|
|
if not self.has_target and tick % 6 == 0:
|
|
await self.peek_all()
|
|
|
|
if tick % 8 == 0:
|
|
await self.stats()
|
|
t = f"delta=({self.target_dx:+d},{self.target_dy:+d}) d={self.dist}" if self.has_target else "searching"
|
|
log.info(f"HP={self.health:.0f} K={self.kills} D={self.deaths} {t}")
|
|
|
|
if self.has_target:
|
|
await self.engage()
|
|
else:
|
|
await self.explore()
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
except (KeyboardInterrupt, asyncio.CancelledError):
|
|
pass
|
|
finally:
|
|
await self.stop()
|
|
|
|
|
|
async def main():
|
|
p = argparse.ArgumentParser()
|
|
p.add_argument("--token", required=True)
|
|
p.add_argument("--multiplayer", action="store_true")
|
|
args = p.parse_args()
|
|
await Bot(args.token, args.multiplayer).run()
|
|
|
|
if __name__ == "__main__":
|
|
try: asyncio.run(main())
|
|
except KeyboardInterrupt: sys.exit(0) |