Skip to content

Commit 6584cb9

Browse files
committed
2 parents aa6cd2e + 52a3c71 commit 6584cb9

25 files changed

+4697
-131
lines changed

ipfs_datasets_py/ipfs_datasets.py

Lines changed: 962 additions & 129 deletions
Large diffs are not rendered by default.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .ipfs_embeddings import ipfs_embeddings_py
2+
from .ipfs_only_hash import ipfs_only_hash_py
3+
from .ipfs_multiformats import ipfs_multiformats_py
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
from .ipfs_multiformats import *
2+
from .ipfs_only_hash import *
3+
import asyncio
4+
import subprocess
5+
import os
6+
import datasets
7+
8+
class ipfs_embeddings_py:
    """Track embedding endpoints (TEI-over-HTTPS and libp2p) and CID/KNN indexes.

    Endpoints are stored as ``{model: {endpoint: batch_size}}``;
    ``endpoint_status`` maps an endpoint URL to a status flag (1 == available).
    """

    def __init__(self, resources, metedata):
        # NOTE(review): "metedata" is a typo for "metadata"; kept as-is so
        # keyword callers are not broken.
        self.multiformats = ipfs_multiformats_py(resources, metedata)
        self.ipfs_only_hash = ipfs_only_hash_py(resources, metedata)
        self.tei_https_endpoints = {}
        self.libp2p_endpoints = {}
        # Bug fix: these were ``iter([])``, which supports neither append()
        # nor pop(); queue_index_cid()/pop_https_index_cid() need real lists.
        self.cid_queue = []
        self.knn_queue = []
        self.cid_index = {}
        self.knn_index = {}
        self.endpoint_status = {}

    def load_index(self, index):
        """Attach a prebuilt index object."""
        self.index = index

    def add_tei_https_endpoint(self, model, endpoint, batch_size):
        """Register a TEI HTTPS endpoint for ``model``; existing entries win."""
        endpoints = self.tei_https_endpoints.setdefault(model, {})
        if endpoint not in endpoints:
            endpoints[endpoint] = batch_size

    def add_libp2p_endpoint(self, model, endpoint, batch_size):
        """Register a libp2p endpoint for ``model``; existing entries win."""
        endpoints = self.libp2p_endpoints.setdefault(model, {})
        if endpoint not in endpoints:
            endpoints[endpoint] = batch_size

    def rm_tei_https_endpoint(self, model, endpoint):
        """Remove a TEI HTTPS endpoint and its status entry, if present."""
        if endpoint in self.tei_https_endpoints.get(model, {}):
            del self.tei_https_endpoints[model][endpoint]
            # Bug fix: add_tei_https_endpoint never creates a status entry, so
            # the original unconditional ``del`` could raise KeyError.
            self.endpoint_status.pop(endpoint, None)

    def rm_libp2p_endpoint(self, model, endpoint):
        """Remove a libp2p endpoint and its status entry, if present."""
        if endpoint in self.libp2p_endpoints.get(model, {}):
            del self.libp2p_endpoints[model][endpoint]
            self.endpoint_status.pop(endpoint, None)

    def test_tei_https_endpoint(self, model, endpoint):
        """Return True when ``endpoint`` is registered for ``model``."""
        return endpoint in self.tei_https_endpoints.get(model, {})

    def test_libp2p_endpoint(self, model, endpoint):
        """Return True when ``endpoint`` is registered for ``model``."""
        return endpoint in self.libp2p_endpoints.get(model, {})

    def get_tei_https_endpoint(self, model):
        """Return the ``{endpoint: batch_size}`` mapping for ``model``, or None."""
        return self.tei_https_endpoints.get(model)

    def request_tei_https_endpoint(self, model, batch_size):
        """Return the first available (status == 1) endpoint for ``model``.

        NOTE(review): ``batch_size`` is accepted but never consulted —
        presumably it should filter endpoints by capacity; confirm intent.
        """
        for endpoint in self.tei_https_endpoints.get(model, {}):
            # Bug fix: an endpoint may have no status entry yet; the original
            # indexed endpoint_status directly and raised KeyError.
            if self.endpoint_status.get(endpoint) == 1:
                return endpoint
        return None

    def index_ipfs(self, samples):
        """Compute a CID for each sample and record it in ``cid_index``."""
        self._index_samples(samples, self.cid_index)

    def index_knn(self, samples):
        """Compute a CID for each sample and record it in ``knn_index``."""
        self._index_samples(samples, self.knn_index)

    def _index_samples(self, samples, index):
        # Shared helper for index_ipfs/index_knn. Bug fixes vs the originals:
        # ``type(samples) is None`` is always False (use ``samples is None``)
        # and ``type(samples) is iter`` is always False (``iter`` is a builtin
        # function, not a type), so generator inputs were silently dropped.
        if samples is None:
            raise ValueError("samples must be a list")
        if isinstance(samples, str):
            samples = [samples]
        for this_sample in samples:
            index[self.multiformats.get_cid(this_sample)] = this_sample

    def queue_index_cid(self, samples):
        """Append samples to the CID work queue."""
        if samples is None:
            raise ValueError("samples must be a list")
        if isinstance(samples, str):
            samples = [samples]
        # Bug fix: the original appended to ``iter([])``, which has no append().
        self.cid_queue.extend(samples)

    def choose_endpoint(self):
        """Return the ``{endpoint: status}`` mapping of available endpoints, or None."""
        available = {k: v for k, v in self.endpoint_status.items() if v == 1}
        return available or None

    def https_index_cid(self, samples, endpoint):
        """Split ``samples`` into endpoint-sized chunks and return them.

        NOTE(review): the HTTP request itself was never implemented in the
        original (the loop body was a bare ``self``/``pass``); chunks are now
        built and returned so a caller can submit them.
        """
        # Bug fix: tei_https_endpoints is keyed by model, then endpoint; the
        # original indexed it directly by endpoint (KeyError). Search models.
        chunk_size = next(
            (eps[endpoint] for eps in self.tei_https_endpoints.values()
             if endpoint in eps),
            1,
        )
        all_chunk = []
        this_chunk = []
        # Bug fix: ``range(samples)`` on a list raises TypeError.
        for sample in samples:
            this_chunk.append(sample)
            if len(this_chunk) >= chunk_size:
                all_chunk.append(this_chunk)
                this_chunk = []
        if this_chunk:
            all_chunk.append(this_chunk)
        return all_chunk

    def pop_https_index_cid(self, samples):
        """Pop up to ``samples`` queued CIDs, grouped into endpoint-sized chunks."""
        chosen = self.choose_endpoint()
        # Bug fix: choose_endpoint() returns a status dict (or None), which the
        # original used directly as a key into tei_https_endpoints (KeyError).
        chunk_size = 1
        if chosen:
            first_endpoint = next(iter(chosen))
            chunk_size = next(
                (eps[first_endpoint] for eps in self.tei_https_endpoints.values()
                 if first_endpoint in eps),
                1,
            )
        all_chunk = []
        this_chunk = []
        # Bug fix: stop when the queue empties instead of raising IndexError.
        for _ in range(samples):
            if not self.cid_queue:
                break
            this_chunk.append(self.cid_queue.pop())
            if len(this_chunk) >= chunk_size:
                all_chunk.append(this_chunk)
                this_chunk = []
        if this_chunk:
            all_chunk.append(this_chunk)
        return all_chunk

    def test(self):
        """Smoke test: register endpoints and index a few sample strings."""
        self.add_tei_https_endpoint("BAAI/bge-m3", "62.146.169.111:80/embed", 1)
        self.add_tei_https_endpoint("BAAI/bge-m3", "62.146.169.111:8080/embed", 1)
        self.add_tei_https_endpoint("BAAI/bge-m3", "62.146.168.111:8081/embed", 1)
        test_data = {
            "test1", "test2", "test3"
        }
        for data in test_data:
            # Bug fix: index_ipfs/index_knn mutate the instance indexes and
            # return None; the original assigned their return values.
            self.index_ipfs(data)
            self.index_knn(data)
        print("test")

    def status(self):
        """Return the endpoint-status mapping."""
        # Bug fix: the original read ``self.endpointStatus``, which is never
        # assigned anywhere (AttributeError); the attribute is endpoint_status.
        return self.endpoint_status

    def setStatus(self, endpoint, status):
        """Set the status flag for ``endpoint``."""
        # Bug fix: same endpointStatus -> endpoint_status typo as status().
        self.endpoint_status[endpoint] = status
        return None
