First of all, I'd probably name that operator >=<, since >=> is already Kleisli composition in Control.Monad.<br><br>Second, you're going to need new threads for this, since you'll be reading from two sources concurrently. This isn't as big a problem as you might think: Haskell threads are dirt cheap, orders of magnitude cheaper than pthreads. For running multiple threads with conduits, I just wrote a library to help with exactly that. As Michael already mentioned, stm-conduit can do this synchronization for you. With it, your >=< function becomes:<br>
<br>infixl 5 >=<<br>
-- Merge two sources: each one drains into a shared channel on its own thread.<br>
(>=<) :: ResourceIO m<br>
      => Source m a<br>
      -> Source m a<br>
      -> ResourceT m (Source m a)<br>
sa >=< sb = do<br>
    c <- liftIO . atomically $ newTMChan<br>
    _ <- resourceForkIO $ sa $$ sinkTMChan c<br>
    _ <- resourceForkIO $ sb $$ sinkTMChan c<br>
    -- The merged source reads whatever either thread has written.<br>
    return $ sourceTMChan c<br>
<br>
This returns a new source that yields the values of both sources as they arrive.<br>
<br>
This can be generalized further to combine any number of sources:<br>
<br>mergeSources :: ResourceIO m<br>
             => [Source m a]<br>
             -> ResourceT m (Source m a)<br>
mergeSources sx = do<br>
    c <- liftIO . atomically $ newTMChan<br>
    -- Fork one thread per source, all writing to the same channel.<br>
    mapM_ (\s -> resourceForkIO $ s $$ sinkTMChan c) sx<br>
    return $ sourceTMChan c<br>
<br>
Hope this helps somewhat,<br>
 - clark<br>
<br>
On Tue, Feb 28, 2012 at 11:04 AM, Alexander V Vershilov <<a href="mailto:alexander.vershilov@gmail.com">alexander.vershilov@gmail.com</a>> wrote:<br>
><br>> Hello, cafe.<br>><br>> Is it possible to read data from different concurrent sources,<br>> i.e. read data from a source as soon as it becomes available, e.g.<br>><br>> runResourceT $ (source1 stdin $= CL.map Left)<br>
> >=> (source2 handle $= CL.map Right)<br>> $= application<br>> $$ sink<br>> where >=> - stands for concurrent combining of sources<br>><br>> It would be good if it can be sources of different types (handle or<br>
> STM channel, etc.).<br>><br>> Currently I've found no good way to handle this situation,<br>> except using STM channels for collecting data<br>><br>> source1 ---+ |<br>> | sink | output sink<br>
> +---] Channel [-------> application----->]<br>> | source<br>> source2 ---+ |<br>><br>> From this point of view the application takes concurrent data, but this<br>
> implementation requires an additional thread per data processing. Also<br>> in many cases it will require running an additional runResourceT (see later<br>> example).<br>><br>> So are there any possible simplifications? Or ideas on how to make the (>=>)<br>
> operator.<br>><br>> Example:<br>><br>> So I've got next code in my network-conduit based application:<br>><br>> main :: IO ()<br>> main = do<br>> pool <- createDBPool "..." 10<br>
> let r = ServerInit pool<br>> forkIO $ forever clientConsole --read channel list and send "Left"<br>> flip runReaderT r $<br>> runTCPServer (ServerSettings 3500 Nothing) (protoServer)<br>
><br>> myServer src sink = do<br>> ch <- liftIO $ atomically $ newTBMChan 16<br>> initState <- lift $ ask<br>> _ <- liftIO $ fork . (flip runReaderT initState) $<br>> runResourceT $ src $= C.sequence decode<br>
> $= CL.map Right $$ sinkTBMChan ch<br>> sourceTBMChan ch<br>> $= process $= C.sequence encode $$ sinkHandle stdout<br>><br>> But in this situation I don't know if freeing of all resources is guaranteed,<br>
> because I'm running an additional ResourceT in the main ResourceT scope.<br>><br>> So can you advise whether it is possible to make concurrent sources now with the currently<br>> implemented library?<br>> If it's not possible but worth implementing, can I make those functions?<br>
> Is it correct to runResourceT inside another resourceT?<br>><br>> --<br>> Best regards,<br>> Alexander V Vershilov<br>><br>> _______________________________________________<br>> Haskell-Cafe mailing list<br>
> <a href="mailto:Haskell-Cafe@haskell.org">Haskell-Cafe@haskell.org</a><br>> <a href="http://www.haskell.org/mailman/listinfo/haskell-cafe">http://www.haskell.org/mailman/listinfo/haskell-cafe</a><br>><br>
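<br>
P.S. The fan-in idea behind the snippets above (several threads writing into one channel, one reader on the other end) can be tried without conduit at all. Below is a minimal sketch using only base and stm. The names mergeProducers and drain are made up for illustration and are not part of stm-conduit, and modelling a producer as a function that is handed an "emit" callback is my own assumption. The sketch writes the end-of-stream marker only after the last producer has finished, so one producer ending early does not cut off the others.<br>

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad (forM_, when)

-- | Hypothetical helper (not from stm-conduit): run every producer in its
-- own lightweight thread and funnel all emitted values into one queue.
-- 'Nothing' is the end-of-stream marker.
mergeProducers :: [(a -> IO ()) -> IO ()] -> IO (TQueue (Maybe a))
mergeProducers producers = do
  queue   <- newTQueueIO
  running <- newTVarIO (length producers)
  -- With no producers at all, the stream ends immediately.
  when (null producers) $ atomically (writeTQueue queue Nothing)
  let emit = atomically . writeTQueue queue . Just
      -- Decrement the counter; the last producer to finish ends the stream.
      finish = atomically $ do
        left <- subtract 1 <$> readTVar running
        writeTVar running left
        when (left == 0) $ writeTQueue queue Nothing
  -- 'finally' runs 'finish' even if a producer throws, so the reader
  -- is not left waiting forever.
  forM_ producers $ \produce -> forkIO (produce emit `finally` finish)
  return queue

-- | Read values until the end-of-stream marker arrives.
drain :: TQueue (Maybe a) -> IO [a]
drain queue = do
  next <- atomically (readTQueue queue)
  case next of
    Nothing -> return []
    Just x  -> (x :) <$> drain queue
```

Values from different producers interleave in whatever order the threads happen to run; only the order within a single producer is preserved.<br>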