mirror of https://github.com/vacp2p/nim-libp2p.git
Workaround for bug https://github.com/libp2p/go-libp2p-pubsub/issues/130.
This commit is contained in:
parent
39deeca600
commit
0b807e7ee5
|
@ -482,34 +482,38 @@ proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} =
|
||||||
transp.close()
|
transp.close()
|
||||||
await transp.join()
|
await transp.join()
|
||||||
|
|
||||||
proc socketExists(filename: string): bool =
|
when not defined(windows):
|
||||||
var res: Stat
|
proc socketExists(filename: string): bool =
|
||||||
result = stat(filename, res) >= 0'i32
|
var res: Stat
|
||||||
|
result = stat(filename, res) >= 0'i32
|
||||||
|
|
||||||
proc loggingHandler(api: DaemonAPI): Future[void] =
|
proc loggingHandler(api: DaemonAPI): Future[void] =
|
||||||
var retFuture = newFuture[void]("logging.handler")
|
var retFuture = newFuture[void]("logging.handler")
|
||||||
var loop = getGlobalDispatcher()
|
var loop = getGlobalDispatcher()
|
||||||
let pfd = SocketHandle(api.process.outputHandle)
|
let pfd = SocketHandle(api.process.outputHandle)
|
||||||
var fd = AsyncFD(pfd)
|
var fd = AsyncFD(pfd)
|
||||||
if not setSocketBlocking(pfd, false):
|
if not setSocketBlocking(pfd, false):
|
||||||
discard close(cint(pfd))
|
discard close(cint(pfd))
|
||||||
raiseOsError(osLastError())
|
retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
|
||||||
register(AsyncFD(pfd))
|
|
||||||
|
|
||||||
proc readOutputLoop(udata: pointer) {.gcsafe.} =
|
proc readOutputLoop(udata: pointer) {.gcsafe.} =
|
||||||
var buffer: array[2048, char]
|
var buffer: array[2048, char]
|
||||||
let res = posix.read(cint(fd), addr buffer[0], 2000)
|
let res = posix.read(cint(fd), addr buffer[0], 2000)
|
||||||
if res == -1 or res == 0:
|
if res == -1 or res == 0:
|
||||||
removeReader(fd)
|
removeReader(fd)
|
||||||
retFuture.complete()
|
retFuture.complete()
|
||||||
else:
|
else:
|
||||||
var cstr = cast[cstring](addr buffer[0])
|
var cstr = cast[cstring](addr buffer[0])
|
||||||
api.log.add(cstr)
|
api.log.add(cstr)
|
||||||
# let offset = len(api.log)
|
register(AsyncFD(pfd))
|
||||||
# api.log.setLen(offset + res)
|
addReader(fd, readOutputLoop, nil)
|
||||||
# copyMem(addr api.log[offset], addr buffer[0], res)
|
result = retFuture
|
||||||
addReader(fd, readOutputLoop, nil)
|
else:
|
||||||
result = retFuture
|
proc socketExists(filename: string): bool = false
|
||||||
|
|
||||||
|
proc loggingHandler(api: DaemonAPI): Future[void] =
|
||||||
|
# Not ready yet.
|
||||||
|
discard
|
||||||
|
|
||||||
proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
|
proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
|
||||||
bootstrapNodes: seq[string] = @[],
|
bootstrapNodes: seq[string] = @[],
|
||||||
|
|
|
@ -48,13 +48,16 @@ proc provideBadCidTest(): Future[bool] {.async.} =
|
||||||
finally:
|
finally:
|
||||||
await api.close()
|
await api.close()
|
||||||
|
|
||||||
# proc getOnlyIPv4Addresses(addresses: seq[MultiAddress]): seq[MultiAddress] =
|
proc getOnlyOneIPv4Address(addresses: seq[MultiAddress]): seq[MultiAddress] =
|
||||||
# if len(addresses) > 0:
|
## We doing this becuase of bug in `go-pubsub`
|
||||||
# result = newSeqOfCap[MultiAddress](len(addresses))
|
## https://github.com/libp2p/go-libp2p-pubsub/issues/130
|
||||||
# let ip4 = multiCodec("ip4")
|
if len(addresses) > 0:
|
||||||
# for item in addresses:
|
result = newSeqOfCap[MultiAddress](len(addresses))
|
||||||
# if item.protoCode() == ip4:
|
let ip4 = multiCodec("ip4")
|
||||||
# result.add(item)
|
for item in addresses:
|
||||||
|
if item.protoCode() == ip4:
|
||||||
|
result.add(item)
|
||||||
|
break
|
||||||
|
|
||||||
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
|
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
|
||||||
var pubsubData = "TEST MESSAGE"
|
var pubsubData = "TEST MESSAGE"
|
||||||
|
@ -69,11 +72,6 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
|
||||||
|
|
||||||
var resultsCount = 0
|
var resultsCount = 0
|
||||||
|
|
||||||
# var topics10 = await api1.pubsubGetTopics()
|
|
||||||
# var peers10 = await api1.pubsubListPeers("test-topic")
|
|
||||||
# var topics20 = await api2.pubsubGetTopics()
|
|
||||||
# var peers20 = await api2.pubsubListPeers("test-topic")
|
|
||||||
|
|
||||||
var handlerFuture1 = newFuture[void]()
|
var handlerFuture1 = newFuture[void]()
|
||||||
var handlerFuture2 = newFuture[void]()
|
var handlerFuture2 = newFuture[void]()
|
||||||
|
|
||||||
|
@ -98,8 +96,9 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
|
||||||
result = false
|
result = false
|
||||||
|
|
||||||
# Not subscribed to any topics everything must be 0.
|
# Not subscribed to any topics everything must be 0.
|
||||||
await api1.connect(id2.peer, id2.addresses)
|
# We are making only one connection, because of bug
|
||||||
await api2.connect(id1.peer, id1.addresses)
|
# https://github.com/libp2p/go-libp2p-pubsub/issues/130
|
||||||
|
await api1.connect(id2.peer, getOnlyOneIPv4Address(id2.addresses))
|
||||||
|
|
||||||
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
|
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
|
||||||
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
|
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
|
||||||
|
@ -143,6 +142,6 @@ when isMainModule:
|
||||||
test "GossipSub test":
|
test "GossipSub test":
|
||||||
check:
|
check:
|
||||||
waitFor(pubsubTest({PSGossipSub})) == true
|
waitFor(pubsubTest({PSGossipSub})) == true
|
||||||
# test "FloodSub test":
|
test "FloodSub test":
|
||||||
# check:
|
check:
|
||||||
# waitFor(pubsubTest({PSFloodSub})) == true
|
waitFor(pubsubTest({PSFloodSub})) == true
|
||||||
|
|
Loading…
Reference in New Issue