if __name__ == '__main__':
    # Manual smoke test: build an instance from empty config and run test().
    ipfs_embeddings = ipfs_embeddings_py({}, {})
    ipfs_embeddings.test()
    print("test")
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import hashlib
2+
from multiformats import CID, multihash
3+
4+
class ipfs_multiformats_py:
    """Compute CIDv1 identifiers for files: SHA-256 -> multihash -> CID."""

    def __init__(self, resources, metadata):
        # resources/metadata are accepted for interface parity with the other
        # ipfs_* helpers; only the multihash codec module is kept.
        self.multihash = multihash

    def get_file_sha256(self, file_path):
        """Return the raw SHA-256 digest of the file at ``file_path``."""
        hasher = hashlib.sha256()
        with open(file_path, 'rb') as f:
            # Stream in 8 KiB chunks so large files are not read whole.
            while chunk := f.read(8192):
                hasher.update(chunk)
        return hasher.digest()

    def get_multihash_sha256(self, file_content_hash):
        """Wrap a raw SHA-256 digest in multihash ('sha2-256') format."""
        return self.multihash.wrap(file_content_hash, 'sha2-256')

    def get_cid(self, file_path):
        """Return the CIDv1 (base32, raw codec) for the file's content."""
        file_content_hash = self.get_file_sha256(file_path)
        mh = self.get_multihash_sha256(file_content_hash)
        # Bug fix: the multiformats CID constructor is
        # CID(base, version, codec, digest); the original omitted the version.
        cid = CID('base32', 1, 'raw', mh)
        return str(cid)
