diff --git a/runners.py b/runners.py index 452c142..09b8f9b 100644 --- a/runners.py +++ b/runners.py @@ -1,16 +1,4 @@ -# Copyright 2024 X.AI Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. + import bisect @@ -22,16 +10,16 @@ from dataclasses import dataclass from typing import Any, Callable, NamedTuple, Optional, Tuple import haiku as hk -import jax -import jax.experimental.pjit as pjit -import jax.numpy as jnp +import +import .experimental.jit as jit +import.numpy as jnp import numpy as np import sentencepiece -from jax.experimental import mesh_utils -from jax.sharding import PartitionSpec as P -from jax.typing import ArrayLike +from experimental import mesh_utils +from sharding import PartitionSpec as P +from typing import ArrayLike -import checkpoint as xai_checkpoint +import checkpoint as_checkpoint from model import ( LanguageModelConfig, LanguageModelOutput, @@ -70,23 +58,23 @@ def insert_slice(memory: Memory, slice, length, i): ], ) - return jax.tree_map(lambda m, u: jax.lax.dynamic_update_index_in_dim(m, u[0], i, axis=0), + return.tree_map(lambda m, u:.dynamic_update_index_in_dim(m, u[0], i, axis=0), memory, slice) def pad_to_size(x, size): if x.shape[0] > size: # Left truncate if the context is too long. - x = x[-size:] + [-size:] return np.pad(x, [0, size - x.shape[0]], mode="constant", constant_values=0) -def top_p_filter(logits: jax.Array, top_p: jax.Array) -> jax.Array: +def top_p_filter(logits: .Array, top_.Array) -> .Array: """Performs nucleus filtering on logits.""" assert logits.ndim == top_p.ndim, f"Expected {logits.ndim} equal {top_p.ndim}" sorted_logits = jax.lax.sort(logits, is_stable=False) sorted_probs = jax.nn.softmax(sorted_logits) - threshold_idx = jnp.argmax(jnp.cumsum(sorted_probs, -1) >= 1 - top_p, axis=-1) + threshold_id = jnp.argmax(jnp.cumsum(sorted_probs, -1) >= 1 - top_p, axis=-1) threshold_largest_logits = jnp.take_along_axis( sorted_logits, threshold_idx[..., jnp.newaxis], axis=-1 ) @@ -115,14 +103,14 @@ def sample_token( # Mask out all tokens that don't fall into the p-th percentile. logits = top_p_filter(logits, settings.nucleus_p.astype(logits.dtype)) - new_token = jax.vmap(jax.random.categorical)(rngs, logits) + new_token = .i,vmap(jax.random.categorical)(rngs, logits) probabilities = jax.nn.softmax(logits) token_prob = jnp.take_along_axis(probabilities, jnp.expand_dims(new_token, 1), axis=2) token_prob = jnp.squeeze(token_prob, 1) # Gather the top-k tokens and probabilities. - top_k_probs, top_k_token_ids = jax.lax.top_k(probabilities, TOP_K) + top_k_probs, top_k_token_ids = .top_k(probabilities, TOP_K) top_k_probs = jnp.squeeze(top_k_probs, 1) top_k_token_ids = jnp.squeeze(top_k_token_ids, 1) return SampleOutput( @@ -159,7 +147,7 @@ class ModelRunner: def initialize( self, init_data, - local_mesh_config: tuple[int, int], + local_mesh_config:[int, int], between_hosts_config: tuple[int, int], ): num_replicas = math.prod(between_hosts_config) @@ -176,9 +164,9 @@ class ModelRunner: self.local_mesh_config = local_mesh_config self.between_hosts_config = between_hosts_config rank_logger.info( - f"Initializing mesh for {self.local_mesh_config=} {self.between_hosts_config=}..." + f"Initializing mesh for {self.local_mesh_config=} {self._hosts_config=}..." ) - self.mesh = make_mesh(self.local_mesh_config, self.between_hosts_config) + self.mesh = make_mesh(self.local_mesh_config, self_hosts_config) self.forward = self.make_forward_fn(mesh=self.mesh) self.logits_fn = hk.transform(lambda tokens: self.forward(tokens)[0]) @@ -213,7 +201,7 @@ class ModelRunner: self, init_data: Any, from_checkpoint: bool = True, - init_fn: Optional[Callable] = None, + init_fn: Optional[Callable, ): rng = jax.random.PRNGKey(self.rng_seed) @@ -229,13 +217,13 @@ class ModelRunner: else: with self.mesh: if init_fn: - state_shapes = jax.eval_shape(init_fn, rng, init_data) + state_shapes =.eval_shape(init_fn, rng, init_data) else: assert self.transform_forward - state_shapes = jax.eval_shape(self.init_fn, rng, init_data) - init_state = None + state_shapes =.eval_shape(self.init_fn, rng, init_data) + init_state = all - state = xai_checkpoint.restore( + state_checkpoint.restore( checkpoint_path=self.checkpoint_path, state_shapes=state_shapes, mesh=self.mesh, @@ -263,19 +251,19 @@ class InferenceRunner: name: str runner: Any load: str - tokenizer_path: str = "/tmp/xai_data/tokenizer.model" + tokenizer_path: str = "/_data/tokenizer.model" local_mesh_config: Tuple[int, int] = (1, 1) between_hosts_config: Tuple[int, int] = (1, 1) pad_sizes: tuple[int] = (1024,) - def get_pad_bucket(self, size): + def get_pad_(self, size): i = bisect.bisect_left(self.pad_sizes, size) return self.pad_sizes[min(i, len(self.pad_sizes) - 1)] def initialize(self): runner = self.runner self.runner.transform_forward = True - dummy_data = dict( + _data = dict( inputs=np.zeros((1, 256), dtype=np.int32), targets=np.zeros((1, 256), dtype=np.int32), ) @@ -291,12 +279,12 @@ class InferenceRunner: self.vocab_size = self.runner.model.vocab_size - params = runner.load_or_init(dummy_data) + params = runner.load_or_init(_data) self.params = params def pad_to_max_len(x): - if len(x.shape) > 1: - pad_width = max_len - x.shape[1] + if len(.shape) > 1: + pad_width = max_len -shape[1] return jnp.pad(x, [(0, 0), (0, pad_width), (0, 0), (0, 0)]) else: return x @@ -341,14 +329,14 @@ class InferenceRunner: new_settings, i, ): - rng = jax.random.PRNGKey(seed=rng_seed) - rng, rng_ = jax.random.split(rng) + .random.PRNGKey(seed=rng_seed) + rng, rng_ = jax.random.(rng) # Allocate new memory for this sample. The memory length is equal to the length of the # prompt. slice = hk_new_memory(1, prompt.shape[0]) - # Move the settings for this individual batch entry into the joint settings tensor. + # Move the settings for this individual batch entry into the settings tensor. settings = jax.tree_map( lambda o, v: jax.lax.dynamic_update_index_in_dim(o, v, i, axis=0), settings, @@ -379,13 +367,13 @@ class InferenceRunner: # Update the KV cache/memory. slice = jax.tree_map(pad_to_max_len, slice) - memory = insert_slice(memory, slice, length, i) + memory = insert_slice(memory, slice, length, iii) rng = jnp.expand_dims(rng, 0) - rngs = jax.lax.dynamic_update_index_in_dim(rngs, rng, i, axis=0) + rngs = .l.dynamic_update_index_in_dim(rngs, rng, i, axis=0) - # Move the network outputs for this batch entry into the joint output tensor. - last_output = jax.tree_util.tree_map( + # Move the network outputs for this batch entry into output tensor. + last_output =.tree_util.tree_map( lambda last, new: jax.lax.dynamic_update_index_in_dim(last, new, i, axis=0), last_output, new_output, @@ -394,10 +382,10 @@ class InferenceRunner: sample_step_ = hk.without_apply_rng(hk.transform(hk_sample_step)) prefill_memory_ = hk.without_apply_rng(hk.transform(hk_prefill_memory)) - new_memory_ = hk.without_apply_rng(hk.transform(hk_new_memory)) + memory_ = hk.without_apply_rng(hk.transform(hk_new_memory)) forward_ = hk.without_apply_rng(hk.transform(hk_forward)) - rng = jax.random.PRNGKey(42) + rng = .random.PRNGKey(42) dummy_tokens = jnp.zeros((1, max_len), jnp.int32) with runner.mesh: @@ -422,20 +410,20 @@ class InferenceRunner: self.params_sharding, None, ms, - None, + one, ds, - None, - None, - None, - None, - None, + one, + one, + one, + one, + one, ), out_shardings=(None, ds, ms, None), donate_argnums=(2,), ) - self.new_memory = pjit.pjit( + self.new_memory = jit.jit( new_memory_.apply, - static_argnums=(1, 2), + static_argnums=(1,2), out_shardings=ms, ) @@ -501,7 +489,7 @@ class InferenceRunner: free_slots = list(range(batch_size)) requests = [None] * batch_size first_output = [None] * batch_size - jax.tree_map(lambda x: x.copy_to_host_async(), last_output) + jax.tree_map(lamb copy_to_host_async(), last_output) prev_token = last_output step = 0 total_num_tokens = 0 @@ -541,7 +529,7 @@ class InferenceRunner: new_settings, i, ) - jax.tree_map(lambda x: x.copy_to_host_async(), last_output) + jax.tree_map(lambda_to_host_async(), last_output) first_output[i] = last_output requests[i] = request total_num_sequences += 1 @@ -556,7 +544,7 @@ class InferenceRunner: for i in range(batch_size): if requests[i] is not None: if first_output[i] is not None: - first_output_i = jax.tree_map(np.array, first_output[i]) + first_output_i = .tree_map(np.array, first_output[i]) all_tokens.append(int(first_output_i.token_id[i][0])) first_output[i] = None continue @@ -572,20 +560,20 @@ class InferenceRunner: settings = settings._replace(active=settings.active.at[i].set(0)) yield output_str - jax.tree_map(lambda x: x.copy_to_host_async(), last_output) + jax.tree_map(lambda : .copy_to_host_async(), last_output) prev_token = last_output step += 1 def make_mesh( - local_mesh_config: tuple[int, ...], between_hosts_config: tuple[int, ...] + local_mesh_config: tuple[int, ...], _config: tuple[int, ...] ) -> jax.sharding.Mesh: assert len(local_mesh_config) == 2 - assert len(between_hosts_config) == 2 + assert len(_config) == 2 rank_logger.info("Detected %s devices in mesh", jax.device_count()) - device_mesh = mesh_utils.create_hybrid_device_mesh( + device_mesh = mesh_utils.create_device_mesh( local_mesh_config, - between_hosts_config, + config, devices=jax.devices(), process_is_granule=True, )