module Lwt_stream:sig..end
type 'a t
'a.
Naming convention: in this module, all functions applying a function to each element of a stream are suffixed by:
_s when the function returns a thread and calls are serialised_p when the function returns a thread and calls are parallelisedLwt_stream. In
the meantime, you may want to consider using alternatives such as
lwt-pipe.val from : (unit -> 'a option Lwt.t) -> 'a tfrom f creates a stream from the given input function. f is
called each time more input is needed, and the stream ends when
f returns None.
If f, or the thread produced by f, raises an exception, that exception
is forwarded to the consumer of the stream (for example, a caller of
Lwt_stream.get). Note that this does not end the stream. A subsequent attempt to
read from the stream will cause another call to f, which may succeed
with a value.
val from_direct : (unit -> 'a option) -> 'a tfrom_direct f does the same as Lwt_stream.from but with a function
that does not return a thread. It is preferred that this
function be used rather than wrapping f into a function which
returns a thread.
The behavior when f raises an exception is the same as for Lwt_stream.from,
except that f does not produce a thread.
exception Closed
= None) has been
pushed.val create : unit -> 'a t * ('a option -> unit)create () returns a new stream and a push function.
To notify the stream's consumer of errors, either use a separate
communication channel, or use a
result stream. There is no way to push an exception into a
push-stream.
val create_with_reference : unit -> 'a t * ('a option -> unit) * ('b -> unit)create_with_reference () returns a new stream and a push
function. The last function allows a reference to be set to an
external source. This prevents the external source from being
garbage collected.
For example, to convert a reactive event to a stream:
let stream, push, set_ref = Lwt_stream.create_with_reference () in
set_ref (map_event push event)
exception Full
class type['a]bounded_push =object..end
val create_bounded : int -> 'a t * 'a bounded_pushcreate_bounded size returns a new stream and a bounded push
source. The stream can hold a maximum of size elements. When
this limit is reached, pushing a new element will block until
one is consumed.
Note that you cannot clone or parse (with Lwt_stream.parse) a bounded
stream. These functions will raise Invalid_argument if you try
to do so.
It raises Invalid_argument if size < 0.
val of_list : 'a list -> 'a tof_list l creates a stream returning all elements of l. The elements are
pushed into the stream immediately, resulting in a closed stream (in the
sense of Lwt_stream.is_closed).val of_array : 'a array -> 'a tof_array a creates a stream returning all elements of a. The elements
are pushed into the stream immediately, resulting in a closed stream (in the
sense of Lwt_stream.is_closed).val of_string : string -> char tof_string str creates a stream returning all characters of str. The
characters are pushed into the stream immediately, resulting in a closed
stream (in the sense of Lwt_stream.is_closed).val clone : 'a t -> 'a tclone st clone the given stream. Operations on each stream
will not affect the other.
For example:
# let st1 = Lwt_stream.of_list [1; 2; 3];;
val st1 : int Lwt_stream.t = <abstr>
# let st2 = Lwt_stream.clone st1;;
val st2 : int Lwt_stream.t = <abstr>
# lwt x = Lwt_stream.next st1;;
val x : int = 1
# lwt y = Lwt_stream.next st2;;
val y : int = 1
It raises Invalid_argument if st is a bounded
push-stream.
val to_list : 'a t -> 'a list Lwt.tval to_string : char t -> string Lwt.texception Empty
val peek : 'a t -> 'a option Lwt.tpeek st returns the first element of the stream, if any,
without removing it.val npeek : int -> 'a t -> 'a list Lwt.tnpeek n st returns at most the first n elements of st,
without removing them.val get : 'a t -> 'a option Lwt.tget st removes and returns the first element of the stream, if
any.val nget : int -> 'a t -> 'a list Lwt.tnget n st removes and returns at most the first n elements of
st.val get_while : ('a -> bool) -> 'a t -> 'a list Lwt.t
val get_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a list Lwt.tget_while f st returns the longest prefix of st where all
elements satisfy f.val next : 'a t -> 'a Lwt.tnext st removes and returns the next element of the stream or
fails with Lwt_stream.Empty, if the stream is empty.val last_new : 'a t -> 'a Lwt.tlast_new st returns the last element that can be obtained
without sleeping, or wait for one if none is available.
It fails with Lwt_stream.Empty if the stream has no more elements.
val junk : 'a t -> unit Lwt.tjunk st removes the first element of st.val njunk : int -> 'a t -> unit Lwt.tnjunk n st removes at most the first n elements of the
stream.val junk_while : ('a -> bool) -> 'a t -> unit Lwt.t
val junk_while_s : ('a -> bool Lwt.t) -> 'a t -> unit Lwt.tjunk_while f st removes all elements at the beginning of the
streams which satisfy f.val junk_old : 'a t -> unit Lwt.tjunk_old st removes all elements that are ready to be read
without yielding from st.
For example, the read_password function of Lwt_read_line
uses it to flush keys previously typed by the user.
val get_available : 'a t -> 'a listget_available st returns all available elements of l without
blocking.val get_available_up_to : int -> 'a t -> 'a listget_available_up_to n st returns up to n elements of l
without blocking.val is_empty : 'a t -> bool Lwt.tis_empty st returns whether the given stream is empty.val is_closed : 'a t -> boolis_closed st returns whether the given stream has been closed. A closed
stream is not necessarily empty. It may still contain unread elements. If
is_closed s = true, then all subsequent reads until the end of the
stream are guaranteed not to block.val closed : 'a t -> unit Lwt.tclosed st returns a thread that will sleep until the stream has been
closed.val on_termination : 'a t -> (unit -> unit) -> unitLwt_stream.closed.on_termination st f executes f when the end of the stream st
is reached. Note that the stream may still contain elements if
Lwt_stream.peek or similar was used.val on_terminate : 'a t -> (unit -> unit) -> unit
For example:
# let st1 = Lwt_stream.of_list [1; 2; 3];;
val st1 : int Lwt_stream.t = <abstr>
# let st2 = Lwt_stream.map string_of_int st1;;
val st2 : string Lwt_stream.t = <abstr>
# lwt x = Lwt_stream.next st1;;
val x : int = 1
# lwt y = Lwt_stream.next st2;;
val y : string = "2"
val choose : 'a t list -> 'a tchoose l creates an stream from a list of streams. The
resulting stream will return elements returned by any stream of
l in an unspecified order.val map : ('a -> 'b) -> 'a t -> 'b t
val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b tmap f st maps the value returned by st with fval filter : ('a -> bool) -> 'a t -> 'a t
val filter_s : ('a -> bool Lwt.t) -> 'a t -> 'a tfilter f st keeps only values, x, such that f x is trueval filter_map : ('a -> 'b option) -> 'a t -> 'b t
val filter_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b tfilter_map f st filter and map st at the same timeval map_list : ('a -> 'b list) -> 'a t -> 'b t
val map_list_s : ('a -> 'b list Lwt.t) -> 'a t -> 'b tmap_list f st applies f on each element of st and flattens
the lists returnedval fold : ('a -> 'b -> 'b) -> 'a t -> 'b -> 'b Lwt.t
val fold_s : ('a -> 'b -> 'b Lwt.t) -> 'a t -> 'b -> 'b Lwt.tfold f s x fold_like function for streams.val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.titer f s iterates over all elements of the stream.val find : ('a -> bool) -> 'a t -> 'a option Lwt.t
val find_s : ('a -> bool Lwt.t) -> 'a t -> 'a option Lwt.tfind f s find an element in a stream.val find_map : ('a -> 'b option) -> 'a t -> 'b option Lwt.t
val find_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b option Lwt.tfind_map f s find and map at the same time.val combine : 'a t -> 'b t -> ('a * 'b) tcombine s1 s2 combines two streams. The stream will end when
either stream ends.val append : 'a t -> 'a t -> 'a tappend s1 s2 returns a stream which returns all elements of
s1, then all elements of s2val concat : 'a t t -> 'a tconcat st returns the concatenation of all streams of st.val flatten : 'a list t -> 'a tflatten st = map_list (fun l -> l) stval wrap_exn : 'a t -> 'a Lwt.result twrap_exn s is a stream s' such that each time s yields a value v,
s' yields Result.Ok v, and when the source of s raises an exception
e, s' yields Result.Error e.
Note that push-streams (as returned by Lwt_stream.create) never raise exceptions.
If the stream source keeps raising the same exception e each time the
stream is read, s' is unbounded. Reading it will produce Result.Error e
indefinitely.
Since 2.7.0
val parse : 'a t -> ('a t -> 'b Lwt.t) -> 'b Lwt.tparse st f parses st with f. If f raise an exception,
st is restored to its previous state.
It raises Invalid_argument if st is a bounded
push-stream.
val hexdump : char t -> string thexdump byte_stream returns a stream which is the same as the
output of hexdump -C.
Basically, here is a simple implementation of hexdump -C:
let () = Lwt_main.run (Lwt_io.write_lines Lwt_io.stdout (Lwt_stream.hexdump (Lwt_io.read_lines Lwt_io.stdin)))
type 'a result =
| |
Value of |
|||
| |
Error of |
(* | *) |
val map_exn : 'a t -> 'a result tLwt_stream.wrap_exn.map_exn s returns a stream that captures all exceptions raised
by the source of the stream (the function passed to Lwt_stream.from).
Note that for push-streams (as returned by Lwt_stream.create) all
elements of the mapped streams are values.
If the stream source keeps raising the same exception e each time the
stream is read, the stream produced by map_exn is unbounded. Reading it
will produce Lwt_stream.Error e indefinitely.