if __name__ == '__main__':
    # Bug fix: __init__ requires (resources, metadata); the original called
    # the constructor with no arguments and raised TypeError immediately.
    ipfs_multiformats = ipfs_multiformats_py({}, {})
    file_path = 'path_to_your_file'
    cid = ipfs_multiformats.get_cid(file_path)
    print(f"CID: {cid}")
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import os
2+
import subprocess
3+
4+
class ipfs_only_hash_py:
    """Compute an IPFS CID for a file by shelling out to the npm `ipfs-only-hash` tool."""

    def __init__(self, resources, metadata):
        # resources/metadata are accepted for interface parity with the other
        # ipfs_* helpers; nothing is needed here.
        return None

    def __call__(self, file_path):
        """Return the IPFS-only hash of ``file_path`` as a stripped string.

        Bug fix: the original built the string
        ``"bash -c 'npx ipfs-only-hash " + absolute_path`` — the single quote
        is never closed, the path falls outside the -c argument, and a crafted
        path could inject shell commands. An argument list with the default
        shell=False fixes all three.
        """
        absolute_path = os.path.abspath(file_path)
        output = subprocess.check_output(["npx", "ipfs-only-hash", absolute_path])
        return output.decode('utf-8').strip()

    def __test__(self):
        """Smoke test: hash a local file and print the result."""
        test_file_path = "test.txt"
        test_ipfs_hash = self(test_file_path)
        print(test_ipfs_hash)
        return None
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
2+
from datasets import load_dataset
3+
import ipfs_embeddings_py.ipfs_embeddings as ipfs_embeddings
4+
5+
class test_ipfs_embeddings:
    """Exercises ipfs_embeddings_py against the LAION Wikipedia-X datasets."""

    def __init__(self):
        resources = {}
        metadata = {}
        self.dataset = {}
        self.ipfs_embeddings = ipfs_embeddings.ipfs_embeddings_py(resources, metadata)

    def process(self, dataset, output):
        """Queue each row's 'data' field for embedding.

        NOTE(review): ipfs_embeddings_py does not define add_tei_https_queue
        in this commit — confirm the intended API before relying on this.
        """
        num_rows = dataset.num_rows['data']
        processed_data = {}
        for i in range(num_rows):
            row = dataset['data'][i]
            data = row['data']
            # Bug fixes: key by the row index (dict rows are unhashable), and
            # pass the callback itself rather than the result of calling it
            # with no arguments (which would also raise TypeError).
            processed_data[i] = self.ipfs_embeddings.add_tei_https_queue(data, self.callback)
        return None

    def callback(self, data):
        # Placeholder completion callback; intentionally a no-op.
        return None

    def test(self):
        """Load the first Wikipedia-X dataset and report its size."""
        load_these_datasets = ["laion/Wikipedia-X", "laion/Wikipedia-X-Full", "laion/Wikipedia-X-Concat", "laion/Wikipedia-X-M3"]
        self.dataset = load_dataset(load_these_datasets[0])
        print(len(self.dataset))
        # (Removed a stray no-op ``self.ipfs_embeddings`` expression.)
        return None
