Akash
09/10/2019, 1:58 PM
Jerry Thomas
09/10/2019, 2:25 PM
import json

import prefect
from kafka import KafkaConsumer, KafkaProducer
from prefect import Flow, task


@task
def get_stream_data():
    # drain whatever is currently on the topic, timing out after 1 second
    consumer = KafkaConsumer("my-topic",
                             bootstrap_servers=['localhost:9092'],
                             value_deserializer=lambda m: json.loads(m.decode('ascii')),
                             consumer_timeout_ms=1000)
    messages = [message.value for message in consumer]
    if len(messages) == 0:
        raise prefect.engine.signals.SKIP("No data received")
    return messages


@task
def process_itemwise(data):
    data["result"] = 1
    return data


@task
def publish_result(data):
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda m: json.dumps(m).encode('ascii'))
    producer.send("results", data)
    producer.flush()


with Flow("custom") as flow:
    data = get_stream_data()
    result = process_itemwise.map(data)
    publish_result.map(result)

while True:
    flow.run()
Aakarsh Nadella
09/10/2019, 4:19 PM
Tune?
Joe Schmid
09/10/2019, 8:19 PM
Ike
09/11/2019, 4:35 PM
Joe Schmid
09/12/2019, 6:32 PM
with Flow("ScaleDownDaskOffHours", environment=env, schedule=schedule) as flow:
    config = get_dask_scheduler_config()
    new_config = reset_config(config)
    save_results = save_new_config(new_config)
    save_results.set_upstream(None, flow)
    ifelse(new_config != config, save_results, None)
I'm getting the warning PrefectWarning: One of the tasks passed to the switch condition has upstream dependencies: <Task: save_new_config>. Those upstream tasks could run even if the switch condition fails, which might cause unexpected results.
I thought the save_results.set_upstream(None, flow) line would take care of that, but clearly not. What's the right way to address that?
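For what it's worth, a hedged sketch (untested): inside the with Flow(...) block, new_config != config compares the two Task objects at build time rather than their run-time results, so ifelse may be getting a plain Python boolean as its condition. Moving the comparison into its own task defers it to run time:

from prefect import task
from prefect.tasks.control_flow import ifelse

@task
def configs_differ(old, new):
    # runs at flow-run time, on the actual config values
    return old != new

with Flow("ScaleDownDaskOffHours", environment=env, schedule=schedule) as flow:
    config = get_dask_scheduler_config()
    new_config = reset_config(config)
    ifelse(configs_differ(config, new_config), save_new_config(new_config), None)

The warning about upstream dependencies may still appear, since save_new_config still depends on new_config as data.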
David Norrish
09/13/2019, 1:14 AM
filepaths = get_filepath(directory)            # return a list of file paths
outpaths = get_output_paths(filepaths)         # return a corresponding list of paths to write to after cleaning
dataframes = read_file.map(filepaths)          # read files into memory in parallel
cleaned_dfs = clean_dataframe.map(dataframes)  # clean dataframes in parallel
write_dataframe.map(cleaned_dfs, outpaths)     # write dataframes to file in parallel
I have 16 cores and ~400 files and am using the DaskExecutor. During execution, ALL files are read into memory before any of the cleaning child tasks start, then all cleaning tasks are performed before any writing to file commences. This means memory is being flooded, as each file is ~2GB. It's kind of a breadth-first approach, whereas I'd like it to run depth-first: read 16 files into memory at once, clean them and write to file, then a core can free the memory and read in the next file.
Is there a way to achieve this, possibly with triggers? The only workaround I've been able to think of is to write a single big task that does the reading/cleaning/writing steps (sketched below) - but I feel like that's kind of not the point of Prefect.
Huge thanks in advance if anyone can help!
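For reference, a rough sketch of that single-big-task workaround (untested; it assumes the existing read_file / clean_dataframe / write_dataframe logic can be invoked directly via .run(), and the flow name is arbitrary):

from prefect import Flow, task

@task
def process_file(filepath, outpath):
    # read, clean and write one file end-to-end so a worker only holds one ~2GB file at a time
    df = read_file.run(filepath)
    cleaned = clean_dataframe.run(df)
    write_dataframe.run(cleaned, outpath)

with Flow("clean-files") as flow:
    filepaths = get_filepath(directory)
    outpaths = get_output_paths(filepaths)
    process_file.map(filepaths, outpaths)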
Matt Harvey
09/13/2019, 6:34 PM
"Prefect does not support sub-minute schedules." Is this a hard limit? I need to run a Prefect Flow every second for a minute (and then stop).
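One hedged workaround, if driving the runs from plain Python (outside Prefect's scheduler) is acceptable for that one minute:

import time

# `flow` is the Flow that needs to run once per second for a minute
for _ in range(60):
    flow.run()
    time.sleep(1)

This ignores how long each run takes, so the spacing is approximate.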
Brett Naul
09/13/2019, 7:32 PM
Matt Harvey
09/14/2019, 3:11 PM
CronClock needs to be set in UTC. Not a big deal, but is it possible to override the timezone?
Stewart Webb
09/15/2019, 12:24 PM
Aakarsh Nadella
09/16/2019, 9:13 PM
ModuleNotFoundError: No module named 'prefect'. I have installed all the dependencies that need to be installed by dask-worker and dask-jupyter using a yaml file.
Argemiro Neto
09/18/2019, 4:28 PM
==== START ====
=== starting the flow
== generate called
= status: [1, 2, 3]
== task runner called with 1
== task runner called with 2
== task runner called with 3
==== END ====
Code:
from datetime import timedelta

from prefect import Flow, task


@task
def generate_task_list() -> list:
    print("== generate called")
    return [1, 2, 3]


@task(max_retries=3, retry_delay=timedelta(seconds=0))
def task_runner(x: int) -> None:
    print('== task runner called with {}'.format(x))


@task
def print_status(text: any) -> None:
    print('= status: {}'.format(text))


with Flow("random-mapping") as f:
    print_status('=== starting the flow')
    values = generate_task_list()
    print_status(values)
    final = task_runner.map(x=values)

print('==== START ====')
f.run()
print('==== END ====')
By running this multiple times I get different results, but never what I expected. What am I missing? Thank you!
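A possible explanation to check (unverified): the two bare print() calls run at script level, while every @task print only happens inside f.run(), and without explicit dependencies Prefect doesn't guarantee the order of the independent print_status tasks. Forcing an order might look like this sketch:

with Flow("random-mapping") as f:
    start = print_status('=== starting the flow')
    values = generate_task_list()
    values.set_upstream(start)        # banner before generating the list
    status = print_status(values)
    final = task_runner.map(x=values)
    final.set_upstream(status)        # status line before the mapped tasks run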
Brett Naul
09/18/2019, 9:00 PM
RemoteEnvironment and my existing cluster already loads GOOGLE_APPLICATION_CREDENTIALS from a k8s secret; for a DaskKubernetesEnvironment I guess I need to pull it from a prefect Secret instead. Do I just need to add os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = Secret(...).get() everywhere, or is there an easier way?
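A sketch of that pattern factored into one task instead of repeated everywhere (the secret name is hypothetical, and it assumes the secret value is usable as-is for GOOGLE_APPLICATION_CREDENTIALS):

import os

from prefect import task
from prefect.client import Secret

@task
def set_gcp_credentials():
    # hypothetical secret name; runs on the worker before any task that needs GCP access
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = Secret("GCP_CREDENTIALS_PATH").get()

Tasks needing the credentials could then list this task as an upstream dependency.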
Argemiro Neto
09/19/2019, 12:04 AM
Is it possible to have tasks inside tasks and flows inside tasks? I was able to call a flow inside a task, but got a ValueError: Could not infer an active Flow context error every time I tried to send parameters to the flows from the tasks. Got the same error when calling a task within a task.
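One pattern that may sidestep that error (a sketch, not an official recommendation): a task's .run() is just the decorated function, so it can be called directly from inside another task without an active Flow context:

from prefect import Flow, task

@task
def inner(x):
    return x + 1

@task
def outer(x):
    # call the underlying function directly; no Flow context needed
    return inner.run(x) * 2

with Flow("nested-call-sketch") as flow:
    outer(10)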
Alex Lopes
09/19/2019, 1:32 PM
Jerry Thomas
09/20/2019, 5:06 AM
Christopher Stokes
09/20/2019, 9:13 PM
CreateRidAlert task twice and throws warnings:
create_alert = CreateAlert()
create_rid_alert = CreateRidAlert()

with Flow('Alert Flow') as flow:
    alert_json = Parameter(name="alert_json", required=True)
    alert = create_alert(alert_json)
    rid_alert = create_rid_alert(alert)
    ifelse(has_rid(alert), rid_alert, None)
    rid = extract_rid(rid_alert)
I'd like to remove the rid_alert = create_rid_alert(alert) line, but then I don't know how to wire up the ifelse line with a result to pass to extract_rid for data flow. This may not be a clear question.
Markus Binsteiner
09/21/2019, 8:17 AM
Aiden Price
09/21/2019, 11:32 PM
Is it possible to mutate a Parameter? My actual use case is to have a dict which is a copy of one of my database tables, which the incoming data needs to refer to to find its foreign key each time. If I find a new name that I don't have in my dict, I'll need to update the table in the database and mutate my dictionary, then reference the mutated version in subsequent `flow.run()`s. I'm only new to Prefect but I have to say I love your work, thank you!
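A rough sketch of that feedback loop (task and parameter names are made up, and it assumes the updated dict can be pulled out of the final state and passed back in as a parameter on the next run):

from prefect import Flow, Parameter, task

@task
def get_incoming_data():
    # placeholder for the real ingestion task
    return ["name-a", "name-b"]

@task
def sync_lookup(lookup, batch):
    # hypothetical: insert unseen names into the DB table, then extend the mapping
    for name in batch:
        lookup.setdefault(name, len(lookup) + 1)
    return lookup

with Flow("fk-lookup") as flow:
    lookup = Parameter("lookup")
    batch = get_incoming_data()
    updated = sync_lookup(lookup, batch)

current = {}  # initial copy of the database table
for _ in range(3):
    state = flow.run(parameters={"lookup": current})
    current = state.result[updated].result  # reference the mutated dict in the next run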
Jason
09/23/2019, 6:01 PM
Jason
09/23/2019, 6:01 PM
Gary Liao
09/25/2019, 1:47 AM
David Ojeda
09/25/2019, 7:01 PM
objects.inv but I cannot find it for Prefect… Is this supported?
Gary Liao
09/26/2019, 3:23 AM
Joe Schmid
09/26/2019, 5:58 PM
Aiden Price
09/27/2019, 5:04 AM
Is it possible to task.map() over a Pandas DataFrame? So you could get each row as a Series for each task call. Or does it only work with generic iterables? Thank you.
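A small sketch of one way to do it (splitting the DataFrame into row Series inside a task, then mapping over that list; names are illustrative):

import pandas as pd
from prefect import Flow, task

@task
def to_rows(df: pd.DataFrame) -> list:
    # turn the DataFrame into a plain list so task.map has an iterable of Series
    return [row for _, row in df.iterrows()]

@task
def handle_row(row: pd.Series):
    print(row["value"])

with Flow("map-over-dataframe") as flow:
    df = pd.DataFrame({"value": [1, 2, 3]})
    rows = to_rows(df)
    handle_row.map(rows)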
Tobias Schmidt
09/27/2019, 12:45 PM
Christopher Stokes
09/27/2019, 8:32 PM
with raise_on_exception():
    state = flow.run(parameters)
I get raised exceptions on conditional misses - like switch statements missing. The raise signals.SKIP('Provided value "{}" did not match "{}"'.format(value, self.value)) call ends up firing and stopping execution.
Christopher Stokes
09/27/2019, 8:34 PM