diff --git a/main-aggro.py b/main-aggro.py new file mode 100644 index 0000000..4192a72 --- /dev/null +++ b/main-aggro.py @@ -0,0 +1,453 @@ +""" +Digital Dojo Bot v3 - Radar-based detection +============================================ +Problem: scan returns (0,0) when no enemy nearby = useless for navigation. +Fix: Use RADAR (0.25s cooldown) to get actual enemy positions, log the raw + structure so we can understand the API, and chase accordingly. + +Usage: + pip install httpx + python dojo_bot.py --token YOUR_API_KEY +""" + +import asyncio, argparse, logging, random, sys, time +import httpx + +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S") +log = logging.getLogger("DojoBot") + +BASE = "https://game-dd.countit.at" + +# direction: 0=NORTH(-Y), 1=EAST(+X), 2=SOUTH(+Y), 3=WEST(-X) +NAMES = {0:"N", 1:"E", 2:"S", 3:"W"} + +CD = { + "move": 0.27, + "hit": 0.27, + "radar": 0.27, + "special": 5.1, + "scan": 2.1, + "dash": 3.1, + "shoot": 1.1, + "peek": 1.1, +} + +def dir_toward(dx, dy): + if abs(dx) >= abs(dy): return 1 if dx > 0 else 3 + return 2 if dy > 0 else 0 + + +class Bot: + def __init__(self, token, multiplayer=False): + self.token = token + self.multiplayer = multiplayer + self.http = httpx.AsyncClient(timeout=8.0) + self._last = {k: 0.0 for k in CD} + self.health = 10.0 + self.kills = 0 + self.deaths = 0 + self.target_dx = None + self.target_dy = None + self.target_last_seen = 0.0 + self.target_ttl = 1.8 + self.last_known_dir = random.randint(0, 3) + self.radar_seen = False # have we printed raw radar yet? + self.explore_dir = random.randint(0, 3) + self.explore_steps = 0 + + def ready(self, a): return (time.monotonic() - self._last[a]) >= CD[a] + def mark(self, a): self._last[a] = time.monotonic() + + def _set_target(self, dx, dy): + self.target_dx, self.target_dy = dx, dy + self.target_last_seen = time.monotonic() + self.last_known_dir = dir_toward(dx, dy) + + def _expire_target(self): + if self.has_target and (time.monotonic() - self.target_last_seen) > self.target_ttl: + self.target_dx, self.target_dy = None, None + + @property + def has_target(self): return self.target_dx is not None + + @property + def dist(self): + if not self.has_target: return 999 + return abs(self.target_dx) + abs(self.target_dy) + + def _ci_get(self, data, *keys, default=None): + """Case-insensitive dict getter for unstable API field casing.""" + if not isinstance(data, dict): + return default + for key in keys: + if key in data: + return data[key] + lowered = {str(k).lower(): v for k, v in data.items()} + for key in keys: + hit = lowered.get(str(key).lower()) + if hit is not None: + return hit + return default + + @staticmethod + def _as_int(value, default=0): + try: + if value is None: + return default + return int(value) + except (TypeError, ValueError): + return default + + @staticmethod + def _as_bool(value): + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return value != 0 + if isinstance(value, str): + return value.strip().lower() in {"true", "1", "yes"} + return False + + async def GET(self, path): + for attempt in range(2): + try: + r = await self.http.get(f"{BASE}{path}") + if r.status_code == 200: + try: + return r.json() + except ValueError: + log.warning(f"GET {path}: invalid JSON body") + return None + + if r.status_code >= 500 and attempt == 0: + await asyncio.sleep(0.15) + continue + + hint = (r.text or "")[:180].replace("\n", " ") + log.warning(f"GET {path}: status={r.status_code} body={hint}") + return None + except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e: + if attempt == 0: + await asyncio.sleep(0.15) + continue + log.warning(f"GET {path}: {e}") + return None + except Exception as e: + log.warning(f"GET {path}: {e}") + return None + return None + + async def POST(self, path): + for attempt in range(2): + try: + r = await self.http.post(f"{BASE}{path}") + if r.status_code == 200: + try: + return r.json() + except ValueError: + log.warning(f"POST {path}: invalid JSON body") + return None + + if r.status_code >= 500 and attempt == 0: + await asyncio.sleep(0.15) + continue + + hint = (r.text or "")[:180].replace("\n", " ") + log.warning(f"POST {path}: status={r.status_code} body={hint}") + return None + except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e: + if attempt == 0: + await asyncio.sleep(0.15) + continue + log.warning(f"POST {path}: {e}") + return None + except Exception as e: + log.warning(f"POST {path}: {e}") + return None + return None + + # ── Game ────────────────────────────────────────────────────────────────── + + async def start(self): + if not self.multiplayer: + r = await self.POST(f"/api/game/{self.token}/create") + log.info(f"Game: {r}") + + async def stop(self): + if not self.multiplayer: await self.POST(f"/api/game/{self.token}/close") + await self.http.aclose() + log.info("Done.") + + # ── Sense ───────────────────────────────────────────────────────────────── + + async def radar(self): + if not self.ready("radar"): return + res = await self.GET(f"/api/player/{self.token}/radar") + if not res: return + self.mark("radar") + + raw = self._ci_get(res, "RadarResults", "radarResults", default={}) or {} + + # Log full radar structure once to understand server payload shape + if not self.radar_seen: + log.info(f"=== RADAR STRUCTURE ===\n{res}\n=======================") + self.radar_seen = True + + nearest = self._parse_radar(raw) + if nearest is not None: + self._set_target(*nearest) + + def _parse_radar(self, raw): + """Extract nearest enemy hint from RadarResults.""" + if not raw: + return None + + # Documented format: directional counts in radar sectors. + if isinstance(raw, dict) and all(isinstance(v, (int, float, str)) for v in raw.values()): + buckets = { + 0: ["0", "n", "north", "up"], + 1: ["1", "e", "east", "right"], + 2: ["2", "s", "south", "down"], + 3: ["3", "w", "west", "left"], + } + counts = {} + lowered = {str(k).lower(): self._as_int(v, 0) for k, v in raw.items()} + for direction, names in buckets.items(): + counts[direction] = max((lowered.get(n, 0) for n in names), default=0) + + best_dir = max(counts, key=counts.get) + if counts.get(best_dir, 0) > 0: + return [(0, -2), (2, 0), (0, 2), (-2, 0)][best_dir] + return None + + # Fallback: try coordinate-like structures if API payload differs. + candidates = [] + if isinstance(raw, dict): + values = list(raw.values()) + elif isinstance(raw, list): + values = raw + else: + values = [] + + for v in values: + if not isinstance(v, dict): + continue + x = self._ci_get(v, "x", "posX", "X", "PosX") + y = self._ci_get(v, "y", "posY", "Y", "PosY") + if x is None or y is None: + continue + try: + xi, yi = int(x), int(y) + except (TypeError, ValueError): + continue + candidates.append((xi, yi)) + + if not candidates: + return None + return min(candidates, key=lambda c: abs(c[0]) + abs(c[1])) + + async def scan(self): + """Backup: scan returns relative delta to nearest player.""" + if not self.ready("scan"): return + res = await self.GET(f"/api/player/{self.token}/scan") + if not res: return + self.mark("scan") + + diff = self._ci_get(res, "DifferenceToNearestPlayer", default={}) or {} + x = self._as_int(self._ci_get(diff, "X", "x"), 0) + y = self._as_int(self._ci_get(diff, "Y", "y"), 0) + if x != 0 or y != 0: + self._set_target(x, y) + log.info(f"Scan: enemy at delta=({x:+d},{y:+d})") + + async def peek_all(self): + """Peek all 4 directions to spot enemies.""" + for d in range(4): + if not self.ready("peek"): break + res = await self.GET(f"/api/player/{self.token}/peek/{d}") + if not res: + continue + self.mark("peek") + + seen = self._as_int(self._ci_get(res, "PlayersInSight", default=0), 0) + if seen > 0: + dist = max(1, self._as_int(self._ci_get(res, "SightedPlayerDistance", default=1), 1)) + dx, dy = [(0, -dist), (dist, 0), (0, dist), (-dist, 0)][d] + self._set_target(dx, dy) + log.info(f"Peek {NAMES[d]}: enemy dist={dist}") + return True + return False + + async def stats(self): + res = await self.GET(f"/api/player/{self.token}/stats") + if not res: return + health = self._ci_get(res, "Health", default={}) or {} + self.health = float(self._ci_get(health, "Currenthealth", "CurrentHealth", "currenthealth", default=self.health)) + + s = self._ci_get(res, "Stats", default={}) or {} + k = self._as_int(self._ci_get(s, "Kills", "kills", default=self.kills), self.kills) + if k > self.kills: log.info(f"KILL! Total: {k}") + self.kills = k + self.deaths = self._as_int(self._ci_get(s, "Deaths", "deaths", default=self.deaths), self.deaths) + + # ── Act ─────────────────────────────────────────────────────────────────── + + async def move(self, d): + if not self.ready("move"): return False + res = await self.POST(f"/api/player/{self.token}/move/{d}") + if not res: return False + self.mark("move") + + moved = self._as_bool(self._ci_get(res, "Move", default=False)) + if moved and self.has_target: + # Update relative target position as we move + deltas = [(0,1),(-1,0),(0,-1),(1,0)] + self.target_dx = (self.target_dx or 0) + deltas[d][0] + self.target_dy = (self.target_dy or 0) + deltas[d][1] + return moved + + async def hit(self, d): + if not self.ready("hit"): return False + res = await self.POST(f"/api/player/{self.token}/hit/{d}") + if not res: return False + self.mark("hit") + + dmg = self._as_int(self._ci_get(res, "Hit", default=0), 0) + crit = self._as_int(self._ci_get(res, "Critical", default=0), 0) + if dmg or crit: + log.info(f"Hit {NAMES[d]}! dmg={dmg} crit={crit}") + return self._as_bool(self._ci_get(res, "Executed", default=False)) + + async def special(self): + if not self.ready("special"): return False + res = await self.POST(f"/api/player/{self.token}/specialattack") + if not res: return False + self.mark("special") + + h = self._as_int(self._ci_get(res, "Hitcount", "HitCount", default=0), 0) + if h: log.info(f"Special hit {h}!") + return self._as_bool(self._ci_get(res, "Executed", default=False)) + + async def dash(self, d): + if not self.ready("dash"): return False + res = await self.POST(f"/api/player/{self.token}/dash/{d}") + if not res: return False + self.mark("dash") + + executed = self._as_bool(self._ci_get(res, "Executed", default=False)) + if executed and self.has_target: + blocks = max(0, self._as_int(self._ci_get(res, "BlocksDashed", default=0), 0)) + deltas = [(0, blocks), (-blocks, 0), (0, -blocks), (blocks, 0)] + self.target_dx = (self.target_dx or 0) + deltas[d][0] + self.target_dy = (self.target_dy or 0) + deltas[d][1] + return executed + + async def shoot(self, d): + if not self.ready("shoot"): return False + res = await self.POST(f"/api/player/{self.token}/shoot/{d}") + if not res: return False + self.mark("shoot") + + if self._as_bool(self._ci_get(res, "HitSomeone", default=False)): + log.info("Shoot HIT!") + return self._as_bool(self._ci_get(res, "Executed", default=False)) + + # ── Strategy ────────────────────────────────────────────────────────────── + + async def engage(self): + d = dir_toward(self.target_dx, self.target_dy) + + dist = self.dist + if dist <= 1: + await self.special() + await self.hit(d) + await self.hit((d + 1) % 4) + await self.hit((d + 3) % 4) + await self.move(d) + return + + if dist <= 3: + await self.special() + await self.shoot(d) + await self.hit(d) + if self.ready("dash"): + await self.dash(d) + else: + await self.move(d) + return + + await self.shoot(d) + if self.ready("dash"): + await self.dash(d) + await self.move(d) + + async def explore(self): + """Aggressive sweep while probing and pushing forward.""" + self.explore_steps += 1 + if self.explore_steps >= 3: + self.explore_dir = (self.explore_dir + 1) % 4 + self.explore_steps = 0 + + # Bias exploration toward the most recent enemy direction. + preferred = self.last_known_dir if random.random() < 0.7 else self.explore_dir + + moved = await self.move(preferred) + if not moved: + self.explore_dir = (self.explore_dir + 1) % 4 + self.explore_steps = 0 + await self.move(self.explore_dir) + else: + self.explore_dir = preferred + await self.move(preferred) + + await self.shoot(preferred) + await self.special() + + # ── Main ────────────────────────────────────────────────────────────────── + + async def run(self): + await self.start() + log.info("Bot v3 running (aggressive mode). Ctrl+C to stop.") + + tick = 0 + try: + while True: + tick += 1 + + await self.radar() # primary detection (0.27s cooldown) + await self.scan() # backup detection (2s cooldown) + self._expire_target() + + # Reacquire quickly when no target is known. + if not self.has_target and tick % 6 == 0: + await self.peek_all() + + if tick % 8 == 0: + await self.stats() + t = f"delta=({self.target_dx:+d},{self.target_dy:+d}) d={self.dist}" if self.has_target else "searching" + log.info(f"HP={self.health:.0f} K={self.kills} D={self.deaths} {t}") + + if self.has_target: + await self.engage() + else: + await self.explore() + + await asyncio.sleep(0.1) + + except (KeyboardInterrupt, asyncio.CancelledError): + pass + finally: + await self.stop() + + +async def main(): + p = argparse.ArgumentParser() + p.add_argument("--token", required=True) + p.add_argument("--multiplayer", action="store_true") + args = p.parse_args() + await Bot(args.token, args.multiplayer).run() + +if __name__ == "__main__": + try: asyncio.run(main()) + except KeyboardInterrupt: sys.exit(0) \ No newline at end of file diff --git a/main-less-aggro.py b/main-less-aggro.py new file mode 100644 index 0000000..4192a72 --- /dev/null +++ b/main-less-aggro.py @@ -0,0 +1,453 @@ +""" +Digital Dojo Bot v3 - Radar-based detection +============================================ +Problem: scan returns (0,0) when no enemy nearby = useless for navigation. +Fix: Use RADAR (0.25s cooldown) to get actual enemy positions, log the raw + structure so we can understand the API, and chase accordingly. + +Usage: + pip install httpx + python dojo_bot.py --token YOUR_API_KEY +""" + +import asyncio, argparse, logging, random, sys, time +import httpx + +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S") +log = logging.getLogger("DojoBot") + +BASE = "https://game-dd.countit.at" + +# direction: 0=NORTH(-Y), 1=EAST(+X), 2=SOUTH(+Y), 3=WEST(-X) +NAMES = {0:"N", 1:"E", 2:"S", 3:"W"} + +CD = { + "move": 0.27, + "hit": 0.27, + "radar": 0.27, + "special": 5.1, + "scan": 2.1, + "dash": 3.1, + "shoot": 1.1, + "peek": 1.1, +} + +def dir_toward(dx, dy): + if abs(dx) >= abs(dy): return 1 if dx > 0 else 3 + return 2 if dy > 0 else 0 + + +class Bot: + def __init__(self, token, multiplayer=False): + self.token = token + self.multiplayer = multiplayer + self.http = httpx.AsyncClient(timeout=8.0) + self._last = {k: 0.0 for k in CD} + self.health = 10.0 + self.kills = 0 + self.deaths = 0 + self.target_dx = None + self.target_dy = None + self.target_last_seen = 0.0 + self.target_ttl = 1.8 + self.last_known_dir = random.randint(0, 3) + self.radar_seen = False # have we printed raw radar yet? + self.explore_dir = random.randint(0, 3) + self.explore_steps = 0 + + def ready(self, a): return (time.monotonic() - self._last[a]) >= CD[a] + def mark(self, a): self._last[a] = time.monotonic() + + def _set_target(self, dx, dy): + self.target_dx, self.target_dy = dx, dy + self.target_last_seen = time.monotonic() + self.last_known_dir = dir_toward(dx, dy) + + def _expire_target(self): + if self.has_target and (time.monotonic() - self.target_last_seen) > self.target_ttl: + self.target_dx, self.target_dy = None, None + + @property + def has_target(self): return self.target_dx is not None + + @property + def dist(self): + if not self.has_target: return 999 + return abs(self.target_dx) + abs(self.target_dy) + + def _ci_get(self, data, *keys, default=None): + """Case-insensitive dict getter for unstable API field casing.""" + if not isinstance(data, dict): + return default + for key in keys: + if key in data: + return data[key] + lowered = {str(k).lower(): v for k, v in data.items()} + for key in keys: + hit = lowered.get(str(key).lower()) + if hit is not None: + return hit + return default + + @staticmethod + def _as_int(value, default=0): + try: + if value is None: + return default + return int(value) + except (TypeError, ValueError): + return default + + @staticmethod + def _as_bool(value): + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return value != 0 + if isinstance(value, str): + return value.strip().lower() in {"true", "1", "yes"} + return False + + async def GET(self, path): + for attempt in range(2): + try: + r = await self.http.get(f"{BASE}{path}") + if r.status_code == 200: + try: + return r.json() + except ValueError: + log.warning(f"GET {path}: invalid JSON body") + return None + + if r.status_code >= 500 and attempt == 0: + await asyncio.sleep(0.15) + continue + + hint = (r.text or "")[:180].replace("\n", " ") + log.warning(f"GET {path}: status={r.status_code} body={hint}") + return None + except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e: + if attempt == 0: + await asyncio.sleep(0.15) + continue + log.warning(f"GET {path}: {e}") + return None + except Exception as e: + log.warning(f"GET {path}: {e}") + return None + return None + + async def POST(self, path): + for attempt in range(2): + try: + r = await self.http.post(f"{BASE}{path}") + if r.status_code == 200: + try: + return r.json() + except ValueError: + log.warning(f"POST {path}: invalid JSON body") + return None + + if r.status_code >= 500 and attempt == 0: + await asyncio.sleep(0.15) + continue + + hint = (r.text or "")[:180].replace("\n", " ") + log.warning(f"POST {path}: status={r.status_code} body={hint}") + return None + except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e: + if attempt == 0: + await asyncio.sleep(0.15) + continue + log.warning(f"POST {path}: {e}") + return None + except Exception as e: + log.warning(f"POST {path}: {e}") + return None + return None + + # ── Game ────────────────────────────────────────────────────────────────── + + async def start(self): + if not self.multiplayer: + r = await self.POST(f"/api/game/{self.token}/create") + log.info(f"Game: {r}") + + async def stop(self): + if not self.multiplayer: await self.POST(f"/api/game/{self.token}/close") + await self.http.aclose() + log.info("Done.") + + # ── Sense ───────────────────────────────────────────────────────────────── + + async def radar(self): + if not self.ready("radar"): return + res = await self.GET(f"/api/player/{self.token}/radar") + if not res: return + self.mark("radar") + + raw = self._ci_get(res, "RadarResults", "radarResults", default={}) or {} + + # Log full radar structure once to understand server payload shape + if not self.radar_seen: + log.info(f"=== RADAR STRUCTURE ===\n{res}\n=======================") + self.radar_seen = True + + nearest = self._parse_radar(raw) + if nearest is not None: + self._set_target(*nearest) + + def _parse_radar(self, raw): + """Extract nearest enemy hint from RadarResults.""" + if not raw: + return None + + # Documented format: directional counts in radar sectors. + if isinstance(raw, dict) and all(isinstance(v, (int, float, str)) for v in raw.values()): + buckets = { + 0: ["0", "n", "north", "up"], + 1: ["1", "e", "east", "right"], + 2: ["2", "s", "south", "down"], + 3: ["3", "w", "west", "left"], + } + counts = {} + lowered = {str(k).lower(): self._as_int(v, 0) for k, v in raw.items()} + for direction, names in buckets.items(): + counts[direction] = max((lowered.get(n, 0) for n in names), default=0) + + best_dir = max(counts, key=counts.get) + if counts.get(best_dir, 0) > 0: + return [(0, -2), (2, 0), (0, 2), (-2, 0)][best_dir] + return None + + # Fallback: try coordinate-like structures if API payload differs. + candidates = [] + if isinstance(raw, dict): + values = list(raw.values()) + elif isinstance(raw, list): + values = raw + else: + values = [] + + for v in values: + if not isinstance(v, dict): + continue + x = self._ci_get(v, "x", "posX", "X", "PosX") + y = self._ci_get(v, "y", "posY", "Y", "PosY") + if x is None or y is None: + continue + try: + xi, yi = int(x), int(y) + except (TypeError, ValueError): + continue + candidates.append((xi, yi)) + + if not candidates: + return None + return min(candidates, key=lambda c: abs(c[0]) + abs(c[1])) + + async def scan(self): + """Backup: scan returns relative delta to nearest player.""" + if not self.ready("scan"): return + res = await self.GET(f"/api/player/{self.token}/scan") + if not res: return + self.mark("scan") + + diff = self._ci_get(res, "DifferenceToNearestPlayer", default={}) or {} + x = self._as_int(self._ci_get(diff, "X", "x"), 0) + y = self._as_int(self._ci_get(diff, "Y", "y"), 0) + if x != 0 or y != 0: + self._set_target(x, y) + log.info(f"Scan: enemy at delta=({x:+d},{y:+d})") + + async def peek_all(self): + """Peek all 4 directions to spot enemies.""" + for d in range(4): + if not self.ready("peek"): break + res = await self.GET(f"/api/player/{self.token}/peek/{d}") + if not res: + continue + self.mark("peek") + + seen = self._as_int(self._ci_get(res, "PlayersInSight", default=0), 0) + if seen > 0: + dist = max(1, self._as_int(self._ci_get(res, "SightedPlayerDistance", default=1), 1)) + dx, dy = [(0, -dist), (dist, 0), (0, dist), (-dist, 0)][d] + self._set_target(dx, dy) + log.info(f"Peek {NAMES[d]}: enemy dist={dist}") + return True + return False + + async def stats(self): + res = await self.GET(f"/api/player/{self.token}/stats") + if not res: return + health = self._ci_get(res, "Health", default={}) or {} + self.health = float(self._ci_get(health, "Currenthealth", "CurrentHealth", "currenthealth", default=self.health)) + + s = self._ci_get(res, "Stats", default={}) or {} + k = self._as_int(self._ci_get(s, "Kills", "kills", default=self.kills), self.kills) + if k > self.kills: log.info(f"KILL! Total: {k}") + self.kills = k + self.deaths = self._as_int(self._ci_get(s, "Deaths", "deaths", default=self.deaths), self.deaths) + + # ── Act ─────────────────────────────────────────────────────────────────── + + async def move(self, d): + if not self.ready("move"): return False + res = await self.POST(f"/api/player/{self.token}/move/{d}") + if not res: return False + self.mark("move") + + moved = self._as_bool(self._ci_get(res, "Move", default=False)) + if moved and self.has_target: + # Update relative target position as we move + deltas = [(0,1),(-1,0),(0,-1),(1,0)] + self.target_dx = (self.target_dx or 0) + deltas[d][0] + self.target_dy = (self.target_dy or 0) + deltas[d][1] + return moved + + async def hit(self, d): + if not self.ready("hit"): return False + res = await self.POST(f"/api/player/{self.token}/hit/{d}") + if not res: return False + self.mark("hit") + + dmg = self._as_int(self._ci_get(res, "Hit", default=0), 0) + crit = self._as_int(self._ci_get(res, "Critical", default=0), 0) + if dmg or crit: + log.info(f"Hit {NAMES[d]}! dmg={dmg} crit={crit}") + return self._as_bool(self._ci_get(res, "Executed", default=False)) + + async def special(self): + if not self.ready("special"): return False + res = await self.POST(f"/api/player/{self.token}/specialattack") + if not res: return False + self.mark("special") + + h = self._as_int(self._ci_get(res, "Hitcount", "HitCount", default=0), 0) + if h: log.info(f"Special hit {h}!") + return self._as_bool(self._ci_get(res, "Executed", default=False)) + + async def dash(self, d): + if not self.ready("dash"): return False + res = await self.POST(f"/api/player/{self.token}/dash/{d}") + if not res: return False + self.mark("dash") + + executed = self._as_bool(self._ci_get(res, "Executed", default=False)) + if executed and self.has_target: + blocks = max(0, self._as_int(self._ci_get(res, "BlocksDashed", default=0), 0)) + deltas = [(0, blocks), (-blocks, 0), (0, -blocks), (blocks, 0)] + self.target_dx = (self.target_dx or 0) + deltas[d][0] + self.target_dy = (self.target_dy or 0) + deltas[d][1] + return executed + + async def shoot(self, d): + if not self.ready("shoot"): return False + res = await self.POST(f"/api/player/{self.token}/shoot/{d}") + if not res: return False + self.mark("shoot") + + if self._as_bool(self._ci_get(res, "HitSomeone", default=False)): + log.info("Shoot HIT!") + return self._as_bool(self._ci_get(res, "Executed", default=False)) + + # ── Strategy ────────────────────────────────────────────────────────────── + + async def engage(self): + d = dir_toward(self.target_dx, self.target_dy) + + dist = self.dist + if dist <= 1: + await self.special() + await self.hit(d) + await self.hit((d + 1) % 4) + await self.hit((d + 3) % 4) + await self.move(d) + return + + if dist <= 3: + await self.special() + await self.shoot(d) + await self.hit(d) + if self.ready("dash"): + await self.dash(d) + else: + await self.move(d) + return + + await self.shoot(d) + if self.ready("dash"): + await self.dash(d) + await self.move(d) + + async def explore(self): + """Aggressive sweep while probing and pushing forward.""" + self.explore_steps += 1 + if self.explore_steps >= 3: + self.explore_dir = (self.explore_dir + 1) % 4 + self.explore_steps = 0 + + # Bias exploration toward the most recent enemy direction. + preferred = self.last_known_dir if random.random() < 0.7 else self.explore_dir + + moved = await self.move(preferred) + if not moved: + self.explore_dir = (self.explore_dir + 1) % 4 + self.explore_steps = 0 + await self.move(self.explore_dir) + else: + self.explore_dir = preferred + await self.move(preferred) + + await self.shoot(preferred) + await self.special() + + # ── Main ────────────────────────────────────────────────────────────────── + + async def run(self): + await self.start() + log.info("Bot v3 running (aggressive mode). Ctrl+C to stop.") + + tick = 0 + try: + while True: + tick += 1 + + await self.radar() # primary detection (0.27s cooldown) + await self.scan() # backup detection (2s cooldown) + self._expire_target() + + # Reacquire quickly when no target is known. + if not self.has_target and tick % 6 == 0: + await self.peek_all() + + if tick % 8 == 0: + await self.stats() + t = f"delta=({self.target_dx:+d},{self.target_dy:+d}) d={self.dist}" if self.has_target else "searching" + log.info(f"HP={self.health:.0f} K={self.kills} D={self.deaths} {t}") + + if self.has_target: + await self.engage() + else: + await self.explore() + + await asyncio.sleep(0.1) + + except (KeyboardInterrupt, asyncio.CancelledError): + pass + finally: + await self.stop() + + +async def main(): + p = argparse.ArgumentParser() + p.add_argument("--token", required=True) + p.add_argument("--multiplayer", action="store_true") + args = p.parse_args() + await Bot(args.token, args.multiplayer).run() + +if __name__ == "__main__": + try: asyncio.run(main()) + except KeyboardInterrupt: sys.exit(0) \ No newline at end of file