if __name__ == '__main__':
    # Run the dataset smoke test end to end.
    runner = test_ipfs_embeddings()
    runner.test()
    print("Test passed")
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import os
2+
import torch.nn.functional as F
3+
from torch import inference_mode, float16, Tensor
4+
from transformers import AutoTokenizer, AutoModelForCausalLM, StoppingCriteriaList
5+
from transformers.generation.streamers import TextStreamer
6+
from cloudkit_worker import dispatch_result
7+
from sentence_transformers import SentenceTransformer
8+
from InstructorEmbedding import INSTRUCTOR
9+
from FlagEmbedding import FlagModel
10+
import json
11+
12+
# Known embedding model names for this worker.
# NOTE(review): not referenced elsewhere in this file — presumably consumed by
# the worker dispatcher; the gte/instructor/bge family names line up with the
# substring checks in hf_embed below. Confirm against the dispatcher.
embedding_models = [
    "text-embedding-ada-002",
    "gte-large",
    "gte-base",
    "gte-small",
    "gte-tiny",
    "bge-small-en-v1.5",
    "bge-base-en-v1.5",
    "bge-large-en-v1.5",
    "instructor-base",
    "instructor-large",
    "instructor-xl",
    "UAE-Large-V1"
]
26+
27+
class hf_embed:
    """Embedding worker dispatching to SentenceTransformer / INSTRUCTOR / FlagEmbedding.

    The model family is chosen by substring-matching ``resources['checkpoint']``
    at load time and ``meta['modelName']`` at embed time.
    """

    def __init__(self, resources, meta):
        self.modelName = meta['modelName']
        # Both dispatch aliases route to the same implementation.
        self.hf_embed = self.embed
        self.instruct_embed = self.embed
        # Bug fix: give model/tokenizer defaults so instances whose checkpoint
        # matches no family don't hit AttributeError later in embed().
        self.model = None
        self.tokenizer = None
        if "gte" in resources['checkpoint']:
            self.tokenizer = AutoTokenizer.from_pretrained(resources['checkpoint'])
        if "instructor" in resources['checkpoint']:
            self.model = INSTRUCTOR(resources['checkpoint'])
        elif "gte" in resources['checkpoint']:
            self.model = SentenceTransformer(
                resources['checkpoint']
            )
        elif "bge" in resources['checkpoint']:
            # bge models are loaded lazily in embed() so the retrieval
            # instruction can be attached at query time.
            self.model = None

    def __call__(self, method, **kwargs):
        """Dispatch 'hf_embed' / 'instruct_embed' to embed(); reject others."""
        if method in ('hf_embed', 'instruct_embed'):
            return self.embed(**kwargs)
        raise Exception('unknown method: %s' % method)

    def embed(self, instruction, text, **kwargs):
        """Embed ``text`` (with ``instruction`` for instruction-tuned models).

        Returns ``{'text': <json-encoded embedding>, 'done': True}``.
        Raises ValueError when ``modelName`` matches no known family (the
        original crashed with AttributeError on ``None.tolist()``).
        """
        self.input = text
        self.method = 'embed'
        embeddings = None
        if "instructor" in self.modelName:
            embeddings = self.model.encode([[instruction, self.input]])
            print(embeddings)
        if "gte" in self.modelName:
            embeddings = self.model.encode([self.input])
            print(embeddings)
        if "bge" in self.modelName:
            # Bug fix: identity check (``is None``) instead of ``== None``.
            if self.model is None:
                self.model = FlagModel(
                    'BAAI/' + self.modelName,
                    query_instruction_for_retrieval=instruction,
                    use_fp16=True
                )
            embeddings = self.model.encode(str(self.input))
            print(embeddings)

        if embeddings is None:
            raise ValueError('model family not recognized for %r' % self.modelName)

        # Bug fix: isinstance() instead of ``type(...) != str``.
        if not isinstance(embeddings, str):
            embeddings = json.dumps(embeddings.tolist())

        return {
            'text': embeddings,
            'done': True
        }
def average_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
    """Mean-pool token embeddings over the sequence axis, ignoring padding.

    Positions where ``attention_mask`` is 0 are zeroed before summing, and the
    sum is divided by each sequence's count of real tokens.
    """
    pad_positions = ~attention_mask[..., None].bool()
    masked = last_hidden_states.masked_fill(pad_positions, 0.0)
    token_counts = attention_mask.sum(dim=1)[..., None]
    return masked.sum(dim=1) / token_counts
85+
86+
def test():
    """Embed a sample sentence with the local bge-base checkpoint; return the result."""
    here = os.path.dirname(__file__)
    models_dir = os.path.join(os.path.dirname(here), "models")
    checkpoint = 'bge-base-en-v1.5'
    resources = {'checkpoint': models_dir + "/" + checkpoint + "@hf"}
    print(resources["checkpoint"])
    meta = {"modelName": "bge-base-en-v1.5"}
    instruction = "Represent this sentence for searching relevant passages:"
    embedder = hf_embed(resources, meta)
    results = embedder.embed(instruction, "sample text to embed")
    print(results)
    return results
104+
105+
if __name__ == '__main__':
    # Manual smoke test; see test() above.
    test()

0 commit comments

Comments
 (0)